mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-22 20:10:38 +00:00
Compare commits
6 Commits
vlad/fix-i
...
fix-replay
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cb8782a51a | ||
|
|
1b935b1958 | ||
|
|
3f16ca2c18 | ||
|
|
67b94c5992 | ||
|
|
e38193c530 | ||
|
|
21949137ed |
16
Cargo.lock
generated
16
Cargo.lock
generated
@@ -753,6 +753,7 @@ dependencies = [
|
||||
"axum",
|
||||
"axum-core",
|
||||
"bytes",
|
||||
"form_urlencoded",
|
||||
"futures-util",
|
||||
"headers",
|
||||
"http 1.1.0",
|
||||
@@ -761,6 +762,8 @@ dependencies = [
|
||||
"mime",
|
||||
"pin-project-lite",
|
||||
"serde",
|
||||
"serde_html_form",
|
||||
"serde_path_to_error",
|
||||
"tower 0.5.2",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
@@ -6422,6 +6425,19 @@ dependencies = [
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_html_form"
|
||||
version = "0.2.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9d2de91cf02bbc07cde38891769ccd5d4f073d22a40683aa4bc7a95781aaa2c4"
|
||||
dependencies = [
|
||||
"form_urlencoded",
|
||||
"indexmap 2.9.0",
|
||||
"itoa",
|
||||
"ryu",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.125"
|
||||
|
||||
@@ -71,7 +71,7 @@ aws-credential-types = "1.2.0"
|
||||
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
|
||||
aws-types = "1.3"
|
||||
axum = { version = "0.8.1", features = ["ws"] }
|
||||
axum-extra = { version = "0.10.0", features = ["typed-header"] }
|
||||
axum-extra = { version = "0.10.0", features = ["typed-header", "query"] }
|
||||
base64 = "0.13.0"
|
||||
bincode = "1.3"
|
||||
bindgen = "0.71"
|
||||
|
||||
@@ -785,7 +785,7 @@ impl ComputeNode {
|
||||
self.spawn_extension_stats_task();
|
||||
|
||||
if pspec.spec.autoprewarm {
|
||||
self.prewarm_lfc();
|
||||
self.prewarm_lfc(None);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -25,11 +25,16 @@ struct EndpointStoragePair {
|
||||
}
|
||||
|
||||
const KEY: &str = "lfc_state";
|
||||
impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
|
||||
let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
|
||||
bail!("pspec.endpoint_id missing")
|
||||
impl EndpointStoragePair {
|
||||
/// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
|
||||
/// If not None, takes precedence over pspec.spec.endpoint_id
|
||||
fn from_spec_and_endpoint(
|
||||
pspec: &crate::compute::ParsedSpec,
|
||||
endpoint_id: Option<String>,
|
||||
) -> Result<Self> {
|
||||
let endpoint_id = endpoint_id.as_ref().or(pspec.spec.endpoint_id.as_ref());
|
||||
let Some(ref endpoint_id) = endpoint_id else {
|
||||
bail!("pspec.endpoint_id missing, other endpoint_id not provided")
|
||||
};
|
||||
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
|
||||
bail!("pspec.endpoint_storage_addr missing")
|
||||
@@ -84,7 +89,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
/// Returns false if there is a prewarm request ongoing, true otherwise
|
||||
pub fn prewarm_lfc(self: &Arc<Self>) -> bool {
|
||||
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
|
||||
crate::metrics::LFC_PREWARM_REQUESTS.inc();
|
||||
{
|
||||
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
|
||||
@@ -97,7 +102,7 @@ impl ComputeNode {
|
||||
|
||||
let cloned = self.clone();
|
||||
spawn(async move {
|
||||
let Err(err) = cloned.prewarm_impl().await else {
|
||||
let Err(err) = cloned.prewarm_impl(from_endpoint).await else {
|
||||
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
|
||||
return;
|
||||
};
|
||||
@@ -109,13 +114,14 @@ impl ComputeNode {
|
||||
true
|
||||
}
|
||||
|
||||
fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
|
||||
/// from_endpoint: None for endpoint managed by this compute_ctl
|
||||
fn endpoint_storage_pair(&self, from_endpoint: Option<String>) -> Result<EndpointStoragePair> {
|
||||
let state = self.state.lock().unwrap();
|
||||
state.pspec.as_ref().unwrap().try_into()
|
||||
EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
|
||||
}
|
||||
|
||||
async fn prewarm_impl(&self) -> Result<()> {
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
|
||||
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<()> {
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
|
||||
info!(%url, "requesting LFC state from endpoint storage");
|
||||
|
||||
let request = Client::new().get(&url).bearer_auth(token);
|
||||
@@ -173,7 +179,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
async fn offload_lfc_impl(&self) -> Result<()> {
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
|
||||
info!(%url, "requesting LFC state from postgres");
|
||||
|
||||
let mut compressed = Vec::new();
|
||||
|
||||
@@ -2,6 +2,7 @@ use crate::compute_prewarm::LfcPrewarmStateWithProgress;
|
||||
use crate::http::JsonResponse;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::{Json, http::StatusCode};
|
||||
use axum_extra::extract::OptionalQuery;
|
||||
use compute_api::responses::LfcOffloadState;
|
||||
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
|
||||
|
||||
@@ -16,8 +17,16 @@ pub(in crate::http) async fn offload_state(compute: Compute) -> Json<LfcOffloadS
|
||||
Json(compute.lfc_offload_state())
|
||||
}
|
||||
|
||||
pub(in crate::http) async fn prewarm(compute: Compute) -> Response {
|
||||
if compute.prewarm_lfc() {
|
||||
#[derive(serde::Deserialize)]
|
||||
pub struct PrewarmQuery {
|
||||
pub from_endpoint: String,
|
||||
}
|
||||
|
||||
pub(in crate::http) async fn prewarm(
|
||||
compute: Compute,
|
||||
OptionalQuery(query): OptionalQuery<PrewarmQuery>,
|
||||
) -> Response {
|
||||
if compute.prewarm_lfc(query.map(|q| q.from_endpoint)) {
|
||||
StatusCode::ACCEPTED.into_response()
|
||||
} else {
|
||||
JsonResponse::error(
|
||||
|
||||
@@ -1858,29 +1858,6 @@ impl TenantShard {
|
||||
}
|
||||
}
|
||||
|
||||
// At this point we've initialized all timelines and are tracking them.
|
||||
// Now compute the layer visibility for all (not offloaded) timelines.
|
||||
let compute_visiblity_for = {
|
||||
let timelines_accessor = self.timelines.lock().unwrap();
|
||||
let mut timelines_offloaded_accessor = self.timelines_offloaded.lock().unwrap();
|
||||
|
||||
timelines_offloaded_accessor.extend(offloaded_timelines_list.into_iter());
|
||||
|
||||
// Before activation, populate each Timeline's GcInfo with information about its children
|
||||
self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None);
|
||||
|
||||
timelines_accessor.values().cloned().collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
for tl in compute_visiblity_for {
|
||||
tl.update_layer_visibility().await.with_context(|| {
|
||||
format!(
|
||||
"failed initial timeline visibility computation {} for tenant {}",
|
||||
tl.timeline_id, self.tenant_shard_id
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
// Walk through deleted timelines, resume deletion
|
||||
for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
|
||||
remote_timeline_client
|
||||
@@ -1900,6 +1877,10 @@ impl TenantShard {
|
||||
.context("resume_deletion")
|
||||
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
|
||||
}
|
||||
{
|
||||
let mut offloaded_timelines_accessor = self.timelines_offloaded.lock().unwrap();
|
||||
offloaded_timelines_accessor.extend(offloaded_timelines_list.into_iter());
|
||||
}
|
||||
|
||||
// Stash the preloaded tenant manifest, and upload a new manifest if changed.
|
||||
//
|
||||
@@ -3462,6 +3443,9 @@ impl TenantShard {
|
||||
.values()
|
||||
.filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
|
||||
|
||||
// Before activation, populate each Timeline's GcInfo with information about its children
|
||||
self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None);
|
||||
|
||||
// Spawn gc and compaction loops. The loops will shut themselves
|
||||
// down when they notice that the tenant is inactive.
|
||||
tasks::start_background_loops(self, background_jobs_can_start);
|
||||
|
||||
@@ -3407,6 +3407,10 @@ impl Timeline {
|
||||
// TenantShard::create_timeline will wait for these uploads to happen before returning, or
|
||||
// on retry.
|
||||
|
||||
// Now that we have the full layer map, we may calculate the visibility of layers within it (a global scan)
|
||||
drop(guard); // drop write lock, update_layer_visibility will take a read lock.
|
||||
self.update_layer_visibility().await?;
|
||||
|
||||
info!(
|
||||
"loaded layer map with {} layers at {}, total physical size: {}",
|
||||
num_layers, disk_consistent_lsn, total_physical_size
|
||||
@@ -5897,7 +5901,7 @@ impl Drop for Timeline {
|
||||
if let Ok(mut gc_info) = ancestor.gc_info.write() {
|
||||
if !gc_info.remove_child_not_offloaded(self.timeline_id) {
|
||||
tracing::error!(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id,
|
||||
"Couldn't remove retain_lsn entry from timeline's parent on drop: already removed");
|
||||
"Couldn't remove retain_lsn entry from offloaded timeline's parent: already removed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1092,13 +1092,15 @@ communicator_prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
|
||||
MyPState->ring_last <= ring_index);
|
||||
}
|
||||
|
||||
/* internal version. Returns the ring index */
|
||||
/* Internal version. Returns the ring index of the last block (result of this function is used only
|
||||
* when nblocks==1)
|
||||
*/
|
||||
static uint64
|
||||
prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
|
||||
BlockNumber nblocks, const bits8 *mask,
|
||||
bool is_prefetch)
|
||||
{
|
||||
uint64 min_ring_index;
|
||||
uint64 last_ring_index;
|
||||
PrefetchRequest hashkey;
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
bool any_hits = false;
|
||||
@@ -1122,13 +1124,12 @@ Retry:
|
||||
MyPState->ring_unused - MyPState->ring_receive;
|
||||
MyNeonCounters->getpage_prefetches_buffered =
|
||||
MyPState->n_responses_buffered;
|
||||
last_ring_index = UINT64_MAX;
|
||||
|
||||
min_ring_index = UINT64_MAX;
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
PrefetchRequest *slot = NULL;
|
||||
PrfHashEntry *entry = NULL;
|
||||
uint64 ring_index;
|
||||
neon_request_lsns *lsns;
|
||||
|
||||
if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
|
||||
@@ -1152,12 +1153,12 @@ Retry:
|
||||
if (entry != NULL)
|
||||
{
|
||||
slot = entry->slot;
|
||||
ring_index = slot->my_ring_index;
|
||||
Assert(slot == GetPrfSlot(ring_index));
|
||||
last_ring_index = slot->my_ring_index;
|
||||
Assert(slot == GetPrfSlot(last_ring_index));
|
||||
|
||||
Assert(slot->status != PRFS_UNUSED);
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
ring_index < MyPState->ring_unused);
|
||||
Assert(MyPState->ring_last <= last_ring_index &&
|
||||
last_ring_index < MyPState->ring_unused);
|
||||
Assert(BufferTagsEqual(&slot->buftag, &hashkey.buftag));
|
||||
|
||||
/*
|
||||
@@ -1169,9 +1170,9 @@ Retry:
|
||||
if (!neon_prefetch_response_usable(lsns, slot))
|
||||
{
|
||||
/* Wait for the old request to finish and discard it */
|
||||
if (!prefetch_wait_for(ring_index))
|
||||
if (!prefetch_wait_for(last_ring_index))
|
||||
goto Retry;
|
||||
prefetch_set_unused(ring_index);
|
||||
prefetch_set_unused(last_ring_index);
|
||||
entry = NULL;
|
||||
slot = NULL;
|
||||
pgBufferUsage.prefetch.expired += 1;
|
||||
@@ -1188,13 +1189,12 @@ Retry:
|
||||
*/
|
||||
if (slot->status == PRFS_TAG_REMAINS)
|
||||
{
|
||||
prefetch_set_unused(ring_index);
|
||||
prefetch_set_unused(last_ring_index);
|
||||
entry = NULL;
|
||||
slot = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
min_ring_index = Min(min_ring_index, ring_index);
|
||||
/* The buffered request is good enough, return that index */
|
||||
if (is_prefetch)
|
||||
pgBufferUsage.prefetch.duplicates++;
|
||||
@@ -1283,12 +1283,12 @@ Retry:
|
||||
* The next buffer pointed to by `ring_unused` is now definitely empty, so
|
||||
* we can insert the new request to it.
|
||||
*/
|
||||
ring_index = MyPState->ring_unused;
|
||||
last_ring_index = MyPState->ring_unused;
|
||||
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
ring_index <= MyPState->ring_unused);
|
||||
Assert(MyPState->ring_last <= last_ring_index &&
|
||||
last_ring_index <= MyPState->ring_unused);
|
||||
|
||||
slot = GetPrfSlotNoCheck(ring_index);
|
||||
slot = GetPrfSlotNoCheck(last_ring_index);
|
||||
|
||||
Assert(slot->status == PRFS_UNUSED);
|
||||
|
||||
@@ -1298,11 +1298,9 @@ Retry:
|
||||
*/
|
||||
slot->buftag = hashkey.buftag;
|
||||
slot->shard_no = get_shard_number(&tag);
|
||||
slot->my_ring_index = ring_index;
|
||||
slot->my_ring_index = last_ring_index;
|
||||
slot->flags = 0;
|
||||
|
||||
min_ring_index = Min(min_ring_index, ring_index);
|
||||
|
||||
if (is_prefetch)
|
||||
MyNeonCounters->getpage_prefetch_requests_total++;
|
||||
else
|
||||
@@ -1315,11 +1313,12 @@ Retry:
|
||||
MyPState->ring_unused - MyPState->ring_receive;
|
||||
|
||||
Assert(any_hits);
|
||||
Assert(last_ring_index != UINT64_MAX);
|
||||
|
||||
Assert(GetPrfSlot(min_ring_index)->status == PRFS_REQUESTED ||
|
||||
GetPrfSlot(min_ring_index)->status == PRFS_RECEIVED);
|
||||
Assert(MyPState->ring_last <= min_ring_index &&
|
||||
min_ring_index < MyPState->ring_unused);
|
||||
Assert(GetPrfSlot(last_ring_index)->status == PRFS_REQUESTED ||
|
||||
GetPrfSlot(last_ring_index)->status == PRFS_RECEIVED);
|
||||
Assert(MyPState->ring_last <= last_ring_index &&
|
||||
last_ring_index < MyPState->ring_unused);
|
||||
|
||||
if (flush_every_n_requests > 0 &&
|
||||
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
|
||||
@@ -1335,7 +1334,7 @@ Retry:
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
}
|
||||
|
||||
return min_ring_index;
|
||||
return last_ring_index;
|
||||
}
|
||||
|
||||
static bool
|
||||
|
||||
@@ -11,6 +11,9 @@
|
||||
#include "utils/guc.h"
|
||||
#include "utils/hsearch.h"
|
||||
|
||||
#if PG_MAJORVERSION_NUM > 14
|
||||
#include "access/xlogrecovery.h"
|
||||
#endif
|
||||
|
||||
|
||||
typedef struct LastWrittenLsnCacheEntry
|
||||
@@ -24,14 +27,20 @@ typedef struct LastWrittenLsnCacheEntry
|
||||
typedef struct LwLsnCacheCtl {
|
||||
int lastWrittenLsnCacheSize;
|
||||
/*
|
||||
* Maximal last written LSN for pages not present in lastWrittenLsnCache
|
||||
*/
|
||||
XLogRecPtr maxLastWrittenLsn;
|
||||
* Highest (most recent) last written LSN, for pages not present in
|
||||
* lastWrittenLsnCache
|
||||
*/
|
||||
XLogRecPtr maxLastWrittenLsnData;
|
||||
|
||||
/*
|
||||
* Double linked list to implement LRU replacement policy for last written LSN cache.
|
||||
* Access to this list as well as to last written LSN cache is protected by 'LastWrittenLsnLock'.
|
||||
*/
|
||||
* Maximal last written LSN for metadata, not present in lastWrittenLsnCache
|
||||
*/
|
||||
XLogRecPtr maxLastWrittenLsnMetadata;
|
||||
|
||||
/*
|
||||
* Double linked list to implement LRU replacement policy for last written LSN cache.
|
||||
* Access to this list as well as to last written LSN cache is protected by 'LastWrittenLsnLock'.
|
||||
*/
|
||||
dlist_head lastWrittenLsnLRU;
|
||||
} LwLsnCacheCtl;
|
||||
|
||||
@@ -108,19 +117,20 @@ init_lwlsncache(void)
|
||||
#else
|
||||
shmemrequest();
|
||||
#endif
|
||||
|
||||
prev_set_lwlsn_block_range_hook = set_lwlsn_block_range_hook;
|
||||
set_lwlsn_block_range_hook = neon_set_lwlsn_block_range;
|
||||
prev_set_lwlsn_block_v_hook = set_lwlsn_block_v_hook;
|
||||
set_lwlsn_block_v_hook = neon_set_lwlsn_block_v;
|
||||
prev_set_lwlsn_block_hook = set_lwlsn_block_hook;
|
||||
set_lwlsn_block_hook = neon_set_lwlsn_block;
|
||||
prev_set_max_lwlsn_hook = set_max_lwlsn_hook;
|
||||
set_max_lwlsn_hook = neon_set_max_lwlsn;
|
||||
prev_set_lwlsn_relation_hook = set_lwlsn_relation_hook;
|
||||
set_lwlsn_relation_hook = neon_set_lwlsn_relation;
|
||||
prev_set_lwlsn_db_hook = set_lwlsn_db_hook;
|
||||
set_lwlsn_db_hook = neon_set_lwlsn_db;
|
||||
|
||||
#define SET_HOOK(name) do { \
|
||||
prev_##name##_hook = name##_hook; \
|
||||
name##_hook = neon_##name; \
|
||||
} while (false)
|
||||
|
||||
SET_HOOK(set_lwlsn_block_range);
|
||||
SET_HOOK(set_lwlsn_block_v);
|
||||
SET_HOOK(set_lwlsn_block);
|
||||
SET_HOOK(set_max_lwlsn);
|
||||
SET_HOOK(set_lwlsn_relation);
|
||||
SET_HOOK(set_lwlsn_db);
|
||||
|
||||
#undef SET_HOOK
|
||||
}
|
||||
|
||||
|
||||
@@ -139,24 +149,34 @@ static void shmemrequest(void) {
|
||||
|
||||
static void shmeminit(void) {
|
||||
static HASHCTL info;
|
||||
bool found;
|
||||
bool found = true;
|
||||
|
||||
if (lwlsn_cache_size > 0)
|
||||
{
|
||||
info.keysize = sizeof(BufferTag);
|
||||
info.entrysize = sizeof(LastWrittenLsnCacheEntry);
|
||||
|
||||
lastWrittenLsnCache = ShmemInitHash("last_written_lsn_cache",
|
||||
lwlsn_cache_size, lwlsn_cache_size,
|
||||
&info,
|
||||
HASH_ELEM | HASH_BLOBS);
|
||||
LwLsnCache = ShmemInitStruct("neon/LwLsnCacheCtl", sizeof(LwLsnCacheCtl), &found);
|
||||
// Now set the size in the struct
|
||||
LwLsnCache->lastWrittenLsnCacheSize = lwlsn_cache_size;
|
||||
if (found) {
|
||||
return;
|
||||
}
|
||||
lwlsn_cache_size, lwlsn_cache_size,
|
||||
&info,
|
||||
HASH_ELEM | HASH_BLOBS);
|
||||
LwLsnCache = ShmemInitStruct("neon/LwLsnCacheCtl",
|
||||
sizeof(LwLsnCacheCtl), &found);
|
||||
}
|
||||
dlist_init(&LwLsnCache->lastWrittenLsnLRU);
|
||||
LwLsnCache->maxLastWrittenLsn = GetRedoRecPtr();
|
||||
|
||||
/* initialize the shmem struct if we allocated it */
|
||||
if (!found) {
|
||||
XLogRecPtr redoPtr;
|
||||
LwLsnCache->lastWrittenLsnCacheSize = lwlsn_cache_size;
|
||||
|
||||
dlist_init(&LwLsnCache->lastWrittenLsnLRU);
|
||||
|
||||
redoPtr = GetRedoRecPtr();
|
||||
|
||||
LwLsnCache->maxLastWrittenLsnMetadata = redoPtr;
|
||||
LwLsnCache->maxLastWrittenLsnData = redoPtr;
|
||||
}
|
||||
|
||||
if (prev_shmem_startup_hook) {
|
||||
prev_shmem_startup_hook();
|
||||
}
|
||||
@@ -180,17 +200,18 @@ neon_get_lwlsn(NRelFileInfo rlocator, ForkNumber forknum, BlockNumber blkno)
|
||||
|
||||
LWLockAcquire(LastWrittenLsnLock, LW_SHARED);
|
||||
|
||||
/* Maximal last written LSN among all non-cached pages */
|
||||
lsn = LwLsnCache->maxLastWrittenLsn;
|
||||
|
||||
if (NInfoGetRelNumber(rlocator) != InvalidOid)
|
||||
if (NInfoGetRelNumber(rlocator) != InvalidOid) /* data page*/
|
||||
{
|
||||
BufferTag key;
|
||||
Oid spcOid = NInfoGetSpcOid(rlocator);
|
||||
Oid dbOid = NInfoGetDbOid(rlocator);
|
||||
Oid relNumber = NInfoGetRelNumber(rlocator);
|
||||
|
||||
BufTagInit(key, relNumber, forknum, blkno, spcOid, dbOid);
|
||||
|
||||
|
||||
/* Maximal last written LSN among all non-cached data pages */
|
||||
lsn = LwLsnCache->maxLastWrittenLsnData;
|
||||
|
||||
entry = hash_search(lastWrittenLsnCache, &key, HASH_FIND, NULL);
|
||||
if (entry != NULL)
|
||||
lsn = entry->lsn;
|
||||
@@ -212,9 +233,13 @@ neon_get_lwlsn(NRelFileInfo rlocator, ForkNumber forknum, BlockNumber blkno)
|
||||
lsn = SetLastWrittenLSNForBlockRangeInternal(lsn, rlocator, forknum, blkno, 1);
|
||||
}
|
||||
}
|
||||
else
|
||||
else /* metadata */
|
||||
{
|
||||
HASH_SEQ_STATUS seq;
|
||||
/* Maximal last written LSN for metadata */
|
||||
lsn = Max(LwLsnCache->maxLastWrittenLsnMetadata,
|
||||
LwLsnCache->maxLastWrittenLsnData);
|
||||
|
||||
/* Find maximum of all cached LSNs */
|
||||
hash_seq_init(&seq, lastWrittenLsnCache);
|
||||
while ((entry = (LastWrittenLsnCacheEntry *) hash_seq_search(&seq)) != NULL)
|
||||
@@ -230,7 +255,8 @@ neon_get_lwlsn(NRelFileInfo rlocator, ForkNumber forknum, BlockNumber blkno)
|
||||
|
||||
static void neon_set_max_lwlsn(XLogRecPtr lsn) {
|
||||
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
|
||||
LwLsnCache->maxLastWrittenLsn = lsn;
|
||||
LwLsnCache->maxLastWrittenLsnMetadata = lsn;
|
||||
LwLsnCache->maxLastWrittenLsnData = lsn;
|
||||
LWLockRelease(LastWrittenLsnLock);
|
||||
}
|
||||
|
||||
@@ -291,7 +317,7 @@ neon_get_lwlsn_v(NRelFileInfo relfilenode, ForkNumber forknum,
|
||||
LWLockRelease(LastWrittenLsnLock);
|
||||
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
|
||||
|
||||
lsn = LwLsnCache->maxLastWrittenLsn;
|
||||
lsn = LwLsnCache->maxLastWrittenLsnData;
|
||||
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
@@ -306,7 +332,8 @@ neon_get_lwlsn_v(NRelFileInfo relfilenode, ForkNumber forknum,
|
||||
else
|
||||
{
|
||||
HASH_SEQ_STATUS seq;
|
||||
lsn = LwLsnCache->maxLastWrittenLsn;
|
||||
Assert(nblocks == 1);
|
||||
lsn = LwLsnCache->maxLastWrittenLsnMetadata;
|
||||
/* Find maximum of all cached LSNs */
|
||||
hash_seq_init(&seq, lastWrittenLsnCache);
|
||||
while ((entry = (LastWrittenLsnCacheEntry *) hash_seq_search(&seq)) != NULL)
|
||||
@@ -334,10 +361,10 @@ SetLastWrittenLSNForBlockRangeInternal(XLogRecPtr lsn,
|
||||
{
|
||||
if (NInfoGetRelNumber(rlocator) == InvalidOid)
|
||||
{
|
||||
if (lsn > LwLsnCache->maxLastWrittenLsn)
|
||||
LwLsnCache->maxLastWrittenLsn = lsn;
|
||||
if (lsn > LwLsnCache->maxLastWrittenLsnMetadata)
|
||||
LwLsnCache->maxLastWrittenLsnMetadata = lsn;
|
||||
else
|
||||
lsn = LwLsnCache->maxLastWrittenLsn;
|
||||
lsn = LwLsnCache->maxLastWrittenLsnMetadata;
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -369,10 +396,19 @@ SetLastWrittenLSNForBlockRangeInternal(XLogRecPtr lsn,
|
||||
if (hash_get_num_entries(lastWrittenLsnCache) > LwLsnCache->lastWrittenLsnCacheSize)
|
||||
{
|
||||
/* Replace least recently used entry */
|
||||
LastWrittenLsnCacheEntry* victim = dlist_container(LastWrittenLsnCacheEntry, lru_node, dlist_pop_head_node(&LwLsnCache->lastWrittenLsnLRU));
|
||||
LastWrittenLsnCacheEntry* victim = NULL;
|
||||
victim = dlist_container(LastWrittenLsnCacheEntry, lru_node, dlist_pop_head_node(&LwLsnCache->lastWrittenLsnLRU));
|
||||
|
||||
while (!XLogRecordReplayFinished(victim->lsn))
|
||||
{
|
||||
/* in recovery, we don't allow eviction of entries with the LSN of a record that has yet to be returned */
|
||||
dlist_push_tail(&LwLsnCache->lastWrittenLsnLRU, &entry->lru_node);
|
||||
victim = dlist_container(LastWrittenLsnCacheEntry, lru_node, dlist_pop_head_node(&LwLsnCache->lastWrittenLsnLRU));
|
||||
}
|
||||
|
||||
/* Adjust max LSN for not cached relations/chunks if needed */
|
||||
if (victim->lsn > LwLsnCache->maxLastWrittenLsn)
|
||||
LwLsnCache->maxLastWrittenLsn = victim->lsn;
|
||||
if (victim->lsn > LwLsnCache->maxLastWrittenLsnMetadata)
|
||||
LwLsnCache->maxLastWrittenLsnMetadata = victim->lsn;
|
||||
|
||||
hash_search(lastWrittenLsnCache, victim, HASH_REMOVE, NULL);
|
||||
}
|
||||
@@ -433,6 +469,13 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
|
||||
Oid dbOid = NInfoGetDbOid(relfilenode);
|
||||
Oid relNumber = NInfoGetRelNumber(relfilenode);
|
||||
|
||||
/*
|
||||
* We ignore the operation when the input is invalid:
|
||||
* - we must have gotten LSNs to set
|
||||
* - we must have pages to write
|
||||
* - the cache must be enabled
|
||||
* - we must be processing a data page, not a metadata request
|
||||
*/
|
||||
if (lsns == NULL || nblocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0 ||
|
||||
NInfoGetRelNumber(relfilenode) == InvalidOid)
|
||||
return InvalidXLogRecPtr;
|
||||
@@ -466,10 +509,25 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
|
||||
if (hash_get_num_entries(lastWrittenLsnCache) > LwLsnCache->lastWrittenLsnCacheSize)
|
||||
{
|
||||
/* Replace least recently used entry */
|
||||
LastWrittenLsnCacheEntry* victim = dlist_container(LastWrittenLsnCacheEntry, lru_node, dlist_pop_head_node(&LwLsnCache->lastWrittenLsnLRU));
|
||||
LastWrittenLsnCacheEntry* victim = dlist_container(LastWrittenLsnCacheEntry, lru_node,
|
||||
dlist_pop_head_node(&LwLsnCache->lastWrittenLsnLRU));
|
||||
|
||||
/*
|
||||
* If replay is still working on this LSN, we can't evict the
|
||||
* page. Therefore, we must find a different victim, and return
|
||||
* the one we just found to the pool.
|
||||
*/
|
||||
while (!XLogRecordReplayFinished(victim->lsn))
|
||||
{
|
||||
dlist_push_tail(&LwLsnCache->lastWrittenLsnLRU,
|
||||
&entry->lru_node);
|
||||
victim = dlist_container(LastWrittenLsnCacheEntry, lru_node,
|
||||
dlist_pop_head_node(&LwLsnCache->lastWrittenLsnLRU));
|
||||
}
|
||||
|
||||
/* Adjust max LSN for not cached relations/chunks if needed */
|
||||
if (victim->lsn > LwLsnCache->maxLastWrittenLsn)
|
||||
LwLsnCache->maxLastWrittenLsn = victim->lsn;
|
||||
if (victim->lsn > LwLsnCache->maxLastWrittenLsnData)
|
||||
LwLsnCache->maxLastWrittenLsnData = victim->lsn;
|
||||
|
||||
hash_search(lastWrittenLsnCache, victim, HASH_REMOVE, NULL);
|
||||
}
|
||||
|
||||
@@ -14,9 +14,9 @@ use crate::context::RequestContext;
|
||||
use crate::control_plane::client::cplane_proxy_v1;
|
||||
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
|
||||
use crate::error::{ReportableError, UserFacingError};
|
||||
use crate::pglb::connect_compute::WakeComputeBackend;
|
||||
use crate::pqproto::BeMessage;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::proxy::wake_compute::WakeComputeBackend;
|
||||
use crate::stream::PqStream;
|
||||
use crate::types::RoleName;
|
||||
use crate::{auth, compute, waiters};
|
||||
|
||||
@@ -14,20 +14,21 @@ use serde::{Deserialize, Serialize};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{debug, info};
|
||||
|
||||
use crate::auth::{self, AuthError, ComputeUserInfoMaybeEndpoint, validate_password_and_exchange};
|
||||
use crate::auth::{self, ComputeUserInfoMaybeEndpoint, validate_password_and_exchange};
|
||||
use crate::cache::Cached;
|
||||
use crate::config::AuthenticationConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::client::ControlPlaneClient;
|
||||
use crate::control_plane::errors::GetAuthInfoError;
|
||||
use crate::control_plane::messages::EndpointRateLimitConfig;
|
||||
use crate::control_plane::{
|
||||
self, AccessBlockerFlags, AuthSecret, CachedNodeInfo, ControlPlaneApi, EndpointAccessControl,
|
||||
RoleAccessControl,
|
||||
};
|
||||
use crate::intern::EndpointIdInt;
|
||||
use crate::pglb::connect_compute::WakeComputeBackend;
|
||||
use crate::pqproto::BeMessage;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::proxy::wake_compute::WakeComputeBackend;
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::stream::Stream;
|
||||
use crate::types::{EndpointCacheKey, EndpointId, RoleName};
|
||||
@@ -230,11 +231,8 @@ async fn auth_quirks(
|
||||
config.is_vpc_acccess_proxy,
|
||||
)?;
|
||||
|
||||
let endpoint = EndpointIdInt::from(&info.endpoint);
|
||||
let rate_limit_config = None;
|
||||
if !endpoint_rate_limiter.check(endpoint, rate_limit_config, 1) {
|
||||
return Err(AuthError::too_many_connections());
|
||||
}
|
||||
access_controls.connection_attempt_rate_limit(ctx, &info.endpoint, &endpoint_rate_limiter)?;
|
||||
|
||||
let role_access = api
|
||||
.get_role_access_control(ctx, &info.endpoint, &info.user)
|
||||
.await?;
|
||||
@@ -401,6 +399,7 @@ impl Backend<'_, ComputeUserInfo> {
|
||||
allowed_ips: Arc::new(vec![]),
|
||||
allowed_vpce: Arc::new(vec![]),
|
||||
flags: AccessBlockerFlags::default(),
|
||||
rate_limits: EndpointRateLimitConfig::default(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -439,6 +438,7 @@ mod tests {
|
||||
use crate::auth::{ComputeUserInfoMaybeEndpoint, IpPattern};
|
||||
use crate::config::AuthenticationConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::messages::EndpointRateLimitConfig;
|
||||
use crate::control_plane::{
|
||||
self, AccessBlockerFlags, CachedNodeInfo, EndpointAccessControl, RoleAccessControl,
|
||||
};
|
||||
@@ -477,6 +477,7 @@ mod tests {
|
||||
allowed_ips: Arc::new(self.ips.clone()),
|
||||
allowed_vpce: Arc::new(self.vpc_endpoint_ids.clone()),
|
||||
flags: self.access_blocker_flags,
|
||||
rate_limits: EndpointRateLimitConfig::default(),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -28,10 +28,9 @@ use crate::context::RequestContext;
|
||||
use crate::metrics::{Metrics, ThreadPoolMetrics};
|
||||
use crate::pqproto::FeStartupPacket;
|
||||
use crate::protocol2::ConnectionInfo;
|
||||
use crate::proxy::{
|
||||
ErrorSource, TlsRequired, copy_bidirectional_client_compute, run_until_cancelled,
|
||||
};
|
||||
use crate::proxy::{ErrorSource, TlsRequired, copy_bidirectional_client_compute};
|
||||
use crate::stream::{PqStream, Stream};
|
||||
use crate::util::run_until_cancelled;
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
|
||||
4
proxy/src/cache/project_info.rs
vendored
4
proxy/src/cache/project_info.rs
vendored
@@ -364,6 +364,7 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::*;
|
||||
use crate::control_plane::messages::EndpointRateLimitConfig;
|
||||
use crate::control_plane::{AccessBlockerFlags, AuthSecret};
|
||||
use crate::scram::ServerSecret;
|
||||
use crate::types::ProjectId;
|
||||
@@ -399,6 +400,7 @@ mod tests {
|
||||
allowed_ips: allowed_ips.clone(),
|
||||
allowed_vpce: Arc::new(vec![]),
|
||||
flags: AccessBlockerFlags::default(),
|
||||
rate_limits: EndpointRateLimitConfig::default(),
|
||||
},
|
||||
RoleAccessControl {
|
||||
secret: secret1.clone(),
|
||||
@@ -414,6 +416,7 @@ mod tests {
|
||||
allowed_ips: allowed_ips.clone(),
|
||||
allowed_vpce: Arc::new(vec![]),
|
||||
flags: AccessBlockerFlags::default(),
|
||||
rate_limits: EndpointRateLimitConfig::default(),
|
||||
},
|
||||
RoleAccessControl {
|
||||
secret: secret2.clone(),
|
||||
@@ -439,6 +442,7 @@ mod tests {
|
||||
allowed_ips: allowed_ips.clone(),
|
||||
allowed_vpce: Arc::new(vec![]),
|
||||
flags: AccessBlockerFlags::default(),
|
||||
rate_limits: EndpointRateLimitConfig::default(),
|
||||
},
|
||||
RoleAccessControl {
|
||||
secret: secret3.clone(),
|
||||
|
||||
@@ -11,13 +11,12 @@ use crate::config::{ProxyConfig, ProxyProtocolV2};
|
||||
use crate::context::RequestContext;
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::{Metrics, NumClientConnectionsGuard};
|
||||
use crate::pglb::connect_compute::{TcpMechanism, connect_to_compute};
|
||||
use crate::pglb::handshake::{HandshakeData, handshake};
|
||||
use crate::pglb::passthrough::ProxyPassthrough;
|
||||
use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol};
|
||||
use crate::proxy::{
|
||||
ClientRequestError, ErrorSource, prepare_client_connection, run_until_cancelled,
|
||||
};
|
||||
use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
|
||||
use crate::proxy::{ClientRequestError, ErrorSource, prepare_client_connection};
|
||||
use crate::util::run_until_cancelled;
|
||||
|
||||
pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
|
||||
@@ -146,6 +146,7 @@ impl NeonControlPlaneClient {
|
||||
public_access_blocked: block_public_connections,
|
||||
vpc_access_blocked: block_vpc_connections,
|
||||
},
|
||||
rate_limits: body.rate_limits,
|
||||
})
|
||||
}
|
||||
.inspect_err(|e| tracing::debug!(error = ?e))
|
||||
@@ -312,6 +313,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
|
||||
allowed_ips: Arc::new(auth_info.allowed_ips),
|
||||
allowed_vpce: Arc::new(auth_info.allowed_vpc_endpoint_ids),
|
||||
flags: auth_info.access_blocker_flags,
|
||||
rate_limits: auth_info.rate_limits,
|
||||
};
|
||||
let role_control = RoleAccessControl {
|
||||
secret: auth_info.secret,
|
||||
@@ -357,6 +359,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
|
||||
allowed_ips: Arc::new(auth_info.allowed_ips),
|
||||
allowed_vpce: Arc::new(auth_info.allowed_vpc_endpoint_ids),
|
||||
flags: auth_info.access_blocker_flags,
|
||||
rate_limits: auth_info.rate_limits,
|
||||
};
|
||||
let role_control = RoleAccessControl {
|
||||
secret: auth_info.secret,
|
||||
|
||||
@@ -20,7 +20,7 @@ use crate::context::RequestContext;
|
||||
use crate::control_plane::errors::{
|
||||
ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
|
||||
};
|
||||
use crate::control_plane::messages::MetricsAuxInfo;
|
||||
use crate::control_plane::messages::{EndpointRateLimitConfig, MetricsAuxInfo};
|
||||
use crate::control_plane::{
|
||||
AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, EndpointAccessControl, NodeInfo,
|
||||
RoleAccessControl,
|
||||
@@ -130,6 +130,7 @@ impl MockControlPlane {
|
||||
project_id: None,
|
||||
account_id: None,
|
||||
access_blocker_flags: AccessBlockerFlags::default(),
|
||||
rate_limits: EndpointRateLimitConfig::default(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -233,6 +234,7 @@ impl super::ControlPlaneApi for MockControlPlane {
|
||||
allowed_ips: Arc::new(info.allowed_ips),
|
||||
allowed_vpce: Arc::new(info.allowed_vpc_endpoint_ids),
|
||||
flags: info.access_blocker_flags,
|
||||
rate_limits: info.rate_limits,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ use clashmap::ClashMap;
|
||||
use tokio::time::Instant;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use super::{EndpointAccessControl, RoleAccessControl};
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
use crate::auth::backend::jwt::{AuthRule, FetchAuthRules, FetchAuthRulesError};
|
||||
use crate::cache::endpoints::EndpointsCache;
|
||||
@@ -22,8 +23,6 @@ use crate::metrics::ApiLockMetrics;
|
||||
use crate::rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token};
|
||||
use crate::types::EndpointId;
|
||||
|
||||
use super::{EndpointAccessControl, RoleAccessControl};
|
||||
|
||||
#[non_exhaustive]
|
||||
#[derive(Clone)]
|
||||
pub enum ControlPlaneClient {
|
||||
|
||||
@@ -227,12 +227,35 @@ pub(crate) struct UserFacingMessage {
|
||||
#[derive(Deserialize)]
|
||||
pub(crate) struct GetEndpointAccessControl {
|
||||
pub(crate) role_secret: Box<str>,
|
||||
pub(crate) allowed_ips: Option<Vec<IpPattern>>,
|
||||
pub(crate) allowed_vpc_endpoint_ids: Option<Vec<String>>,
|
||||
|
||||
pub(crate) project_id: Option<ProjectIdInt>,
|
||||
pub(crate) account_id: Option<AccountIdInt>,
|
||||
|
||||
pub(crate) allowed_ips: Option<Vec<IpPattern>>,
|
||||
pub(crate) allowed_vpc_endpoint_ids: Option<Vec<String>>,
|
||||
pub(crate) block_public_connections: Option<bool>,
|
||||
pub(crate) block_vpc_connections: Option<bool>,
|
||||
|
||||
#[serde(default)]
|
||||
pub(crate) rate_limits: EndpointRateLimitConfig,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Deserialize, Default)]
|
||||
pub struct EndpointRateLimitConfig {
|
||||
pub connection_attempts: ConnectionAttemptsLimit,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Deserialize, Default)]
|
||||
pub struct ConnectionAttemptsLimit {
|
||||
pub tcp: Option<LeakyBucketSetting>,
|
||||
pub ws: Option<LeakyBucketSetting>,
|
||||
pub http: Option<LeakyBucketSetting>,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Deserialize)]
|
||||
pub struct LeakyBucketSetting {
|
||||
pub rps: f64,
|
||||
pub burst: f64,
|
||||
}
|
||||
|
||||
/// Response which holds compute node's `host:port` pair.
|
||||
|
||||
@@ -11,6 +11,8 @@ pub(crate) mod errors;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use messages::EndpointRateLimitConfig;
|
||||
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
use crate::auth::backend::jwt::AuthRule;
|
||||
use crate::auth::{AuthError, IpPattern, check_peer_addr_is_in_list};
|
||||
@@ -18,8 +20,9 @@ use crate::cache::{Cached, TimedLru};
|
||||
use crate::config::ComputeConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo};
|
||||
use crate::intern::{AccountIdInt, ProjectIdInt};
|
||||
use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt};
|
||||
use crate::protocol2::ConnectionInfoExtra;
|
||||
use crate::rate_limiter::{EndpointRateLimiter, LeakyBucketConfig};
|
||||
use crate::types::{EndpointCacheKey, EndpointId, RoleName};
|
||||
use crate::{compute, scram};
|
||||
|
||||
@@ -56,6 +59,8 @@ pub(crate) struct AuthInfo {
|
||||
pub(crate) account_id: Option<AccountIdInt>,
|
||||
/// Are public connections or VPC connections blocked?
|
||||
pub(crate) access_blocker_flags: AccessBlockerFlags,
|
||||
/// The rate limits for this endpoint.
|
||||
pub(crate) rate_limits: EndpointRateLimitConfig,
|
||||
}
|
||||
|
||||
/// Info for establishing a connection to a compute node.
|
||||
@@ -101,6 +106,8 @@ pub struct EndpointAccessControl {
|
||||
pub allowed_ips: Arc<Vec<IpPattern>>,
|
||||
pub allowed_vpce: Arc<Vec<String>>,
|
||||
pub flags: AccessBlockerFlags,
|
||||
|
||||
pub rate_limits: EndpointRateLimitConfig,
|
||||
}
|
||||
|
||||
impl EndpointAccessControl {
|
||||
@@ -139,6 +146,36 @@ impl EndpointAccessControl {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn connection_attempt_rate_limit(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
endpoint: &EndpointId,
|
||||
rate_limiter: &EndpointRateLimiter,
|
||||
) -> Result<(), AuthError> {
|
||||
let endpoint = EndpointIdInt::from(endpoint);
|
||||
|
||||
let limits = &self.rate_limits.connection_attempts;
|
||||
let config = match ctx.protocol() {
|
||||
crate::metrics::Protocol::Http => limits.http,
|
||||
crate::metrics::Protocol::Ws => limits.ws,
|
||||
crate::metrics::Protocol::Tcp => limits.tcp,
|
||||
crate::metrics::Protocol::SniRouter => return Ok(()),
|
||||
};
|
||||
let config = config.and_then(|config| {
|
||||
if config.rps <= 0.0 || config.burst <= 0.0 {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(LeakyBucketConfig::new(config.rps, config.burst))
|
||||
});
|
||||
|
||||
if !rate_limiter.check(endpoint, config, 1) {
|
||||
return Err(AuthError::too_many_connections());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// This will allocate per each call, but the http requests alone
|
||||
|
||||
@@ -106,4 +106,5 @@ mod tls;
|
||||
mod types;
|
||||
mod url;
|
||||
mod usage_metrics;
|
||||
mod util;
|
||||
mod waiters;
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
pub mod connect_compute;
|
||||
pub mod copy_bidirectional;
|
||||
pub mod handshake;
|
||||
pub mod inprocess;
|
||||
|
||||
@@ -8,19 +8,19 @@ use crate::config::{ComputeConfig, RetryConfig};
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::errors::WakeComputeError;
|
||||
use crate::control_plane::locks::ApiLocks;
|
||||
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
|
||||
use crate::control_plane::{self, NodeInfo};
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::{
|
||||
ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType,
|
||||
};
|
||||
use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute, retry_after, should_retry};
|
||||
use crate::proxy::wake_compute::wake_compute;
|
||||
use crate::proxy::wake_compute::{WakeComputeBackend, wake_compute};
|
||||
use crate::types::Host;
|
||||
|
||||
/// If we couldn't connect, a cached connection info might be to blame
|
||||
/// (e.g. the compute node's address might've changed at the wrong time).
|
||||
/// Invalidate the cache entry (if any) to prevent subsequent errors.
|
||||
#[tracing::instrument(name = "invalidate_cache", skip_all)]
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) fn invalidate_cache(node_info: control_plane::CachedNodeInfo) -> NodeInfo {
|
||||
let is_cached = node_info.cached();
|
||||
if is_cached {
|
||||
@@ -49,14 +49,6 @@ pub(crate) trait ConnectMechanism {
|
||||
) -> Result<Self::Connection, Self::ConnectError>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub(crate) trait WakeComputeBackend {
|
||||
async fn wake_compute(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError>;
|
||||
}
|
||||
|
||||
pub(crate) struct TcpMechanism {
|
||||
pub(crate) auth: AuthInfo,
|
||||
/// connect_to_compute concurrency lock
|
||||
@@ -1,8 +1,10 @@
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
pub(crate) mod connect_compute;
|
||||
pub(crate) mod retry;
|
||||
pub(crate) mod wake_compute;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::FutureExt;
|
||||
@@ -21,15 +23,16 @@ use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
|
||||
use crate::context::RequestContext;
|
||||
use crate::error::{ReportableError, UserFacingError};
|
||||
use crate::metrics::{Metrics, NumClientConnectionsGuard};
|
||||
use crate::pglb::connect_compute::{TcpMechanism, connect_to_compute};
|
||||
pub use crate::pglb::copy_bidirectional::{ErrorSource, copy_bidirectional_client_compute};
|
||||
use crate::pglb::handshake::{HandshakeData, HandshakeError, handshake};
|
||||
use crate::pglb::passthrough::ProxyPassthrough;
|
||||
use crate::pqproto::{BeMessage, CancelKeyData, StartupMessageParams};
|
||||
use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_proxy_protocol};
|
||||
use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::stream::{PqStream, Stream};
|
||||
use crate::types::EndpointCacheKey;
|
||||
use crate::util::run_until_cancelled;
|
||||
use crate::{auth, compute};
|
||||
|
||||
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
|
||||
@@ -46,21 +49,6 @@ impl ReportableError for TlsRequired {
|
||||
|
||||
impl UserFacingError for TlsRequired {}
|
||||
|
||||
pub async fn run_until_cancelled<F: std::future::Future>(
|
||||
f: F,
|
||||
cancellation_token: &CancellationToken,
|
||||
) -> Option<F::Output> {
|
||||
match futures::future::select(
|
||||
std::pin::pin!(f),
|
||||
std::pin::pin!(cancellation_token.cancelled()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
futures::future::Either::Left((f, _)) => Some(f),
|
||||
futures::future::Either::Right(((), _)) => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
auth_backend: &'static auth::Backend<'static, ()>,
|
||||
|
||||
@@ -25,7 +25,7 @@ use crate::control_plane::client::{ControlPlaneClient, TestControlPlaneClient};
|
||||
use crate::control_plane::messages::{ControlPlaneErrorMessage, Details, MetricsAuxInfo, Status};
|
||||
use crate::control_plane::{self, CachedNodeInfo, NodeInfo, NodeInfoCache};
|
||||
use crate::error::ErrorKind;
|
||||
use crate::pglb::connect_compute::ConnectMechanism;
|
||||
use crate::proxy::connect_compute::ConnectMechanism;
|
||||
use crate::tls::client_config::compute_client_config_with_certs;
|
||||
use crate::tls::server_config::CertResolver;
|
||||
use crate::types::{BranchId, EndpointId, ProjectId};
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use async_trait::async_trait;
|
||||
use tracing::{error, info};
|
||||
|
||||
use crate::config::RetryConfig;
|
||||
@@ -8,7 +9,6 @@ use crate::error::ReportableError;
|
||||
use crate::metrics::{
|
||||
ConnectOutcome, ConnectionFailuresBreakdownGroup, Metrics, RetriesMetricGroup, RetryType,
|
||||
};
|
||||
use crate::pglb::connect_compute::WakeComputeBackend;
|
||||
use crate::proxy::retry::{retry_after, should_retry};
|
||||
|
||||
// Use macro to retain original callsite.
|
||||
@@ -23,6 +23,11 @@ macro_rules! log_wake_compute_error {
|
||||
};
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub(crate) trait WakeComputeBackend {
|
||||
async fn wake_compute(&self, ctx: &RequestContext) -> Result<CachedNodeInfo, WakeComputeError>;
|
||||
}
|
||||
|
||||
pub(crate) async fn wake_compute<B: WakeComputeBackend>(
|
||||
num_retries: &mut u32,
|
||||
ctx: &RequestContext,
|
||||
|
||||
@@ -69,9 +69,8 @@ pub struct LeakyBucketConfig {
|
||||
pub max: f64,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl LeakyBucketConfig {
|
||||
pub(crate) fn new(rps: f64, max: f64) -> Self {
|
||||
pub fn new(rps: f64, max: f64) -> Self {
|
||||
assert!(rps > 0.0, "rps must be positive");
|
||||
assert!(max > 0.0, "max must be positive");
|
||||
Self { rps, max }
|
||||
|
||||
@@ -12,11 +12,10 @@ use rand::{Rng, SeedableRng};
|
||||
use tokio::time::{Duration, Instant};
|
||||
use tracing::info;
|
||||
|
||||
use super::LeakyBucketConfig;
|
||||
use crate::ext::LockExt;
|
||||
use crate::intern::EndpointIdInt;
|
||||
|
||||
use super::LeakyBucketConfig;
|
||||
|
||||
pub struct GlobalRateLimiter {
|
||||
data: Vec<RateBucket>,
|
||||
info: Vec<RateBucketInfo>,
|
||||
|
||||
@@ -34,7 +34,7 @@ use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
|
||||
use crate::control_plane::locks::ApiLocks;
|
||||
use crate::error::{ErrorKind, ReportableError, UserFacingError};
|
||||
use crate::intern::EndpointIdInt;
|
||||
use crate::pglb::connect_compute::ConnectMechanism;
|
||||
use crate::proxy::connect_compute::ConnectMechanism;
|
||||
use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute};
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
|
||||
@@ -68,17 +68,20 @@ impl PoolingBackend {
|
||||
self.config.authentication_config.is_vpc_acccess_proxy,
|
||||
)?;
|
||||
|
||||
let ep = EndpointIdInt::from(&user_info.endpoint);
|
||||
let rate_limit_config = None;
|
||||
if !self.endpoint_rate_limiter.check(ep, rate_limit_config, 1) {
|
||||
return Err(AuthError::too_many_connections());
|
||||
}
|
||||
access_control.connection_attempt_rate_limit(
|
||||
ctx,
|
||||
&user_info.endpoint,
|
||||
&self.endpoint_rate_limiter,
|
||||
)?;
|
||||
|
||||
let role_access = backend.get_role_secret(ctx).await?;
|
||||
let Some(secret) = role_access.secret else {
|
||||
// If we don't have an authentication secret, for the http flow we can just return an error.
|
||||
info!("authentication info not found");
|
||||
return Err(AuthError::password_failed(&*user_info.user));
|
||||
};
|
||||
|
||||
let ep = EndpointIdInt::from(&user_info.endpoint);
|
||||
let auth_outcome = crate::auth::validate_password_and_exchange(
|
||||
&self.config.authentication_config.thread_pool,
|
||||
ep,
|
||||
@@ -181,7 +184,7 @@ impl PoolingBackend {
|
||||
tracing::Span::current().record("conn_id", display(conn_id));
|
||||
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
|
||||
let backend = self.auth_backend.as_ref().map(|()| keys.info);
|
||||
crate::pglb::connect_compute::connect_to_compute(
|
||||
crate::proxy::connect_compute::connect_to_compute(
|
||||
ctx,
|
||||
&TokioMechanism {
|
||||
conn_id,
|
||||
@@ -223,7 +226,7 @@ impl PoolingBackend {
|
||||
)),
|
||||
options: conn_info.user_info.options.clone(),
|
||||
});
|
||||
crate::pglb::connect_compute::connect_to_compute(
|
||||
crate::proxy::connect_compute::connect_to_compute(
|
||||
ctx,
|
||||
&HyperMechanism {
|
||||
conn_id,
|
||||
|
||||
@@ -50,10 +50,10 @@ use crate::context::RequestContext;
|
||||
use crate::ext::TaskExt;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol};
|
||||
use crate::proxy::run_until_cancelled;
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::serverless::backend::PoolingBackend;
|
||||
use crate::serverless::http_util::{api_error_into_response, json_response};
|
||||
use crate::util::run_until_cancelled;
|
||||
|
||||
pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api";
|
||||
pub(crate) const AUTH_BROKER_SNI: &str = "apiauth";
|
||||
|
||||
@@ -41,10 +41,11 @@ use crate::error::{ErrorKind, ReportableError, UserFacingError};
|
||||
use crate::http::{ReadBodyError, read_body_with_limit};
|
||||
use crate::metrics::{HttpDirection, Metrics, SniGroup, SniKind};
|
||||
use crate::pqproto::StartupMessageParams;
|
||||
use crate::proxy::{NeonOptions, run_until_cancelled};
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::serverless::backend::HttpConnError;
|
||||
use crate::types::{DbName, RoleName};
|
||||
use crate::usage_metrics::{MetricCounter, MetricCounterRecorder};
|
||||
use crate::util::run_until_cancelled;
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
|
||||
14
proxy/src/util.rs
Normal file
14
proxy/src/util.rs
Normal file
@@ -0,0 +1,14 @@
|
||||
use std::pin::pin;
|
||||
|
||||
use futures::future::{Either, select};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
pub async fn run_until_cancelled<F: Future>(
|
||||
f: F,
|
||||
cancellation_token: &CancellationToken,
|
||||
) -> Option<F::Output> {
|
||||
match select(pin!(f), pin!(cancellation_token.cancelled())).await {
|
||||
Either::Left((f, _)) => Some(f),
|
||||
Either::Right(((), _)) => None,
|
||||
}
|
||||
}
|
||||
@@ -69,8 +69,10 @@ class EndpointHttpClient(requests.Session):
|
||||
json: dict[str, str] = res.json()
|
||||
return json
|
||||
|
||||
def prewarm_lfc(self):
|
||||
self.post(f"http://localhost:{self.external_port}/lfc/prewarm").raise_for_status()
|
||||
def prewarm_lfc(self, from_endpoint_id: str | None = None):
|
||||
url: str = f"http://localhost:{self.external_port}/lfc/prewarm"
|
||||
params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else dict()
|
||||
self.post(url, params=params).raise_for_status()
|
||||
|
||||
def prewarmed():
|
||||
json = self.prewarm_lfc_status()
|
||||
|
||||
@@ -129,6 +129,18 @@ class NeonAPI:
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def get_project_limits(self, project_id: str) -> dict[str, Any]:
|
||||
resp = self.__request(
|
||||
"GET",
|
||||
f"/projects/{project_id}/limits",
|
||||
headers={
|
||||
"Accept": "application/json",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def delete_project(
|
||||
self,
|
||||
project_id: str,
|
||||
|
||||
@@ -197,7 +197,8 @@ def print_gc_result(row: dict[str, Any]):
|
||||
log.info("GC duration {elapsed} ms".format_map(row))
|
||||
log.info(
|
||||
(
|
||||
" eligible: {layers_eligible}, not_updated: {layers_not_updated}, removed: {layers_removed}"
|
||||
" total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_pitr {layers_needed_by_pitr}"
|
||||
" needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}"
|
||||
).format_map(row)
|
||||
)
|
||||
|
||||
|
||||
@@ -45,6 +45,8 @@ class NeonEndpoint:
|
||||
if self.branch.connect_env:
|
||||
self.connect_env = self.branch.connect_env.copy()
|
||||
self.connect_env["PGHOST"] = self.host
|
||||
if self.type == "read_only":
|
||||
self.project.read_only_endpoints_total += 1
|
||||
|
||||
def delete(self):
|
||||
self.project.delete_endpoint(self.id)
|
||||
@@ -228,8 +230,13 @@ class NeonProject:
|
||||
self.benchmarks: dict[str, subprocess.Popen[Any]] = {}
|
||||
self.restore_num: int = 0
|
||||
self.restart_pgbench_on_console_errors: bool = False
|
||||
self.limits: dict[str, Any] = self.get_limits()["limits"]
|
||||
self.read_only_endpoints_total: int = 0
|
||||
|
||||
def delete(self):
|
||||
def get_limits(self) -> dict[str, Any]:
|
||||
return self.neon_api.get_project_limits(self.id)
|
||||
|
||||
def delete(self) -> None:
|
||||
self.neon_api.delete_project(self.id)
|
||||
|
||||
def create_branch(self, parent_id: str | None = None) -> NeonBranch | None:
|
||||
@@ -282,6 +289,7 @@ class NeonProject:
|
||||
self.neon_api.delete_endpoint(self.id, endpoint_id)
|
||||
self.endpoints[endpoint_id].branch.endpoints.pop(endpoint_id)
|
||||
self.endpoints.pop(endpoint_id)
|
||||
self.read_only_endpoints_total -= 1
|
||||
self.wait()
|
||||
|
||||
def start_benchmark(self, target: str, clients: int = 10) -> subprocess.Popen[Any]:
|
||||
@@ -369,49 +377,64 @@ def setup_class(
|
||||
print(f"::warning::Retried on 524 error {neon_api.retries524} times")
|
||||
if neon_api.retries4xx > 0:
|
||||
print(f"::warning::Retried on 4xx error {neon_api.retries4xx} times")
|
||||
log.info("Removing the project")
|
||||
log.info("Removing the project %s", project.id)
|
||||
project.delete()
|
||||
|
||||
|
||||
def do_action(project: NeonProject, action: str) -> None:
|
||||
def do_action(project: NeonProject, action: str) -> bool:
|
||||
"""
|
||||
Runs the action
|
||||
"""
|
||||
log.info("Action: %s", action)
|
||||
if action == "new_branch":
|
||||
log.info("Trying to create a new branch")
|
||||
if 0 <= project.limits["max_branches"] <= len(project.branches):
|
||||
log.info(
|
||||
"Maximum branch limit exceeded (%s of %s)",
|
||||
len(project.branches),
|
||||
project.limits["max_branches"],
|
||||
)
|
||||
return False
|
||||
parent = project.branches[
|
||||
random.choice(list(set(project.branches.keys()) - project.reset_branches))
|
||||
]
|
||||
log.info("Parent: %s", parent)
|
||||
child = parent.create_child_branch()
|
||||
if child is None:
|
||||
return
|
||||
return False
|
||||
log.info("Created branch %s", child)
|
||||
child.start_benchmark()
|
||||
elif action == "delete_branch":
|
||||
if project.leaf_branches:
|
||||
target = random.choice(list(project.leaf_branches.values()))
|
||||
target: NeonBranch = random.choice(list(project.leaf_branches.values()))
|
||||
log.info("Trying to delete branch %s", target)
|
||||
target.delete()
|
||||
else:
|
||||
log.info("Leaf branches not found, skipping")
|
||||
return False
|
||||
elif action == "new_ro_endpoint":
|
||||
if 0 <= project.limits["max_read_only_endpoints"] <= project.read_only_endpoints_total:
|
||||
log.info(
|
||||
"Maximum read only endpoint limit exceeded (%s of %s)",
|
||||
project.read_only_endpoints_total,
|
||||
project.limits["max_read_only_endpoints"],
|
||||
)
|
||||
return False
|
||||
ep = random.choice(
|
||||
[br for br in project.branches.values() if br.id not in project.reset_branches]
|
||||
).create_ro_endpoint()
|
||||
log.info("Created the RO endpoint with id %s branch: %s", ep.id, ep.branch.id)
|
||||
ep.start_benchmark()
|
||||
elif action == "delete_ro_endpoint":
|
||||
if project.read_only_endpoints_total == 0:
|
||||
log.info("no read_only endpoints present, skipping")
|
||||
return False
|
||||
ro_endpoints: list[NeonEndpoint] = [
|
||||
endpoint for endpoint in project.endpoints.values() if endpoint.type == "read_only"
|
||||
]
|
||||
if ro_endpoints:
|
||||
target_ep: NeonEndpoint = random.choice(ro_endpoints)
|
||||
target_ep.delete()
|
||||
log.info("endpoint %s deleted", target_ep.id)
|
||||
else:
|
||||
log.info("no read_only endpoints present, skipping")
|
||||
target_ep: NeonEndpoint = random.choice(ro_endpoints)
|
||||
target_ep.delete()
|
||||
log.info("endpoint %s deleted", target_ep.id)
|
||||
elif action == "restore_random_time":
|
||||
if project.leaf_branches:
|
||||
br: NeonBranch = random.choice(list(project.leaf_branches.values()))
|
||||
@@ -419,8 +442,10 @@ def do_action(project: NeonProject, action: str) -> None:
|
||||
br.restore_random_time()
|
||||
else:
|
||||
log.info("No leaf branches found")
|
||||
return False
|
||||
else:
|
||||
raise ValueError(f"The action {action} is unknown")
|
||||
return True
|
||||
|
||||
|
||||
@pytest.mark.timeout(7200)
|
||||
@@ -457,8 +482,9 @@ def test_api_random(
|
||||
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env)
|
||||
for _ in range(num_operations):
|
||||
log.info("Starting action #%s", _ + 1)
|
||||
do_action(
|
||||
while not do_action(
|
||||
project, random.choices([a[0] for a in ACTIONS], weights=[w[1] for w in ACTIONS])[0]
|
||||
)
|
||||
):
|
||||
log.info("Retrying...")
|
||||
project.check_all_benchmarks()
|
||||
assert True
|
||||
|
||||
@@ -188,7 +188,8 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMet
|
||||
pg_cur.execute("select pg_reload_conf()")
|
||||
|
||||
if query is LfcQueryMethod.COMPUTE_CTL:
|
||||
http_client.prewarm_lfc()
|
||||
# Same thing as prewarm_lfc(), testing other method
|
||||
http_client.prewarm_lfc(endpoint.endpoint_id)
|
||||
else:
|
||||
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
|
||||
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 6770bc2513...9f8dd2fa12
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 8c3249f36c...d49bea7d54
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 7a4c0eacae...ec96c1ce86
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: db424d42d7...876755144d
Reference in New Issue
Block a user