Merge branch 'main' into ruslan/subzero-integration

This commit is contained in:
Ruslan Talpa
2025-07-04 13:03:55 +03:00
committed by GitHub
64 changed files with 1287 additions and 584 deletions

View File

@@ -2371,24 +2371,23 @@ LIMIT 100",
installed_extensions_collection_interval
);
let handle = tokio::spawn(async move {
// An initial sleep is added to ensure that two collections don't happen at the same time.
// The first collection happens during compute startup.
tokio::time::sleep(tokio::time::Duration::from_secs(
installed_extensions_collection_interval,
))
.await;
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
installed_extensions_collection_interval,
));
loop {
interval.tick().await;
info!(
"[NEON_EXT_INT_SLEEP]: Interval: {}",
installed_extensions_collection_interval
);
// Sleep at the start of the loop to ensure that two collections don't happen at the same time.
// The first collection happens during compute startup.
tokio::time::sleep(tokio::time::Duration::from_secs(
installed_extensions_collection_interval,
))
.await;
let _ = installed_extensions(conf.clone()).await;
// Acquire a read lock on the compute spec and then update the interval if necessary
interval = tokio::time::interval(tokio::time::Duration::from_secs(std::cmp::max(
installed_extensions_collection_interval = std::cmp::max(
installed_extensions_collection_interval,
2 * atomic_interval.load(std::sync::atomic::Ordering::SeqCst),
)));
installed_extensions_collection_interval = interval.period().as_secs();
);
}
});
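The new loop drops the tokio interval entirely: it sleeps at the top of each pass, runs the collection, and then stretches the period to at least twice the value published through atomic_interval. A minimal sketch of that shape, with run_collection standing in for installed_extensions(conf.clone()) and the starting period passed as a plain u64:

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

async fn collection_loop(atomic_interval: Arc<AtomicU64>, mut interval_secs: u64) {
    loop {
        // Sleeping first guarantees the first in-loop collection cannot overlap
        // the one performed during compute startup.
        tokio::time::sleep(tokio::time::Duration::from_secs(interval_secs)).await;

        run_collection().await;

        // Never poll more often than twice the interval advertised via the atomic.
        interval_secs = std::cmp::max(interval_secs, 2 * atomic_interval.load(Ordering::SeqCst));
    }
}

async fn run_collection() {
    // placeholder for the real installed-extensions collection
}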

View File

@@ -420,6 +420,7 @@ impl From<NodeSchedulingPolicy> for String {
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum SkSchedulingPolicy {
Active,
Activating,
Pause,
Decomissioned,
}
@@ -430,6 +431,7 @@ impl FromStr for SkSchedulingPolicy {
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"active" => Self::Active,
"activating" => Self::Activating,
"pause" => Self::Pause,
"decomissioned" => Self::Decomissioned,
_ => {
@@ -446,6 +448,7 @@ impl From<SkSchedulingPolicy> for String {
use SkSchedulingPolicy::*;
match value {
Active => "active",
Activating => "activating",
Pause => "pause",
Decomissioned => "decomissioned",
}
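A small round-trip check for the new variant, re-declaring a trimmed copy of the enum locally purely for illustration (the real type also derives Serialize/Deserialize):

use std::str::FromStr;

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum SkSchedulingPolicy {
    Active,
    Activating,
    Pause,
    Decomissioned,
}

impl FromStr for SkSchedulingPolicy {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "active" => Self::Active,
            "activating" => Self::Activating,
            "pause" => Self::Pause,
            "decomissioned" => Self::Decomissioned,
            other => return Err(format!("unknown scheduling policy: {other}")),
        })
    }
}

fn main() {
    // "activating" parses to the new variant; the existing variants are untouched.
    assert_eq!(
        SkSchedulingPolicy::from_str("activating"),
        Ok(SkSchedulingPolicy::Activating)
    );
}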

View File

@@ -78,7 +78,13 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
use io::ErrorKind::*;
matches!(
e.kind(),
BrokenPipe | ConnectionRefused | ConnectionAborted | ConnectionReset | TimedOut
HostUnreachable
| NetworkUnreachable
| BrokenPipe
| ConnectionRefused
| ConnectionAborted
| ConnectionReset
| TimedOut,
)
}

View File

@@ -52,7 +52,7 @@ pub(crate) async fn hi(str: &[u8], salt: &[u8], iterations: u32) -> [u8; 32] {
}
// yield every ~250us
// hopefully reduces tail latencies
if i % 1024 == 0 {
if i.is_multiple_of(1024) {
yield_now().await
}
}
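The same yield pattern in isolation: a CPU-bound async loop that hands control back to the scheduler every 1024 iterations. This assumes Rust 1.87+ for u32::is_multiple_of and a Tokio runtime; the per-iteration hash work is elided:

use tokio::task::yield_now;

async fn busy_loop(iterations: u32) {
    for i in 0..iterations {
        // ... one PBKDF2-style round would go here ...

        // Yield roughly every ~250us of work so co-scheduled tasks on the same
        // worker thread are not starved for the whole key derivation.
        if i.is_multiple_of(1024) {
            yield_now().await;
        }
    }
}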

View File

@@ -90,7 +90,7 @@ pub struct InnerClient {
}
impl InnerClient {
pub fn start(&mut self) -> Result<PartialQuery, Error> {
pub fn start(&mut self) -> Result<PartialQuery<'_>, Error> {
self.responses.waiting += 1;
Ok(PartialQuery(Some(self)))
}
@@ -227,7 +227,7 @@ impl Client {
&mut self,
statement: &str,
params: I,
) -> Result<RowStream, Error>
) -> Result<RowStream<'_>, Error>
where
S: AsRef<str>,
I: IntoIterator<Item = Option<S>>,
@@ -262,7 +262,7 @@ impl Client {
pub(crate) async fn simple_query_raw(
&mut self,
query: &str,
) -> Result<SimpleQueryStream, Error> {
) -> Result<SimpleQueryStream<'_>, Error> {
simple_query::simple_query(self.inner_mut(), query).await
}

View File

@@ -12,7 +12,11 @@ mod private {
/// This trait is "sealed", and cannot be implemented outside of this crate.
pub trait GenericClient: private::Sealed {
/// Like `Client::query_raw_txt`.
async fn query_raw_txt<S, I>(&mut self, statement: &str, params: I) -> Result<RowStream, Error>
async fn query_raw_txt<S, I>(
&mut self,
statement: &str,
params: I,
) -> Result<RowStream<'_>, Error>
where
S: AsRef<str> + Sync + Send,
I: IntoIterator<Item = Option<S>> + Sync + Send,
@@ -22,7 +26,11 @@ pub trait GenericClient: private::Sealed {
impl private::Sealed for Client {}
impl GenericClient for Client {
async fn query_raw_txt<S, I>(&mut self, statement: &str, params: I) -> Result<RowStream, Error>
async fn query_raw_txt<S, I>(
&mut self,
statement: &str,
params: I,
) -> Result<RowStream<'_>, Error>
where
S: AsRef<str> + Sync + Send,
I: IntoIterator<Item = Option<S>> + Sync + Send,
@@ -35,7 +43,11 @@ impl GenericClient for Client {
impl private::Sealed for Transaction<'_> {}
impl GenericClient for Transaction<'_> {
async fn query_raw_txt<S, I>(&mut self, statement: &str, params: I) -> Result<RowStream, Error>
async fn query_raw_txt<S, I>(
&mut self,
statement: &str,
params: I,
) -> Result<RowStream<'_>, Error>
where
S: AsRef<str> + Sync + Send,
I: IntoIterator<Item = Option<S>> + Sync + Send,

View File

@@ -47,7 +47,7 @@ impl<'a> Transaction<'a> {
&mut self,
statement: &str,
params: I,
) -> Result<RowStream, Error>
) -> Result<RowStream<'_>, Error>
where
S: AsRef<str>,
I: IntoIterator<Item = Option<S>>,

View File

@@ -24,12 +24,28 @@ macro_rules! critical {
if cfg!(debug_assertions) {
panic!($($arg)*);
}
// Increment both metrics
$crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
let backtrace = std::backtrace::Backtrace::capture();
tracing::error!("CRITICAL: {}\n{backtrace}", format!($($arg)*));
}};
}
#[macro_export]
macro_rules! critical_timeline {
($tenant_shard_id:expr, $timeline_id:expr, $($arg:tt)*) => {{
if cfg!(debug_assertions) {
panic!($($arg)*);
}
// Increment both metrics
$crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
$crate::logging::HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC.inc(&$tenant_shard_id.to_string(), &$timeline_id.to_string());
let backtrace = std::backtrace::Backtrace::capture();
tracing::error!("CRITICAL: [tenant_shard_id: {}, timeline_id: {}] {}\n{backtrace}",
$tenant_shard_id, $timeline_id, format!($($arg)*));
}};
}
#[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)]
#[strum(serialize_all = "snake_case")]
pub enum LogFormat {
@@ -61,6 +77,36 @@ pub struct TracingEventCountMetric {
trace: IntCounter,
}
// Begin Hadron: Add a HadronCriticalStorageEventCountMetric metric that is sliced by tenant_id and timeline_id
pub struct HadronCriticalStorageEventCountMetric {
critical: IntCounterVec,
}
pub static HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC: Lazy<HadronCriticalStorageEventCountMetric> =
Lazy::new(|| {
let vec = metrics::register_int_counter_vec!(
"hadron_critical_storage_event_count",
"Number of critical storage events, by tenant_id and timeline_id",
&["tenant_shard_id", "timeline_id"]
)
.expect("failed to define metric");
HadronCriticalStorageEventCountMetric::new(vec)
});
impl HadronCriticalStorageEventCountMetric {
fn new(vec: IntCounterVec) -> Self {
Self { critical: vec }
}
// Allow public access from `critical!` macro.
pub fn inc(&self, tenant_shard_id: &str, timeline_id: &str) {
self.critical
.with_label_values(&[tenant_shard_id, timeline_id])
.inc();
}
}
// End Hadron
pub static TRACING_EVENT_COUNT_METRIC: Lazy<TracingEventCountMetric> = Lazy::new(|| {
let vec = metrics::register_int_counter_vec!(
"libmetrics_tracing_event_count",

View File

@@ -78,7 +78,7 @@ use utils::rate_limit::RateLimit;
use utils::seqwait::SeqWait;
use utils::simple_rcu::{Rcu, RcuReadGuard};
use utils::sync::gate::{Gate, GateGuard};
use utils::{completion, critical, fs_ext, pausable_failpoint};
use utils::{completion, critical_timeline, fs_ext, pausable_failpoint};
#[cfg(test)]
use wal_decoder::models::value::Value;
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
@@ -4729,7 +4729,7 @@ impl Timeline {
}
// Fetch the next layer to flush, if any.
let (layer, l0_count, frozen_count, frozen_size) = {
let (layer, l0_count, frozen_count, frozen_size, open_layer_size) = {
let layers = self.layers.read(LayerManagerLockHolder::FlushLoop).await;
let Ok(lm) = layers.layer_map() else {
info!("dropping out of flush loop for timeline shutdown");
@@ -4742,8 +4742,13 @@ impl Timeline {
.iter()
.map(|l| l.estimated_in_mem_size())
.sum();
let open_layer_size: u64 = lm
.open_layer
.as_ref()
.map(|l| l.estimated_in_mem_size())
.unwrap_or(0);
let layer = lm.frozen_layers.front().cloned();
(layer, l0_count, frozen_count, frozen_size)
(layer, l0_count, frozen_count, frozen_size, open_layer_size)
// drop 'layers' lock
};
let Some(layer) = layer else {
@@ -4756,7 +4761,7 @@ impl Timeline {
if l0_count >= stall_threshold {
warn!(
"stalling layer flushes for compaction backpressure at {l0_count} \
L0 layers ({frozen_count} frozen layers with {frozen_size} bytes)"
L0 layers ({frozen_count} frozen layers with {frozen_size} bytes, {open_layer_size} bytes in open layer)"
);
let stall_timer = self
.metrics
@@ -4809,7 +4814,7 @@ impl Timeline {
let delay = flush_duration.as_secs_f64();
info!(
"delaying layer flush by {delay:.3}s for compaction backpressure at \
{l0_count} L0 layers ({frozen_count} frozen layers with {frozen_size} bytes)"
{l0_count} L0 layers ({frozen_count} frozen layers with {frozen_size} bytes, {open_layer_size} bytes in open layer)"
);
let _delay_timer = self
.metrics
@@ -6819,7 +6824,11 @@ impl Timeline {
Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
Err(walredo::Error::Other(err)) => {
if fire_critical_error {
critical!("walredo failure during page reconstruction: {err:?}");
critical_timeline!(
self.tenant_shard_id,
self.timeline_id,
"walredo failure during page reconstruction: {err:?}"
);
}
return Err(PageReconstructError::WalRedo(
err.context("reconstruct a page image"),

View File

@@ -36,7 +36,7 @@ use serde::Serialize;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, info_span, trace, warn};
use utils::critical;
use utils::critical_timeline;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use wal_decoder::models::record::NeonWalRecord;
@@ -1390,7 +1390,11 @@ impl Timeline {
GetVectoredError::MissingKey(_),
) = err
{
critical!("missing key during compaction: {err:?}");
critical_timeline!(
self.tenant_shard_id,
self.timeline_id,
"missing key during compaction: {err:?}"
);
}
})?;
@@ -1418,7 +1422,11 @@ impl Timeline {
// Alert on critical errors that indicate data corruption.
Err(err) if err.is_critical() => {
critical!("could not compact, repartitioning keyspace failed: {err:?}");
critical_timeline!(
self.tenant_shard_id,
self.timeline_id,
"could not compact, repartitioning keyspace failed: {err:?}"
);
}
// Log other errors. No partitioning? This is normal, if the timeline was just created

View File

@@ -182,6 +182,7 @@ pub(crate) async fn generate_tombstone_image_layer(
detached: &Arc<Timeline>,
ancestor: &Arc<Timeline>,
ancestor_lsn: Lsn,
historic_layers_to_copy: &Vec<Layer>,
ctx: &RequestContext,
) -> Result<Option<ResidentLayer>, Error> {
tracing::info!(
@@ -199,6 +200,20 @@ pub(crate) async fn generate_tombstone_image_layer(
let image_lsn = ancestor_lsn;
{
for layer in historic_layers_to_copy {
let desc = layer.layer_desc();
if !desc.is_delta
&& desc.lsn_range.start == image_lsn
&& overlaps_with(&key_range, &desc.key_range)
{
tracing::info!(
layer=%layer, "will copy tombstone from ancestor instead of creating a new one"
);
return Ok(None);
}
}
let layers = detached
.layers
.read(LayerManagerLockHolder::DetachAncestor)
@@ -450,7 +465,8 @@ pub(super) async fn prepare(
Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len() + 1);
if let Some(tombstone_layer) =
generate_tombstone_image_layer(detached, &ancestor, ancestor_lsn, ctx).await?
generate_tombstone_image_layer(detached, &ancestor, ancestor_lsn, &rest_of_historic, ctx)
.await?
{
new_layers.push(tombstone_layer.into());
}

View File

@@ -25,7 +25,7 @@ use tokio_postgres::replication::ReplicationStream;
use tokio_postgres::{Client, SimpleQueryMessage, SimpleQueryRow};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, trace, warn};
use utils::critical;
use utils::critical_timeline;
use utils::id::NodeId;
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
@@ -368,9 +368,13 @@ pub(super) async fn handle_walreceiver_connection(
match raw_wal_start_lsn.cmp(&expected_wal_start) {
std::cmp::Ordering::Greater => {
let msg = format!(
"Gap in streamed WAL: [{expected_wal_start}, {raw_wal_start_lsn})"
"Gap in streamed WAL: [{expected_wal_start}, {raw_wal_start_lsn}"
);
critical_timeline!(
timeline.tenant_shard_id,
timeline.timeline_id,
"{msg}"
);
critical!("{msg}");
return Err(WalReceiverError::Other(anyhow!(msg)));
}
std::cmp::Ordering::Less => {
@@ -383,7 +387,11 @@ pub(super) async fn handle_walreceiver_connection(
"Received record with next_record_lsn multiple times ({} < {})",
first_rec.next_record_lsn, expected_wal_start
);
critical!("{msg}");
critical_timeline!(
timeline.tenant_shard_id,
timeline.timeline_id,
"{msg}"
);
return Err(WalReceiverError::Other(anyhow!(msg)));
}
}
@@ -452,7 +460,11 @@ pub(super) async fn handle_walreceiver_connection(
// TODO: we can't differentiate cancellation errors with
// anyhow::Error, so just ignore it if we're cancelled.
if !cancellation.is_cancelled() && !timeline.is_stopping() {
critical!("{err:?}")
critical_timeline!(
timeline.tenant_shard_id,
timeline.timeline_id,
"{err:?}"
);
}
})?;

View File

@@ -40,7 +40,7 @@ use tracing::*;
use utils::bin_ser::{DeserializeError, SerializeError};
use utils::lsn::Lsn;
use utils::rate_limit::RateLimit;
use utils::{critical, failpoint_support};
use utils::{critical_timeline, failpoint_support};
use wal_decoder::models::record::NeonWalRecord;
use wal_decoder::models::*;
@@ -418,18 +418,30 @@ impl WalIngest {
// as there have historically been cases where PostgreSQL has cleared spurious VM pages. See:
// https://github.com/neondatabase/neon/pull/10634.
let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
critical!("clear_vm_bits for unknown VM relation {vm_rel}");
critical_timeline!(
modification.tline.tenant_shard_id,
modification.tline.timeline_id,
"clear_vm_bits for unknown VM relation {vm_rel}"
);
return Ok(());
};
if let Some(blknum) = new_vm_blk {
if blknum >= vm_size {
critical!("new_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
critical_timeline!(
modification.tline.tenant_shard_id,
modification.tline.timeline_id,
"new_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
);
new_vm_blk = None;
}
}
if let Some(blknum) = old_vm_blk {
if blknum >= vm_size {
critical!("old_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
critical_timeline!(
modification.tline.tenant_shard_id,
modification.tline.timeline_id,
"old_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
);
old_vm_blk = None;
}
}

View File

@@ -87,6 +87,14 @@ static const struct config_enum_entry running_xacts_overflow_policies[] = {
{NULL, 0, false}
};
static const struct config_enum_entry debug_compare_local_modes[] = {
{"none", DEBUG_COMPARE_LOCAL_NONE, false},
{"prefetch", DEBUG_COMPARE_LOCAL_PREFETCH, false},
{"lfc", DEBUG_COMPARE_LOCAL_LFC, false},
{"all", DEBUG_COMPARE_LOCAL_ALL, false},
{NULL, 0, false}
};
/*
* XXX: These private to procarray.c, but we need them here.
*/
@@ -519,6 +527,16 @@ _PG_init(void)
GUC_UNIT_KB,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"neon.debug_compare_local",
"Debug mode for compaing content of pages in prefetch ring/LFC/PS and local disk",
NULL,
&debug_compare_local,
DEBUG_COMPARE_LOCAL_NONE,
debug_compare_local_modes,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
/*
* Important: This must happen after other parts of the extension are
* loaded, otherwise any settings to GUCs that were set before the

View File

@@ -177,6 +177,22 @@ extern StringInfoData nm_pack_request(NeonRequest *msg);
extern NeonResponse *nm_unpack_response(StringInfo s);
extern char *nm_to_string(NeonMessage *msg);
/*
* If debug_compare_local > DEBUG_COMPARE_LOCAL_NONE, we pass through all the SMGR API
* calls to md.c, and *also* do the calls to the Page Server. On every
* read, compare the versions we read from local disk and Page Server,
* and Assert that they are identical.
*/
typedef enum
{
DEBUG_COMPARE_LOCAL_NONE, /* normal mode - pages are stored locally only for unlogged relations */
DEBUG_COMPARE_LOCAL_PREFETCH, /* if page is found in prefetch ring, then compare it with local and return */
DEBUG_COMPARE_LOCAL_LFC, /* if page is found in LFC or prefetch ring, then compare it with local and return */
DEBUG_COMPARE_LOCAL_ALL /* always fetch page from PS and compare it with local */
} DebugCompareLocalMode;
extern int debug_compare_local;
/*
* API
*/
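Because neon.debug_compare_local is registered with PGC_POSTMASTER, it can only be set before the server starts. An illustrative configuration (the shared_preload_libraries entry is assumed):

# postgresql.conf -- illustrative only
shared_preload_libraries = 'neon'
neon.debug_compare_local = 'all'   # one of: none | prefetch | lfc | all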

View File

@@ -76,21 +76,11 @@
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
/*
* If DEBUG_COMPARE_LOCAL is defined, we pass through all the SMGR API
* calls to md.c, and *also* do the calls to the Page Server. On every
* read, compare the versions we read from local disk and Page Server,
* and Assert that they are identical.
*/
/* #define DEBUG_COMPARE_LOCAL */
#ifdef DEBUG_COMPARE_LOCAL
#include "access/nbtree.h"
#include "storage/bufpage.h"
#include "access/xlog_internal.h"
static char *hexdump_page(char *page);
#endif
#define IS_LOCAL_REL(reln) (\
NInfoGetDbOid(InfoFromSMgrRel(reln)) != 0 && \
@@ -108,6 +98,8 @@ typedef enum
UNLOGGED_BUILD_NOT_PERMANENT
} UnloggedBuildPhase;
int debug_compare_local;
static NRelFileInfo unlogged_build_rel_info;
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
@@ -478,9 +470,10 @@ neon_init(void)
old_redo_read_buffer_filter = redo_read_buffer_filter;
redo_read_buffer_filter = neon_redo_read_buffer_filter;
#ifdef DEBUG_COMPARE_LOCAL
mdinit();
#endif
if (debug_compare_local)
{
mdinit();
}
}
/*
@@ -803,13 +796,16 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
#ifdef DEBUG_COMPARE_LOCAL
mdcreate(reln, forkNum, forkNum == INIT_FORKNUM || isRedo);
if (forkNum == MAIN_FORKNUM)
mdcreate(reln, INIT_FORKNUM, true);
#else
mdcreate(reln, forkNum, isRedo);
#endif
if (debug_compare_local)
{
mdcreate(reln, forkNum, forkNum == INIT_FORKNUM || isRedo);
if (forkNum == MAIN_FORKNUM)
mdcreate(reln, INIT_FORKNUM, true);
}
else
{
mdcreate(reln, forkNum, isRedo);
}
return;
default:
@@ -848,10 +844,11 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
else
set_cached_relsize(InfoFromSMgrRel(reln), forkNum, 0);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
mdcreate(reln, forkNum, isRedo);
#endif
if (debug_compare_local)
{
if (IS_LOCAL_REL(reln))
mdcreate(reln, forkNum, isRedo);
}
}
/*
@@ -877,7 +874,7 @@ neon_unlink(NRelFileInfoBackend rinfo, ForkNumber forkNum, bool isRedo)
{
/*
* Might or might not exist locally, depending on whether it's an unlogged
* or permanent relation (or if DEBUG_COMPARE_LOCAL is set). Try to
* or permanent relation (or if debug_compare_local is set). Try to
* unlink, it won't do any harm if the file doesn't exist.
*/
mdunlink(rinfo, forkNum, isRedo);
@@ -973,10 +970,11 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
lfc_write(InfoFromSMgrRel(reln), forkNum, blkno, buffer);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
mdextend(reln, forkNum, blkno, buffer, skipFsync);
#endif
if (debug_compare_local)
{
if (IS_LOCAL_REL(reln))
mdextend(reln, forkNum, blkno, buffer, skipFsync);
}
/*
* smgr_extend is often called with an all-zeroes page, so
@@ -1051,10 +1049,11 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
relpath(reln->smgr_rlocator, forkNum),
InvalidBlockNumber)));
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
mdzeroextend(reln, forkNum, blocknum, nblocks, skipFsync);
#endif
if (debug_compare_local)
{
if (IS_LOCAL_REL(reln))
mdzeroextend(reln, forkNum, blocknum, nblocks, skipFsync);
}
/* Don't log any pages if we're not allowed to do so. */
if (!XLogInsertAllowed())
@@ -1265,10 +1264,11 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
mdwriteback(reln, forknum, blocknum, nblocks);
#endif
if (debug_compare_local)
{
if (IS_LOCAL_REL(reln))
mdwriteback(reln, forknum, blocknum, nblocks);
}
}
/*
@@ -1282,7 +1282,6 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
communicator_read_at_lsnv(rinfo, forkNum, blkno, &request_lsns, &buffer, 1, NULL);
}
#ifdef DEBUG_COMPARE_LOCAL
static void
compare_with_local(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void* buffer, XLogRecPtr request_lsn)
{
@@ -1364,7 +1363,6 @@ compare_with_local(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, voi
}
}
}
#endif
#if PG_MAJORVERSION_NUM < 17
@@ -1417,22 +1415,28 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
if (communicator_prefetch_lookupv(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1, &bufferp, &present))
{
/* Prefetch hit */
#ifdef DEBUG_COMPARE_LOCAL
compare_with_local(reln, forkNum, blkno, buffer, request_lsns.request_lsn);
#else
return;
#endif
if (debug_compare_local >= DEBUG_COMPARE_LOCAL_PREFETCH)
{
compare_with_local(reln, forkNum, blkno, buffer, request_lsns.request_lsn);
}
if (debug_compare_local <= DEBUG_COMPARE_LOCAL_PREFETCH)
{
return;
}
}
/* Try to read from local file cache */
if (lfc_read(InfoFromSMgrRel(reln), forkNum, blkno, buffer))
{
MyNeonCounters->file_cache_hits_total++;
#ifdef DEBUG_COMPARE_LOCAL
compare_with_local(reln, forkNum, blkno, buffer, request_lsns.request_lsn);
#else
return;
#endif
if (debug_compare_local >= DEBUG_COMPARE_LOCAL_LFC)
{
compare_with_local(reln, forkNum, blkno, buffer, request_lsns.request_lsn);
}
if (debug_compare_local <= DEBUG_COMPARE_LOCAL_LFC)
{
return;
}
}
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer);
@@ -1442,15 +1446,15 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
*/
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
compare_with_local(reln, forkNum, blkno, buffer, request_lsns.request_lsn);
#endif
if (debug_compare_local)
{
compare_with_local(reln, forkNum, blkno, buffer, request_lsns.request_lsn);
}
}
#endif /* PG_MAJORVERSION_NUM <= 16 */
#if PG_MAJORVERSION_NUM >= 17
#ifdef DEBUG_COMPARE_LOCAL
static void
compare_with_localv(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void** buffers, BlockNumber nblocks, neon_request_lsns* request_lsns, bits8* read_pages)
{
@@ -1465,7 +1469,6 @@ compare_with_localv(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, vo
}
}
}
#endif
static void
@@ -1516,13 +1519,19 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
blocknum, request_lsns, nblocks,
buffers, read_pages);
#ifdef DEBUG_COMPARE_LOCAL
compare_with_localv(reln, forknum, blocknum, buffers, nblocks, request_lsns, read_pages);
memset(read_pages, 0, sizeof(read_pages));
#else
if (prefetch_result == nblocks)
if (debug_compare_local >= DEBUG_COMPARE_LOCAL_PREFETCH)
{
compare_with_localv(reln, forknum, blocknum, buffers, nblocks, request_lsns, read_pages);
}
if (debug_compare_local <= DEBUG_COMPARE_LOCAL_PREFETCH && prefetch_result == nblocks)
{
return;
#endif
}
if (debug_compare_local > DEBUG_COMPARE_LOCAL_PREFETCH)
{
memset(read_pages, 0, sizeof(read_pages));
}
/* Try to read from local file cache */
lfc_result = lfc_readv_select(InfoFromSMgrRel(reln), forknum, blocknum, buffers,
@@ -1531,14 +1540,19 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
if (lfc_result > 0)
MyNeonCounters->file_cache_hits_total += lfc_result;
#ifdef DEBUG_COMPARE_LOCAL
compare_with_localv(reln, forknum, blocknum, buffers, nblocks, request_lsns, read_pages);
memset(read_pages, 0, sizeof(read_pages));
#else
/* Read all blocks from LFC, so we're done */
if (prefetch_result + lfc_result == nblocks)
if (debug_compare_local >= DEBUG_COMPARE_LOCAL_LFC)
{
compare_with_localv(reln, forknum, blocknum, buffers, nblocks, request_lsns, read_pages);
}
if (debug_compare_local <= DEBUG_COMPARE_LOCAL_LFC && prefetch_result + lfc_result == nblocks)
{
/* Read all blocks from LFC, so we're done */
return;
#endif
}
if (debug_compare_local > DEBUG_COMPARE_LOCAL_LFC)
{
memset(read_pages, 0, sizeof(read_pages));
}
communicator_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns,
buffers, nblocks, read_pages);
@@ -1548,14 +1562,14 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
*/
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
memset(read_pages, 0xFF, sizeof(read_pages));
compare_with_localv(reln, forknum, blocknum, buffers, nblocks, request_lsns, read_pages);
#endif
if (debug_compare_local)
{
memset(read_pages, 0xFF, sizeof(read_pages));
compare_with_localv(reln, forknum, blocknum, buffers, nblocks, request_lsns, read_pages);
}
}
#endif
#ifdef DEBUG_COMPARE_LOCAL
static char *
hexdump_page(char *page)
{
@@ -1574,7 +1588,6 @@ hexdump_page(char *page)
return result.data;
}
#endif
#if PG_MAJORVERSION_NUM < 17
/*
@@ -1596,12 +1609,8 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
switch (reln->smgr_relpersistence)
{
case 0:
#ifndef DEBUG_COMPARE_LOCAL
/* This is a bit tricky. Check if the relation exists locally */
if (mdexists(reln, forknum))
#else
if (mdexists(reln, INIT_FORKNUM))
#endif
if (mdexists(reln, debug_compare_local ? INIT_FORKNUM : forknum))
{
/* It exists locally. Guess it's unlogged then. */
#if PG_MAJORVERSION_NUM >= 17
@@ -1656,14 +1665,17 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
if (debug_compare_local)
{
if (IS_LOCAL_REL(reln))
{
#if PG_MAJORVERSION_NUM >= 17
mdwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
mdwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
#else
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
#endif
#endif
}
}
}
#endif
@@ -1677,12 +1689,8 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
switch (reln->smgr_relpersistence)
{
case 0:
#ifndef DEBUG_COMPARE_LOCAL
/* This is a bit tricky. Check if the relation exists locally */
if (mdexists(reln, forknum))
#else
if (mdexists(reln, INIT_FORKNUM))
#endif
if (mdexists(reln, debug_compare_local ? INIT_FORKNUM : forknum))
{
/* It exists locally. Guess it's unlogged then. */
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
@@ -1720,10 +1728,11 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
#endif
if (debug_compare_local)
{
if (IS_LOCAL_REL(reln))
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
}
}
#endif
@@ -1862,10 +1871,11 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
*/
neon_set_lwlsn_relation(lsn, InfoFromSMgrRel(reln), forknum);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
mdtruncate(reln, forknum, old_blocks, nblocks);
#endif
if (debug_compare_local)
{
if (IS_LOCAL_REL(reln))
mdtruncate(reln, forknum, old_blocks, nblocks);
}
}
/*
@@ -1904,10 +1914,11 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum)
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
mdimmedsync(reln, forknum);
#endif
if (debug_compare_local)
{
if (IS_LOCAL_REL(reln))
mdimmedsync(reln, forknum);
}
}
#if PG_MAJORVERSION_NUM >= 17
@@ -1934,10 +1945,11 @@ neon_registersync(SMgrRelation reln, ForkNumber forknum)
neon_log(SmgrTrace, "[NEON_SMGR] registersync noop");
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
mdimmedsync(reln, forknum);
#endif
if (debug_compare_local)
{
if (IS_LOCAL_REL(reln))
mdimmedsync(reln, forknum);
}
}
#endif
@@ -1978,10 +1990,11 @@ neon_start_unlogged_build(SMgrRelation reln)
case RELPERSISTENCE_UNLOGGED:
unlogged_build_rel_info = InfoFromSMgrRel(reln);
unlogged_build_phase = UNLOGGED_BUILD_NOT_PERMANENT;
#ifdef DEBUG_COMPARE_LOCAL
if (!IsParallelWorker())
mdcreate(reln, INIT_FORKNUM, true);
#endif
if (debug_compare_local)
{
if (!IsParallelWorker())
mdcreate(reln, INIT_FORKNUM, true);
}
return;
default:
@@ -2009,11 +2022,7 @@ neon_start_unlogged_build(SMgrRelation reln)
*/
if (!IsParallelWorker())
{
#ifndef DEBUG_COMPARE_LOCAL
mdcreate(reln, MAIN_FORKNUM, false);
#else
mdcreate(reln, INIT_FORKNUM, true);
#endif
mdcreate(reln, debug_compare_local ? INIT_FORKNUM : MAIN_FORKNUM, false);
}
}
@@ -2107,14 +2116,14 @@ neon_end_unlogged_build(SMgrRelation reln)
lfc_invalidate(InfoFromNInfoB(rinfob), forknum, nblocks);
mdclose(reln, forknum);
#ifndef DEBUG_COMPARE_LOCAL
/* use isRedo == true, so that we drop it immediately */
mdunlink(rinfob, forknum, true);
#endif
if (!debug_compare_local)
{
/* use isRedo == true, so that we drop it immediately */
mdunlink(rinfob, forknum, true);
}
}
#ifdef DEBUG_COMPARE_LOCAL
mdunlink(rinfob, INIT_FORKNUM, true);
#endif
if (debug_compare_local)
mdunlink(rinfob, INIT_FORKNUM, true);
}
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;

View File

@@ -164,21 +164,20 @@ async fn authenticate(
})?
.map_err(ConsoleRedirectError::from)?;
if auth_config.ip_allowlist_check_enabled {
if let Some(allowed_ips) = &db_info.allowed_ips {
if !auth::check_peer_addr_is_in_list(&ctx.peer_addr(), allowed_ips) {
return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr()));
}
}
if auth_config.ip_allowlist_check_enabled
&& let Some(allowed_ips) = &db_info.allowed_ips
&& !auth::check_peer_addr_is_in_list(&ctx.peer_addr(), allowed_ips)
{
return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr()));
}
// Check if the access over the public internet is allowed, otherwise block. Note that
// the console redirect is not behind the VPC service endpoint, so we don't need to check
// the VPC endpoint ID.
if let Some(public_access_allowed) = db_info.public_access_allowed {
if !public_access_allowed {
return Err(auth::AuthError::NetworkNotAllowed);
}
if let Some(public_access_allowed) = db_info.public_access_allowed
&& !public_access_allowed
{
return Err(auth::AuthError::NetworkNotAllowed);
}
client.write_message(BeMessage::NoticeResponse("Connecting to database."));

View File

@@ -399,36 +399,36 @@ impl JwkCacheEntryLock {
tracing::debug!(?payload, "JWT signature valid with claims");
if let Some(aud) = expected_audience {
if payload.audience.0.iter().all(|s| s != aud) {
return Err(JwtError::InvalidClaims(
JwtClaimsError::InvalidJwtTokenAudience,
));
}
if let Some(aud) = expected_audience
&& payload.audience.0.iter().all(|s| s != aud)
{
return Err(JwtError::InvalidClaims(
JwtClaimsError::InvalidJwtTokenAudience,
));
}
let now = SystemTime::now();
if let Some(exp) = payload.expiration {
if now >= exp + CLOCK_SKEW_LEEWAY {
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired(
exp.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
)));
}
if let Some(exp) = payload.expiration
&& now >= exp + CLOCK_SKEW_LEEWAY
{
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired(
exp.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
)));
}
if let Some(nbf) = payload.not_before {
if nbf >= now + CLOCK_SKEW_LEEWAY {
return Err(JwtError::InvalidClaims(
JwtClaimsError::JwtTokenNotYetReadyToUse(
nbf.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
),
));
}
if let Some(nbf) = payload.not_before
&& nbf >= now + CLOCK_SKEW_LEEWAY
{
return Err(JwtError::InvalidClaims(
JwtClaimsError::JwtTokenNotYetReadyToUse(
nbf.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
),
));
}
Ok(ComputeCredentialKeys::JwtPayload(payloadb))
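Both timestamp checks fold in a clock-skew leeway: a token only counts as expired once now is past exp plus the leeway, and only counts as not-yet-valid if nbf is past now plus the leeway. A minimal sketch of those two comparisons, with an assumed leeway value (the real constant lives next to the validator):

use std::time::{Duration, SystemTime};

const CLOCK_SKEW_LEEWAY: Duration = Duration::from_secs(30); // assumed for illustration

// Expired only once `now` is past `exp` plus the leeway, so a slightly fast
// proxy clock does not reject tokens the issuer still considers valid.
fn token_expired(now: SystemTime, exp: SystemTime) -> bool {
    now >= exp + CLOCK_SKEW_LEEWAY
}

// Symmetrically, "not yet ready to use" only if `nbf` is past `now` plus the leeway.
fn token_not_yet_valid(now: SystemTime, nbf: SystemTime) -> bool {
    nbf >= now + CLOCK_SKEW_LEEWAY
}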

View File

@@ -345,15 +345,13 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
Err(e) => {
// The password could have been changed, so we invalidate the cache.
// We should only invalidate the cache if the TTL might have expired.
if e.is_password_failed() {
#[allow(irrefutable_let_patterns)]
if let ControlPlaneClient::ProxyV1(api) = &*api {
if let Some(ep) = &user_info.endpoint_id {
api.caches
.project_info
.maybe_invalidate_role_secret(ep, &user_info.user);
}
}
if e.is_password_failed()
&& let ControlPlaneClient::ProxyV1(api) = &*api
&& let Some(ep) = &user_info.endpoint_id
{
api.caches
.project_info
.maybe_invalidate_role_secret(ep, &user_info.user);
}
Err(e)

View File

@@ -7,9 +7,7 @@ use anyhow::bail;
use arc_swap::ArcSwapOption;
use camino::Utf8PathBuf;
use clap::Parser;
use futures::future::Either;
use tokio::net::TcpListener;
use tokio::sync::Notify;
use tokio::task::JoinSet;
@@ -22,11 +20,12 @@ use crate::auth::backend::jwt::JwkCache;
use crate::auth::backend::local::LocalBackend;
use crate::auth::{self};
use crate::cancellation::CancellationHandler;
#[cfg(feature = "rest_broker")]
use crate::config::RestConfig;
use crate::config::refresh_config_loop;
use crate::config::{
self, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RetryConfig,
refresh_config_loop,
};
use crate::control_plane::locks::ApiLocks;
use crate::http::health_server::AppMetrics;

View File

@@ -10,11 +10,15 @@ use std::time::Duration;
use anyhow::Context;
use anyhow::{bail, ensure};
use arc_swap::ArcSwapOption;
#[cfg(any(test, feature = "testing"))]
use camino::Utf8PathBuf;
use futures::future::Either;
use itertools::{Itertools, Position};
use rand::{Rng, thread_rng};
use remote_storage::RemoteStorageConfig;
use tokio::net::TcpListener;
#[cfg(any(test, feature = "testing"))]
use tokio::sync::Notify;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, error, info, warn};
@@ -51,10 +55,6 @@ use crate::tls::client_config::compute_client_config_with_root_certs;
#[cfg(any(test, feature = "testing"))]
use crate::url::ApiUrl;
use crate::{auth, control_plane, http, serverless, usage_metrics};
#[cfg(any(test, feature = "testing"))]
use camino::Utf8PathBuf;
#[cfg(any(test, feature = "testing"))]
use tokio::sync::Notify;
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
@@ -537,54 +537,51 @@ pub async fn run() -> anyhow::Result<()> {
maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
}
#[cfg_attr(not(any(test, feature = "testing")), expect(irrefutable_let_patterns))]
if let Either::Left(auth::Backend::ControlPlane(api, ())) = &auth_backend {
if let crate::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api {
if let Some(client) = redis_client {
// project info cache and invalidation of that cache.
let cache = api.caches.project_info.clone();
maintenance_tasks.spawn(notifications::task_main(client.clone(), cache.clone()));
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
if let Either::Left(auth::Backend::ControlPlane(api, ())) = &auth_backend
&& let crate::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api
&& let Some(client) = redis_client
{
// project info cache and invalidation of that cache.
let cache = api.caches.project_info.clone();
maintenance_tasks.spawn(notifications::task_main(client.clone(), cache.clone()));
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
// Try to connect to Redis 3 times with 1 + (0..0.1) second interval.
// This prevents immediate exit and pod restart,
// which can cause hammering of the redis in case of connection issues.
// cancellation key management
let mut redis_kv_client = RedisKVClient::new(client.clone());
for attempt in (0..3).with_position() {
match redis_kv_client.try_connect().await {
Ok(()) => {
info!("Connected to Redis KV client");
cancellation_handler.init_tx(BatchQueue::new(CancellationProcessor {
client: redis_kv_client,
batch_size: args.cancellation_batch_size,
}));
// Try to connect to Redis 3 times with 1 + (0..0.1) second interval.
// This prevents immediate exit and pod restart,
// which can cause hammering of the redis in case of connection issues.
// cancellation key management
let mut redis_kv_client = RedisKVClient::new(client.clone());
for attempt in (0..3).with_position() {
match redis_kv_client.try_connect().await {
Ok(()) => {
info!("Connected to Redis KV client");
cancellation_handler.init_tx(BatchQueue::new(CancellationProcessor {
client: redis_kv_client,
batch_size: args.cancellation_batch_size,
}));
break;
}
Err(e) => {
error!("Failed to connect to Redis KV client: {e}");
if matches!(attempt, Position::Last(_)) {
bail!(
"Failed to connect to Redis KV client after {} attempts",
attempt.into_inner()
);
}
let jitter = thread_rng().gen_range(0..100);
tokio::time::sleep(Duration::from_millis(1000 + jitter)).await;
}
}
break;
}
Err(e) => {
error!("Failed to connect to Redis KV client: {e}");
if matches!(attempt, Position::Last(_)) {
bail!(
"Failed to connect to Redis KV client after {} attempts",
attempt.into_inner()
);
}
let jitter = thread_rng().gen_range(0..100);
tokio::time::sleep(Duration::from_millis(1000 + jitter)).await;
}
// listen for notifications of new projects/endpoints/branches
let cache = api.caches.endpoints_cache.clone();
let span = tracing::info_span!("endpoints_cache");
maintenance_tasks.spawn(
async move { cache.do_read(client, cancellation_token.clone()).await }
.instrument(span),
);
}
}
// listen for notifications of new projects/endpoints/branches
let cache = api.caches.endpoints_cache.clone();
let span = tracing::info_span!("endpoints_cache");
maintenance_tasks.spawn(
async move { cache.do_read(client, cancellation_token.clone()).await }.instrument(span),
);
}
let maintenance = loop {
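The connection handling above is a bounded retry: up to three attempts, sleeping one second plus up to 100ms of jitter between failures so restarting pods do not hammer Redis in lockstep. A standalone sketch of that shape (the generic signature and names are illustrative):

use rand::Rng;

async fn connect_with_retries<F, Fut>(attempts: usize, mut connect: F) -> anyhow::Result<()>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>>,
{
    let mut attempt = 0;
    loop {
        attempt += 1;
        match connect().await {
            Ok(()) => return Ok(()),
            // Give up only after the final attempt, surfacing the last error.
            Err(e) if attempt >= attempts => return Err(e),
            Err(e) => {
                tracing::error!("connect attempt {attempt} failed: {e}");
                let jitter = rand::thread_rng().gen_range(0..100);
                tokio::time::sleep(std::time::Duration::from_millis(1000 + jitter)).await;
            }
        }
    }
}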

View File

@@ -4,11 +4,20 @@ use std::time::Duration;
use anyhow::{Context, Ok, bail, ensure};
use arc_swap::ArcSwapOption;
use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
use compute_api::spec::LocalProxySpec;
use remote_storage::RemoteStorageConfig;
use thiserror::Error;
use tokio::sync::Notify;
use tracing::{debug, error, info, warn};
use crate::auth::backend::jwt::JwkCache;
use crate::auth::backend::local::JWKS_ROLE_MAP;
use crate::control_plane::locks::ApiLocks;
use crate::control_plane::messages::{EndpointJwksResponse, JwksSettings};
use crate::ext::TaskExt;
use crate::intern::RoleNameInt;
use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig};
use crate::scram::threadpool::ThreadPool;
use crate::serverless::GlobalConnPoolOptions;
@@ -16,18 +25,7 @@ use crate::serverless::cancel_set::CancelSet;
#[cfg(feature = "rest_broker")]
use crate::serverless::rest::DbSchemaCache;
pub use crate::tls::server_config::{TlsConfig, configure_tls};
use crate::types::Host;
use crate::auth::backend::local::JWKS_ROLE_MAP;
use crate::control_plane::messages::{EndpointJwksResponse, JwksSettings};
use crate::ext::TaskExt;
use crate::intern::RoleNameInt;
use crate::types::RoleName;
use camino::{Utf8Path, Utf8PathBuf};
use compute_api::spec::LocalProxySpec;
use thiserror::Error;
use tokio::sync::Notify;
use tracing::{debug, error, info, warn};
use crate::types::{Host, RoleName};
pub struct ProxyConfig {
pub tls_config: ArcSwapOption<TlsConfig>,

View File

@@ -209,11 +209,9 @@ impl RequestContext {
if let Some(options_str) = options.get("options") {
// If not found directly, try to extract it from the options string
for option in options_str.split_whitespace() {
if option.starts_with("neon_query_id:") {
if let Some(value) = option.strip_prefix("neon_query_id:") {
this.set_testodrome_id(value.into());
break;
}
if let Some(value) = option.strip_prefix("neon_query_id:") {
this.set_testodrome_id(value.into());
break;
}
}
}

View File

@@ -250,10 +250,8 @@ impl NeonControlPlaneClient {
info!(duration = ?start.elapsed(), "received http response");
let body = parse_body::<WakeCompute>(response.status(), response.bytes().await?)?;
// Unfortunately, ownership won't let us use `Option::ok_or` here.
let (host, port) = match parse_host_port(&body.address) {
None => return Err(WakeComputeError::BadComputeAddress(body.address)),
Some(x) => x,
let Some((host, port)) = parse_host_port(&body.address) else {
return Err(WakeComputeError::BadComputeAddress(body.address));
};
let host_addr = IpAddr::from_str(host).ok();

View File

@@ -271,18 +271,18 @@ where
});
// In case logging fails we generate a simpler JSON object.
if let Err(err) = res {
if let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
if let Err(err) = res
&& let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
"timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
"level": "ERROR",
"message": format_args!("cannot log event: {err:?}"),
"fields": {
"event": format_args!("{event:?}"),
},
})) {
line.push(b'\n');
self.writer.make_writer().write_all(&line).ok();
}
}))
{
line.push(b'\n');
self.writer.make_writer().write_all(&line).ok();
}
}
@@ -583,10 +583,11 @@ impl EventFormatter {
THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;
// TODO: tls cache? name could change
if let Some(thread_name) = std::thread::current().name() {
if !thread_name.is_empty() && thread_name != "tokio-runtime-worker" {
serializer.serialize_entry("thread_name", thread_name)?;
}
if let Some(thread_name) = std::thread::current().name()
&& !thread_name.is_empty()
&& thread_name != "tokio-runtime-worker"
{
serializer.serialize_entry("thread_name", thread_name)?;
}
if let Some(task_id) = tokio::task::try_id() {
@@ -596,10 +597,10 @@ impl EventFormatter {
serializer.serialize_entry("target", meta.target())?;
// Skip adding module if it's the same as target.
if let Some(module) = meta.module_path() {
if module != meta.target() {
serializer.serialize_entry("module", module)?;
}
if let Some(module) = meta.module_path()
&& module != meta.target()
{
serializer.serialize_entry("module", module)?;
}
if let Some(file) = meta.file() {

View File

@@ -236,13 +236,6 @@ pub enum Bool {
False,
}
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum Outcome {
Success,
Failed,
}
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum CacheOutcome {

View File

@@ -90,27 +90,27 @@ where
// TODO: 1 info log, with a enum label for close direction.
// Early termination checks from compute to client.
if let TransferState::Done(_) = compute_to_client {
if let TransferState::Running(buf) = &client_to_compute {
info!("Compute is done, terminate client");
// Initiate shutdown
client_to_compute = TransferState::ShuttingDown(buf.amt);
client_to_compute_result =
transfer_one_direction(cx, &mut client_to_compute, client, compute)
.map_err(ErrorSource::from_client)?;
}
if let TransferState::Done(_) = compute_to_client
&& let TransferState::Running(buf) = &client_to_compute
{
info!("Compute is done, terminate client");
// Initiate shutdown
client_to_compute = TransferState::ShuttingDown(buf.amt);
client_to_compute_result =
transfer_one_direction(cx, &mut client_to_compute, client, compute)
.map_err(ErrorSource::from_client)?;
}
// Early termination checks from client to compute.
if let TransferState::Done(_) = client_to_compute {
if let TransferState::Running(buf) = &compute_to_client {
info!("Client is done, terminate compute");
// Initiate shutdown
compute_to_client = TransferState::ShuttingDown(buf.amt);
compute_to_client_result =
transfer_one_direction(cx, &mut compute_to_client, compute, client)
.map_err(ErrorSource::from_compute)?;
}
if let TransferState::Done(_) = client_to_compute
&& let TransferState::Running(buf) = &compute_to_client
{
info!("Client is done, terminate compute");
// Initiate shutdown
compute_to_client = TransferState::ShuttingDown(buf.amt);
compute_to_client_result =
transfer_one_direction(cx, &mut compute_to_client, compute, client)
.map_err(ErrorSource::from_compute)?;
}
// It is not a problem if ready! returns early ... (comment remains the same)

View File

@@ -39,7 +39,11 @@ impl<K: Hash + Eq> LeakyBucketRateLimiter<K> {
let config = config.map_or(self.default_config, Into::into);
if self.access_count.fetch_add(1, Ordering::AcqRel) % 2048 == 0 {
if self
.access_count
.fetch_add(1, Ordering::AcqRel)
.is_multiple_of(2048)
{
self.do_gc(now);
}

View File

@@ -211,7 +211,11 @@ impl<K: Hash + Eq, R: Rng, S: BuildHasher + Clone> BucketRateLimiter<K, R, S> {
// worst case memory usage is about:
// = 2 * 2048 * 64 * (48B + 72B)
// = 30MB
if self.access_count.fetch_add(1, Ordering::AcqRel) % 2048 == 0 {
if self
.access_count
.fetch_add(1, Ordering::AcqRel)
.is_multiple_of(2048)
{
self.do_gc();
}

View File

@@ -1,79 +0,0 @@
use core::net::IpAddr;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;
use crate::pqproto::CancelKeyData;
pub trait CancellationPublisherMut: Send + Sync + 'static {
#[allow(async_fn_in_trait)]
async fn try_publish(
&mut self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()>;
}
pub trait CancellationPublisher: Send + Sync + 'static {
#[allow(async_fn_in_trait)]
async fn try_publish(
&self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()>;
}
impl CancellationPublisher for () {
async fn try_publish(
&self,
_cancel_key_data: CancelKeyData,
_session_id: Uuid,
_peer_addr: IpAddr,
) -> anyhow::Result<()> {
Ok(())
}
}
impl<P: CancellationPublisher> CancellationPublisherMut for P {
async fn try_publish(
&mut self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()> {
<P as CancellationPublisher>::try_publish(self, cancel_key_data, session_id, peer_addr)
.await
}
}
impl<P: CancellationPublisher> CancellationPublisher for Option<P> {
async fn try_publish(
&self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()> {
if let Some(p) = self {
p.try_publish(cancel_key_data, session_id, peer_addr).await
} else {
Ok(())
}
}
}
impl<P: CancellationPublisherMut> CancellationPublisher for Arc<Mutex<P>> {
async fn try_publish(
&self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<()> {
self.lock()
.await
.try_publish(cancel_key_data, session_id, peer_addr)
.await
}
}

View File

@@ -1,11 +1,11 @@
use std::convert::Infallible;
use std::sync::{Arc, atomic::AtomicBool, atomic::Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use futures::FutureExt;
use redis::aio::{ConnectionLike, MultiplexedConnection};
use redis::{ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult};
use tokio::task::JoinHandle;
use tokio::task::AbortHandle;
use tracing::{error, info, warn};
use super::elasticache::CredentialsProvider;
@@ -32,7 +32,7 @@ pub struct ConnectionWithCredentialsProvider {
credentials: Credentials,
// TODO: with more load on the connection, we should consider using a connection pool
con: Option<MultiplexedConnection>,
refresh_token_task: Option<JoinHandle<Infallible>>,
refresh_token_task: Option<AbortHandle>,
mutex: tokio::sync::Mutex<()>,
credentials_refreshed: Arc<AtomicBool>,
}
@@ -127,7 +127,7 @@ impl ConnectionWithCredentialsProvider {
credentials_provider,
credentials_refreshed,
));
self.refresh_token_task = Some(f);
self.refresh_token_task = Some(f.abort_handle());
}
match Self::ping(&mut con).await {
Ok(()) => {
@@ -179,7 +179,7 @@ impl ConnectionWithCredentialsProvider {
mut con: MultiplexedConnection,
credentials_provider: Arc<CredentialsProvider>,
credentials_refreshed: Arc<AtomicBool>,
) -> Infallible {
) -> ! {
loop {
// The connection lives for 12h, for the sanity check we refresh it every hour.
tokio::time::sleep(Duration::from_secs(60 * 60)).await;
@@ -244,7 +244,7 @@ impl ConnectionLike for ConnectionWithCredentialsProvider {
&'a mut self,
cmd: &'a redis::Cmd,
) -> redis::RedisFuture<'a, redis::Value> {
(async move { self.send_packed_command(cmd).await }).boxed()
self.send_packed_command(cmd).boxed()
}
fn req_packed_commands<'a>(
@@ -253,10 +253,10 @@ impl ConnectionLike for ConnectionWithCredentialsProvider {
offset: usize,
count: usize,
) -> redis::RedisFuture<'a, Vec<redis::Value>> {
(async move { self.send_packed_commands(cmd, offset, count).await }).boxed()
self.send_packed_commands(cmd, offset, count).boxed()
}
fn get_db(&self) -> i64 {
0
self.con.as_ref().map_or(0, |c| c.get_db())
}
}

View File

@@ -1,4 +1,3 @@
pub mod cancellation_publisher;
pub mod connection_with_credentials_provider;
pub mod elasticache;
pub mod keys;

View File

@@ -54,9 +54,7 @@ impl<T: std::fmt::Display> ChannelBinding<T> {
"eSws".into()
}
Self::Required(mode) => {
use std::io::Write;
let mut cbind_input = vec![];
write!(&mut cbind_input, "p={mode},,",).unwrap();
let mut cbind_input = format!("p={mode},,",).into_bytes();
cbind_input.extend_from_slice(get_cbind_data(mode)?);
BASE64_STANDARD.encode(&cbind_input).into()
}

View File

@@ -107,7 +107,7 @@ pub(crate) async fn exchange(
secret: &ServerSecret,
password: &[u8],
) -> sasl::Result<sasl::Outcome<super::ScramKey>> {
let salt = BASE64_STANDARD.decode(&secret.salt_base64)?;
let salt = BASE64_STANDARD.decode(&*secret.salt_base64)?;
let client_key = derive_client_key(pool, endpoint, password, &salt, secret.iterations).await;
if secret.is_password_invalid(&client_key).into() {

View File

@@ -87,13 +87,20 @@ impl<'a> ClientFirstMessage<'a> {
salt_base64: &str,
iterations: u32,
) -> OwnedServerFirstMessage {
use std::fmt::Write;
let mut message = String::with_capacity(128);
message.push_str("r=");
let mut message = String::new();
write!(&mut message, "r={}", self.nonce).unwrap();
// write combined nonce
let combined_nonce_start = message.len();
message.push_str(self.nonce);
BASE64_STANDARD.encode_string(nonce, &mut message);
let combined_nonce = 2..message.len();
write!(&mut message, ",s={salt_base64},i={iterations}").unwrap();
let combined_nonce = combined_nonce_start..message.len();
// write salt and iterations
message.push_str(",s=");
message.push_str(salt_base64);
message.push_str(",i=");
message.push_str(itoa::Buffer::new().format(iterations));
// This design guarantees that it's impossible to create a
// server-first-message without receiving a client-first-message

View File

@@ -14,7 +14,7 @@ pub(crate) struct ServerSecret {
/// Number of iterations for `PBKDF2` function.
pub(crate) iterations: u32,
/// Salt used to hash user's password.
pub(crate) salt_base64: String,
pub(crate) salt_base64: Box<str>,
/// Hashed `ClientKey`.
pub(crate) stored_key: ScramKey,
/// Used by client to verify server's signature.
@@ -35,7 +35,7 @@ impl ServerSecret {
let secret = ServerSecret {
iterations: iterations.parse().ok()?,
salt_base64: salt.to_owned(),
salt_base64: salt.into(),
stored_key: base64_decode_array(stored_key)?.into(),
server_key: base64_decode_array(server_key)?.into(),
doomed: false,
@@ -58,7 +58,7 @@ impl ServerSecret {
// iteration count 1 for our generated passwords going forward.
// PG16 users can set iteration count=1 already today.
iterations: 1,
salt_base64: BASE64_STANDARD.encode(nonce),
salt_base64: BASE64_STANDARD.encode(nonce).into_boxed_str(),
stored_key: ScramKey::default(),
server_key: ScramKey::default(),
doomed: true,
@@ -88,7 +88,7 @@ mod tests {
let parsed = ServerSecret::parse(&secret).unwrap();
assert_eq!(parsed.iterations, iterations);
assert_eq!(parsed.salt_base64, salt);
assert_eq!(&*parsed.salt_base64, salt);
assert_eq!(BASE64_STANDARD.encode(parsed.stored_key), stored_key);
assert_eq!(BASE64_STANDARD.encode(parsed.server_key), server_key);

View File

@@ -137,7 +137,7 @@ impl Future for JobSpec {
let state = state.as_mut().expect("should be set on thread startup");
state.tick = state.tick.wrapping_add(1);
if state.tick % SKETCH_RESET_INTERVAL == 0 {
if state.tick.is_multiple_of(SKETCH_RESET_INTERVAL) {
state.countmin.reset();
}

View File

@@ -349,11 +349,11 @@ impl PoolingBackend {
debug!("setting up backend session state");
// initiates the auth session
if !disable_pg_session_jwt {
if let Err(e) = client.batch_execute("select auth.init();").await {
discard.discard();
return Err(e.into());
}
if !disable_pg_session_jwt
&& let Err(e) = client.batch_execute("select auth.init();").await
{
discard.discard();
return Err(e.into());
}
info!("backend session state initialized");

View File

@@ -148,11 +148,10 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
}
// remove from connection pool
if let Some(pool) = pool.clone().upgrade() {
if pool.write().remove_client(db_user.clone(), conn_id) {
if let Some(pool) = pool.clone().upgrade()
&& pool.write().remove_client(db_user.clone(), conn_id) {
info!("closed connection removed");
}
}
Poll::Ready(())
}).await;

View File

@@ -2,6 +2,8 @@ use std::collections::VecDeque;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::{Arc, Weak};
use bytes::Bytes;
use http_body_util::combinators::BoxBody;
use hyper::client::conn::http2;
use hyper_util::rt::{TokioExecutor, TokioIo};
use parking_lot::RwLock;
@@ -20,8 +22,6 @@ use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::protocol2::ConnectionInfoExtra;
use crate::types::EndpointCacheKey;
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use bytes::Bytes;
use http_body_util::combinators::BoxBody;
pub(crate) type Send = http2::SendRequest<BoxBody<Bytes, hyper::Error>>;
pub(crate) type Connect =
@@ -240,10 +240,10 @@ pub(crate) fn poll_http2_client(
}
// remove from connection pool
if let Some(pool) = pool.clone().upgrade() {
if pool.write().remove_conn(conn_id) {
info!("closed connection removed");
}
if let Some(pool) = pool.clone().upgrade()
&& pool.write().remove_conn(conn_id)
{
info!("closed connection removed");
}
}
.instrument(span),

View File

@@ -12,8 +12,7 @@ use serde::Serialize;
use url::Url;
use uuid::Uuid;
use super::conn_pool::AuthData;
use super::conn_pool::ConnInfoWithAuth;
use super::conn_pool::{AuthData, ConnInfoWithAuth};
use super::conn_pool_lib::ConnInfo;
use super::error::{ConnInfoError, Credentials};
use crate::auth::backend::ComputeUserInfo;

View File

@@ -249,11 +249,10 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
}
// remove from connection pool
if let Some(pool) = pool.clone().upgrade() {
if pool.global_pool.write().remove_client(db_user.clone(), conn_id) {
if let Some(pool) = pool.clone().upgrade()
&& pool.global_pool.write().remove_client(db_user.clone(), conn_id) {
info!("closed connection removed");
}
}
Poll::Ready(())
}).await;

View File

@@ -1,23 +1,25 @@
use std::pin::pin;
use std::sync::Arc;
use bytes::Bytes;
use futures::future::{Either, select, try_join};
use futures::{StreamExt, TryFutureExt};
use http::{Method, header::AUTHORIZATION};
use http_body_util::{BodyExt, Full, combinators::BoxBody};
use http::Method;
use http::header::AUTHORIZATION;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use http_utils::error::ApiError;
use hyper::body::Incoming;
use hyper::{
Request, Response, StatusCode, header,
http::{HeaderName, HeaderValue},
};
use hyper::http::{HeaderName, HeaderValue};
use hyper::{Request, Response, StatusCode, header};
use indexmap::IndexMap;
use postgres_client::error::{DbError, ErrorPosition, SqlState};
use postgres_client::{
GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, RowStream, Transaction,
};
use serde::Serialize;
use serde_json::{Value, value::RawValue};
use std::pin::pin;
use std::sync::Arc;
use serde_json::Value;
use serde_json::value::RawValue;
use tokio::time::{self, Instant};
use tokio_util::sync::CancellationToken;
use tracing::{Level, debug, error, info};
@@ -33,7 +35,6 @@ use super::http_util::{
};
use super::json::{JsonConversionError, json_to_pg_text, pg_text_row_to_json};
use crate::auth::backend::ComputeCredentialKeys;
use crate::config::{HttpConfig, ProxyConfig};
use crate::context::RequestContext;
use crate::error::{ErrorKind, ReportableError, UserFacingError};

View File

@@ -199,27 +199,27 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
let probe_msg;
let mut msg = &*msg;
if let Some(ctx) = ctx {
if ctx.get_testodrome_id().is_some() {
let tag = match error_kind {
ErrorKind::User => "client",
ErrorKind::ClientDisconnect => "client",
ErrorKind::RateLimit => "proxy",
ErrorKind::ServiceRateLimit => "proxy",
ErrorKind::Quota => "proxy",
ErrorKind::Service => "proxy",
ErrorKind::ControlPlane => "controlplane",
ErrorKind::Postgres => "other",
ErrorKind::Compute => "compute",
};
probe_msg = typed_json::json!({
"tag": tag,
"msg": msg,
"cold_start_info": ctx.cold_start_info(),
})
.to_string();
msg = &probe_msg;
}
if let Some(ctx) = ctx
&& ctx.get_testodrome_id().is_some()
{
let tag = match error_kind {
ErrorKind::User => "client",
ErrorKind::ClientDisconnect => "client",
ErrorKind::RateLimit => "proxy",
ErrorKind::ServiceRateLimit => "proxy",
ErrorKind::Quota => "proxy",
ErrorKind::Service => "proxy",
ErrorKind::ControlPlane => "controlplane",
ErrorKind::Postgres => "other",
ErrorKind::Compute => "compute",
};
probe_msg = typed_json::json!({
"tag": tag,
"msg": msg,
"cold_start_info": ctx.cold_start_info(),
})
.to_string();
msg = &probe_msg;
}
// TODO: either preserve the error code from postgres, or assign error codes to proxy errors.

View File

@@ -18,9 +18,10 @@ use metrics::set_build_info_metric;
use remote_storage::RemoteStorageConfig;
use safekeeper::defaults::{
DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, DEFAULT_HEARTBEAT_TIMEOUT,
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_SSL_CERT_FILE,
DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES, DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES,
DEFAULT_PARTIAL_BACKUP_CONCURRENCY, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_SSL_CERT_FILE, DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
};
use safekeeper::wal_backup::WalBackup;
use safekeeper::{
@@ -138,6 +139,15 @@ struct Args {
/// Safekeeper won't be elected for WAL offloading if it is lagging for more than this value in bytes
#[arg(long, default_value_t = DEFAULT_MAX_OFFLOADER_LAG_BYTES)]
max_offloader_lag: u64,
/* BEGIN_HADRON */
/// Safekeeper will re-elect a new offloader if the current one's backup is lagging by more than this value in bytes
#[arg(long, default_value_t = DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES)]
max_reelect_offloader_lag_bytes: u64,
/// Safekeeper will stop accepting new WALs if the timeline disk usage exceeds this value in bytes.
/// Setting this value to 0 disables the limit.
#[arg(long, default_value_t = DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES)]
max_timeline_disk_usage_bytes: u64,
/* END_HADRON */
/// Maximum number of WAL segments offloaded to remote storage in parallel.
#[arg(long, default_value = "5")]
wal_backup_parallel_jobs: usize,
@@ -391,6 +401,10 @@ async fn main() -> anyhow::Result<()> {
peer_recovery_enabled: args.peer_recovery,
remote_storage: args.remote_storage,
max_offloader_lag_bytes: args.max_offloader_lag,
/* BEGIN_HADRON */
max_reelect_offloader_lag_bytes: args.max_reelect_offloader_lag_bytes,
max_timeline_disk_usage_bytes: args.max_timeline_disk_usage_bytes,
/* END_HADRON */
wal_backup_enabled: !args.disable_wal_backup,
backup_parallel_jobs: args.wal_backup_parallel_jobs,
pg_auth,

View File

@@ -17,6 +17,7 @@ use utils::crashsafe::durable_rename;
use crate::control_file_upgrade::{downgrade_v10_to_v9, upgrade_control_file};
use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
use crate::metrics::WAL_DISK_IO_ERRORS;
use crate::state::{EvictionState, TimelinePersistentState};
pub const SK_MAGIC: u32 = 0xcafeceefu32;
@@ -192,11 +193,14 @@ impl TimelinePersistentState {
impl Storage for FileStorage {
/// Persists state durably to the underlying storage.
async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
// start timer for metrics
let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
// write data to safekeeper.control.partial
let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
/* BEGIN_HADRON */
WAL_DISK_IO_ERRORS.inc();
/* END_HADRON */
format!(
"failed to create partial control file at: {}",
&control_partial_path
@@ -206,14 +210,24 @@ impl Storage for FileStorage {
let buf: Vec<u8> = s.write_to_buf()?;
control_partial.write_all(&buf).await.with_context(|| {
/* BEGIN_HADRON */
WAL_DISK_IO_ERRORS.inc();
/* END_HADRON */
format!("failed to write safekeeper state into control file at: {control_partial_path}")
})?;
control_partial.flush().await.with_context(|| {
/* BEGIN_HADRON */
WAL_DISK_IO_ERRORS.inc();
/* END_HADRON */
format!("failed to flush safekeeper state into control file at: {control_partial_path}")
})?;
let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
durable_rename(&control_partial_path, &control_path, !self.no_sync).await?;
durable_rename(&control_partial_path, &control_path, !self.no_sync)
.await
/* BEGIN_HADRON */
.inspect_err(|_| WAL_DISK_IO_ERRORS.inc())?;
/* END_HADRON */
// update internal state
self.state = s.clone();

View File

@@ -61,6 +61,13 @@ pub mod defaults {
pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
/* BEGIN_HADRON */
// Default re-elect threshold is 0 (disabled). The SK will re-elect the offloader if the current leader's backup is lagging by this many bytes.
pub const DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES: u64 = 0;
// Default disk usage limit is 0 (disabled). When set, each timeline can use up to this much
// disk space for WAL on this SK before the SK begins to reject WAL writes.
pub const DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES: u64 = 0;
/* END_HADRON */
pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
@@ -99,6 +106,10 @@ pub struct SafeKeeperConf {
pub peer_recovery_enabled: bool,
pub remote_storage: Option<RemoteStorageConfig>,
pub max_offloader_lag_bytes: u64,
/* BEGIN_HADRON */
pub max_reelect_offloader_lag_bytes: u64,
pub max_timeline_disk_usage_bytes: u64,
/* END_HADRON */
pub backup_parallel_jobs: usize,
pub wal_backup_enabled: bool,
pub pg_auth: Option<Arc<JwtAuth>>,
@@ -151,6 +162,10 @@ impl SafeKeeperConf {
sk_auth_token: None,
heartbeat_timeout: Duration::new(5, 0),
max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
/* BEGIN_HADRON */
max_reelect_offloader_lag_bytes: defaults::DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES,
max_timeline_disk_usage_bytes: defaults::DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES,
/* END_HADRON */
current_thread_runtime: false,
walsenders_keep_horizon: false,
partial_backup_timeout: Duration::from_secs(0),

View File

@@ -58,6 +58,25 @@ pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_flush_wal_seconds histogram")
});
/* BEGIN_HADRON */
pub static WAL_DISK_IO_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_wal_disk_io_errors",
"Number of disk I/O errors when creating and flushing WALs and control files"
)
.expect("Failed to register safekeeper_wal_disk_io_errors counter")
});
pub static WAL_STORAGE_LIMIT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_wal_storage_limit_errors",
concat!(
"Number of errors due to timeline WAL storage utilization exceeding configured limit. ",
"An increase in this metric indicates issues backing up or removing WALs."
)
)
.expect("Failed to register safekeeper_wal_storage_limit_errors counter")
});
/* END_HADRON */
pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_persist_control_file_seconds",
@@ -138,6 +157,15 @@ pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_backup_errors_total counter")
});
/* BEGIN_HADRON */
pub static BACKUP_REELECT_LEADER_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_backup_reelect_leader_total",
"Number of times the backup leader was reelected"
)
.expect("Failed to register safekeeper_backup_reelect_leader_total counter")
});
/* END_HADRON */
pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_broker_push_update_seconds",

View File

@@ -16,7 +16,7 @@ use tokio::sync::mpsc::error::SendError;
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;
use tracing::{Instrument, error, info, info_span};
use utils::critical;
use utils::critical_timeline;
use utils::lsn::Lsn;
use utils::postgres_client::{Compression, InterpretedFormat};
use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
@@ -268,6 +268,8 @@ impl InterpretedWalReader {
let (shard_notification_tx, shard_notification_rx) = tokio::sync::mpsc::unbounded_channel();
let ttid = wal_stream.ttid;
let reader = InterpretedWalReader {
wal_stream,
shard_senders: HashMap::from([(
@@ -300,7 +302,11 @@ impl InterpretedWalReader {
.inspect_err(|err| match err {
// TODO: we may want to differentiate these errors further.
InterpretedWalReaderError::Decode(_) => {
critical!("failed to decode WAL record: {err:?}");
critical_timeline!(
ttid.tenant_id,
ttid.timeline_id,
"failed to read WAL record: {err:?}"
);
}
err => error!("failed to read WAL record: {err}"),
})
@@ -363,9 +369,14 @@ impl InterpretedWalReader {
metric.dec();
}
let ttid = self.wal_stream.ttid;
match self.run_impl(start_pos).await {
Err(err @ InterpretedWalReaderError::Decode(_)) => {
critical!("failed to decode WAL record: {err:?}");
critical_timeline!(
ttid.tenant_id,
ttid.timeline_id,
"failed to decode WAL record: {err:?}"
);
}
Err(err) => error!("failed to read WAL record: {err}"),
Ok(()) => info!("interpreted wal reader exiting"),

View File

@@ -26,7 +26,9 @@ use utils::id::{NodeId, TenantId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::sync::gate::Gate;
use crate::metrics::{FullTimelineInfo, MISC_OPERATION_SECONDS, WalStorageMetrics};
use crate::metrics::{
FullTimelineInfo, MISC_OPERATION_SECONDS, WAL_STORAGE_LIMIT_ERRORS, WalStorageMetrics,
};
use crate::rate_limit::RateLimiter;
use crate::receive_wal::WalReceivers;
use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn};
@@ -1050,6 +1052,39 @@ impl WalResidentTimeline {
Ok(ss)
}
// BEGIN HADRON
// Check if disk usage by WAL segment files for this timeline exceeds the configured limit.
fn hadron_check_disk_usage(
&self,
shared_state_locked: &mut WriteGuardSharedState<'_>,
) -> Result<()> {
// The disk usage is calculated based on the number of segments between `last_removed_segno`
// and the current flush LSN segment number. `last_removed_segno` is advanced after
// unneeded WAL files are physically removed from disk (see `update_wal_removal_end()`
// in `timeline_manager.rs`).
let max_timeline_disk_usage_bytes = self.conf.max_timeline_disk_usage_bytes;
if max_timeline_disk_usage_bytes > 0 {
let last_removed_segno = self.last_removed_segno.load(Ordering::Relaxed);
let flush_lsn = shared_state_locked.sk.flush_lsn();
let wal_seg_size = shared_state_locked.sk.state().server.wal_seg_size as u64;
let current_segno = flush_lsn.segment_number(wal_seg_size as usize);
let segno_count = current_segno - last_removed_segno;
let disk_usage_bytes = segno_count * wal_seg_size;
if disk_usage_bytes > max_timeline_disk_usage_bytes {
WAL_STORAGE_LIMIT_ERRORS.inc();
bail!(
"WAL storage utilization exceeds configured limit of {} bytes: current disk usage: {} bytes",
max_timeline_disk_usage_bytes,
disk_usage_bytes
);
}
}
Ok(())
}
// END HADRON
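As a worked example of the arithmetic above, a standalone sketch with assumed values: with a 16 MiB segment size, a flush LSN four segments ahead of the last removed segment counts as 64 MiB of usage, which would trip a hypothetical 32 MiB limit.

// Standalone sketch of the disk-usage calculation; all values are assumed.
fn main() {
    let wal_seg_size: u64 = 16 * 1024 * 1024; // 16 MiB
    let last_removed_segno: u64 = 10;
    let current_segno: u64 = 14; // segment containing the flush LSN
    let max_timeline_disk_usage_bytes: u64 = 32 * 1024 * 1024; // hypothetical limit

    let disk_usage_bytes = (current_segno - last_removed_segno) * wal_seg_size;
    assert_eq!(disk_usage_bytes, 64 * 1024 * 1024);
    assert!(disk_usage_bytes > max_timeline_disk_usage_bytes); // hadron_check_disk_usage would bail here
}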
/// Pass arrived message to the safekeeper.
pub async fn process_msg(
&self,
@@ -1062,6 +1097,13 @@ impl WalResidentTimeline {
let mut rmsg: Option<AcceptorProposerMessage>;
{
let mut shared_state = self.write_shared_state().await;
// BEGIN HADRON
// An error from `hadron_check_disk_usage()` fails process_msg(); the error is propagated
// upward and terminates the entire WalAcceptor. This causes postgres to disconnect from the
// safekeeper and re-establish the connection. Postgres will keep retrying safekeeper
// connections every second until it can successfully propose WAL to the SK again.
self.hadron_check_disk_usage(&mut shared_state)?;
// END HADRON
rmsg = shared_state.sk.safekeeper().process_msg(msg).await?;
// if this is AppendResponse, fill in proper hot standby feedback.

View File

@@ -26,7 +26,9 @@ use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::{backoff, pausable_failpoint};
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS};
use crate::metrics::{
BACKED_UP_SEGMENTS, BACKUP_ERRORS, BACKUP_REELECT_LEADER_COUNT, WAL_BACKUP_TASKS,
};
use crate::timeline::WalResidentTimeline;
use crate::timeline_manager::{Manager, StateSnapshot};
use crate::{SafeKeeperConf, WAL_BACKUP_RUNTIME};
@@ -70,8 +72,9 @@ pub(crate) async fn update_task(
need_backup: bool,
state: &StateSnapshot,
) {
let (offloader, election_dbg_str) =
determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
/* BEGIN_HADRON */
let (offloader, election_dbg_str) = hadron_determine_offloader(mgr, state);
/* END_HADRON */
let elected_me = Some(mgr.conf.my_id) == offloader;
let should_task_run = need_backup && elected_me;
@@ -127,6 +130,70 @@ async fn shut_down_task(entry: &mut Option<WalBackupTaskHandle>) {
}
}
/* BEGIN_HADRON */
// On top of the neon determine_offloader, we also check whether the current offloader is lagging behind too much.
// If it is, we re-elect a new offloader. This mitigates the issue described below and also helps distribute load across SKs.
//
// We have observed the offloader failing to upload a segment due to race conditions between XLOG SWITCH and PG starting to stream WALs:
// the wal_backup task continuously fails to upload a full segment while the segment remains partial on disk.
// The consequence is that commit_lsn on all SKs moves forward while backup_lsn stays the same, until all SKs run out of disk space.
// See go/sk-ood-xlog-switch for more details.
//
// To mitigate this, we re-elect a new offloader if the current one is lagging behind too much.
// Each SK makes the decision locally, but all SKs are aware of each other's commit and backup LSNs.
//
// determine_offloader picks an SK, say SK-1.
// Each SK then checks:
// -- if commit_lsn - backup_lsn > threshold,
// -- -- remove SK-1 from the candidates and call determine_offloader again.
// SK-1 steps down and all SKs elect the same new leader.
// After the backup catches up, the leader becomes SK-1 again.
fn hadron_determine_offloader(mgr: &Manager, state: &StateSnapshot) -> (Option<NodeId>, String) {
let mut offloader: Option<NodeId>;
let mut election_dbg_str: String;
let caughtup_peers_count: usize;
(offloader, election_dbg_str, caughtup_peers_count) =
determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
if offloader.is_none()
|| caughtup_peers_count <= 1
|| mgr.conf.max_reelect_offloader_lag_bytes == 0
{
return (offloader, election_dbg_str);
}
let offloader_sk_id = offloader.unwrap();
let backup_lag = state.commit_lsn.checked_sub(state.backup_lsn);
if backup_lag.is_none() {
info!("Backup lag is None. Skipping re-election.");
return (offloader, election_dbg_str);
}
let backup_lag = backup_lag.unwrap().0;
if backup_lag < mgr.conf.max_reelect_offloader_lag_bytes {
return (offloader, election_dbg_str);
}
info!(
"Electing a new leader: Backup lag is too high backup lsn lag {} threshold {}: {}",
backup_lag, mgr.conf.max_reelect_offloader_lag_bytes, election_dbg_str
);
BACKUP_REELECT_LEADER_COUNT.inc();
// Remove the current offloader if lag is too high.
let new_peers: Vec<_> = state
.peers
.iter()
.filter(|p| p.sk_id != offloader_sk_id)
.cloned()
.collect();
(offloader, election_dbg_str, _) =
determine_offloader(&new_peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
(offloader, election_dbg_str)
}
/* END_HADRON */
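The re-election trigger reduces to one subtraction and one comparison. Below is a simplified, self-contained sketch with an invented `Lsn` newtype and assumed numbers; it mirrors only the lag check, not the full election.

// Simplified sketch of the re-election trigger; `Lsn` is a bare u64 newtype used only for illustration.
#[derive(Clone, Copy)]
struct Lsn(u64);

fn should_reelect(commit_lsn: Lsn, backup_lsn: Lsn, max_reelect_lag_bytes: u64) -> bool {
    if max_reelect_lag_bytes == 0 {
        return false; // matches the default of 0, i.e. re-election disabled
    }
    match commit_lsn.0.checked_sub(backup_lsn.0) {
        Some(lag) => lag >= max_reelect_lag_bytes,
        None => false, // backup is ahead of commit; nothing to re-elect for
    }
}

fn main() {
    // Assumed numbers: commit is 256 MiB ahead of backup, threshold is 128 MiB,
    // so the current offloader is dropped from the candidates and the election re-runs.
    assert!(should_reelect(Lsn(512 << 20), Lsn(256 << 20), 128 << 20));
    assert!(!should_reelect(Lsn(130 << 20), Lsn(128 << 20), 128 << 20));
}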
/// The goal is to ensure that normally only one safekeepers offloads. However,
/// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
/// time we have several ones as they PUT the same files. Also,
@@ -141,13 +208,13 @@ fn determine_offloader(
wal_backup_lsn: Lsn,
ttid: TenantTimelineId,
conf: &SafeKeeperConf,
) -> (Option<NodeId>, String) {
) -> (Option<NodeId>, String, usize) {
// TODO: remove this once we fill newly joined safekeepers since backup_lsn.
let capable_peers = alive_peers
.iter()
.filter(|p| p.local_start_lsn <= wal_backup_lsn);
match capable_peers.clone().map(|p| p.commit_lsn).max() {
None => (None, "no connected peers to elect from".to_string()),
None => (None, "no connected peers to elect from".to_string(), 0),
Some(max_commit_lsn) => {
let threshold = max_commit_lsn
.checked_sub(conf.max_offloader_lag_bytes)
@@ -175,6 +242,7 @@ fn determine_offloader(
capable_peers_dbg,
caughtup_peers.len()
),
caughtup_peers.len(),
)
}
}
@@ -346,6 +414,8 @@ async fn backup_lsn_range(
anyhow::bail!("parallel_jobs must be >= 1");
}
pausable_failpoint!("backup-lsn-range-pausable");
let remote_timeline_path = &timeline.remote_path;
let start_lsn = *backup_lsn;
let segments = get_segments(start_lsn, end_lsn, wal_seg_size);

View File

@@ -1,15 +1,15 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use safekeeper_api::Term;
use utils::lsn::Lsn;
use crate::send_wal::EndWatch;
use crate::timeline::WalResidentTimeline;
use crate::wal_storage::WalReader;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use safekeeper_api::Term;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
#[derive(PartialEq, Eq, Debug)]
pub(crate) struct WalBytes {
@@ -37,6 +37,8 @@ struct PositionedWalReader {
pub(crate) struct StreamingWalReader {
stream: BoxStream<'static, WalOrReset>,
start_changed_tx: tokio::sync::watch::Sender<Lsn>,
// HADRON: Added TenantTimelineId for instrumentation purposes.
pub(crate) ttid: TenantTimelineId,
}
pub(crate) enum WalOrReset {
@@ -63,6 +65,7 @@ impl StreamingWalReader {
buffer_size: usize,
) -> Self {
let (start_changed_tx, start_changed_rx) = tokio::sync::watch::channel(start);
let ttid = tli.ttid;
let state = WalReaderStreamState {
tli,
@@ -107,6 +110,7 @@ impl StreamingWalReader {
Self {
stream,
start_changed_tx,
ttid,
}
}

View File

@@ -31,7 +31,8 @@ use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use crate::metrics::{
REMOVED_WAL_SEGMENTS, WAL_STORAGE_OPERATION_SECONDS, WalStorageMetrics, time_io_closure,
REMOVED_WAL_SEGMENTS, WAL_DISK_IO_ERRORS, WAL_STORAGE_OPERATION_SECONDS, WalStorageMetrics,
time_io_closure,
};
use crate::state::TimelinePersistentState;
use crate::wal_backup::{WalBackup, read_object, remote_timeline_path};
@@ -293,9 +294,12 @@ impl PhysicalStorage {
// half initialized segment, first bake it under tmp filename and
// then rename.
let tmp_path = self.timeline_dir.join("waltmp");
let file = File::create(&tmp_path)
.await
.with_context(|| format!("Failed to open tmp wal file {:?}", &tmp_path))?;
let file: File = File::create(&tmp_path).await.with_context(|| {
/* BEGIN_HADRON */
WAL_DISK_IO_ERRORS.inc();
/* END_HADRON */
format!("Failed to open tmp wal file {:?}", &tmp_path)
})?;
fail::fail_point!("sk-zero-segment", |_| {
info!("sk-zero-segment failpoint hit");
@@ -382,7 +386,11 @@ impl PhysicalStorage {
let flushed = self
.write_in_segment(segno, xlogoff, &buf[..bytes_write])
.await?;
.await
/* BEGIN_HADRON */
.inspect_err(|_| WAL_DISK_IO_ERRORS.inc())?;
/* END_HADRON */
self.write_lsn += bytes_write as u64;
if flushed {
self.flush_lsn = self.write_lsn;
@@ -491,7 +499,11 @@ impl Storage for PhysicalStorage {
}
if let Some(unflushed_file) = self.file.take() {
self.fdatasync_file(&unflushed_file).await?;
self.fdatasync_file(&unflushed_file)
.await
/* BEGIN_HADRON */
.inspect_err(|_| WAL_DISK_IO_ERRORS.inc())?;
/* END_HADRON */
self.file = Some(unflushed_file);
} else {
// We have unflushed data (write_lsn != flush_lsn), but no file. This

View File

@@ -159,6 +159,10 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
heartbeat_timeout: Duration::from_secs(0),
remote_storage: None,
max_offloader_lag_bytes: 0,
/* BEGIN_HADRON */
max_reelect_offloader_lag_bytes: 0,
max_timeline_disk_usage_bytes: 0,
/* END_HADRON */
wal_backup_enabled: false,
listen_pg_addr_tenant_only: None,
advertise_pg_addr: None,

View File

@@ -0,0 +1 @@
ALTER TABLE safekeepers ALTER COLUMN scheduling_policy SET DEFAULT 'pause';

View File

@@ -0,0 +1 @@
ALTER TABLE safekeepers ALTER COLUMN scheduling_policy SET DEFAULT 'activating';

View File

@@ -76,6 +76,9 @@ pub(crate) struct StorageControllerMetricGroup {
/// How many shards would like to reconcile but were blocked by concurrency limits
pub(crate) storage_controller_pending_reconciles: measured::Gauge,
/// How many shards are keep-failing and will be ignored when considering to run optimizations
pub(crate) storage_controller_keep_failing_reconciles: measured::Gauge,
/// HTTP request status counters for handled requests
pub(crate) storage_controller_http_request_status:
measured::CounterVec<HttpRequestStatusLabelGroupSet>,

View File

@@ -1388,6 +1388,48 @@ impl Persistence {
.await
}
/// Activate the given safekeeper, ensuring that there is no TOCTOU.
/// Returns `Some` if the safekeeper was indeed activating (or already active); other states return `None`.
pub(crate) async fn activate_safekeeper(&self, id_: i64) -> Result<Option<()>, DatabaseError> {
use crate::schema::safekeepers::dsl::*;
self.with_conn(move |conn| {
Box::pin(async move {
#[derive(Insertable, AsChangeset)]
#[diesel(table_name = crate::schema::safekeepers)]
struct UpdateSkSchedulingPolicy<'a> {
id: i64,
scheduling_policy: &'a str,
}
let scheduling_policy_active = String::from(SkSchedulingPolicy::Active);
let scheduling_policy_activating = String::from(SkSchedulingPolicy::Activating);
let rows_affected = diesel::update(
safekeepers.filter(id.eq(id_)).filter(
scheduling_policy
.eq(scheduling_policy_activating)
.or(scheduling_policy.eq(&scheduling_policy_active)),
),
)
.set(scheduling_policy.eq(&scheduling_policy_active))
.execute(conn)
.await?;
if rows_affected == 0 {
return Ok(None);
}
if rows_affected != 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({rows_affected})",
)));
}
Ok(Some(()))
})
})
.await
}
/// Persist timeline. Returns if the timeline was newly inserted. If it wasn't, we haven't done any writes.
pub(crate) async fn insert_timeline(&self, entry: TimelinePersistence) -> DatabaseResult<bool> {
use crate::schema::timelines;

View File

@@ -31,8 +31,8 @@ use pageserver_api::controller_api::{
AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
SkSchedulingPolicy, TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard,
TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use pageserver_api::models::{
@@ -210,6 +210,10 @@ pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256;
pub const SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT: usize = 32;
// Number of consecutive reconciliation errors that have occurred for one shard,
// after which the shard is ignored when deciding whether to run optimizations.
const MAX_CONSECUTIVE_RECONCILIATION_ERRORS: usize = 5;
// Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately.
// This channel is finite-size to avoid using excessive memory if we get into a state where reconciles are finishing more slowly
// than they're being pushed onto the queue.
@@ -702,6 +706,36 @@ struct ShardMutationLocations {
#[derive(Default, Clone)]
struct TenantMutationLocations(BTreeMap<TenantShardId, ShardMutationLocations>);
struct ReconcileAllResult {
spawned_reconciles: usize,
keep_failing_reconciles: usize,
has_delayed_reconciles: bool,
}
impl ReconcileAllResult {
fn new(
spawned_reconciles: usize,
keep_failing_reconciles: usize,
has_delayed_reconciles: bool,
) -> Self {
assert!(
spawned_reconciles >= keep_failing_reconciles,
"It is impossible to have more keep-failing reconciles than spawned reconciles"
);
Self {
spawned_reconciles,
keep_failing_reconciles,
has_delayed_reconciles,
}
}
/// We can run optimizations only if we don't have any delayed reconciles and
/// all spawned reconciles are also keep-failing reconciles.
fn can_run_optimizations(&self) -> bool {
!self.has_delayed_reconciles && self.spawned_reconciles == self.keep_failing_reconciles
}
}
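A hypothetical unit test, assuming the `ReconcileAllResult` type above is in scope, illustrating the gating rule: optimizations may run when nothing is delayed and every spawned reconcile belongs to a keep-failing shard.

#[cfg(test)]
mod reconcile_all_result_sketch {
    use super::ReconcileAllResult;

    #[test]
    fn keep_failing_shards_do_not_block_optimizations() {
        // All spawned reconciles target keep-failing shards and nothing is delayed:
        // the controller is effectively idle, so optimizations may run.
        assert!(ReconcileAllResult::new(3, 3, false).can_run_optimizations());
        // A genuinely dirty (not keep-failing) shard blocks optimizations.
        assert!(!ReconcileAllResult::new(3, 2, false).can_run_optimizations());
        // A delayed reconcile blocks optimizations as well.
        assert!(!ReconcileAllResult::new(0, 0, true).can_run_optimizations());
    }
}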
impl Service {
pub fn get_config(&self) -> &Config {
&self.config
@@ -899,7 +933,7 @@ impl Service {
// which require it: under normal circumstances this should only include tenants that were in some
// transient state before we restarted, or any tenants whose compute hooks failed above.
tracing::info!("Checking for shards in need of reconciliation...");
let reconcile_tasks = self.reconcile_all();
let reconcile_all_result = self.reconcile_all();
// We will not wait for these reconciliation tasks to run here: we're now done with startup and
// normal operations may proceed.
@@ -947,8 +981,9 @@ impl Service {
}
}
let spawned_reconciles = reconcile_all_result.spawned_reconciles;
tracing::info!(
"Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)"
"Startup complete, spawned {spawned_reconciles} reconciliation tasks ({shard_count} shards total)"
);
}
@@ -1199,8 +1234,8 @@ impl Service {
while !self.reconcilers_cancel.is_cancelled() {
tokio::select! {
_ = interval.tick() => {
let reconciles_spawned = self.reconcile_all();
if reconciles_spawned == 0 {
let reconcile_all_result = self.reconcile_all();
if reconcile_all_result.can_run_optimizations() {
// Run optimizer only when we didn't find any other work to do
self.optimize_all().await;
}
@@ -1214,7 +1249,7 @@ impl Service {
}
/// Heartbeat all storage nodes once in a while.
#[instrument(skip_all)]
async fn spawn_heartbeat_driver(&self) {
async fn spawn_heartbeat_driver(self: &Arc<Self>) {
self.startup_complete.clone().wait().await;
let mut interval = tokio::time::interval(self.config.heartbeat_interval);
@@ -1341,18 +1376,51 @@ impl Service {
}
}
if let Ok(deltas) = res_sk {
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
for (id, state) in deltas.0 {
let Some(sk) = safekeepers.get_mut(&id) else {
tracing::info!(
"Couldn't update safekeeper safekeeper state for id {id} from heartbeat={state:?}"
);
continue;
};
sk.set_availability(state);
let mut to_activate = Vec::new();
{
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
for (id, state) in deltas.0 {
let Some(sk) = safekeepers.get_mut(&id) else {
tracing::info!(
"Couldn't update safekeeper safekeeper state for id {id} from heartbeat={state:?}"
);
continue;
};
if sk.scheduling_policy() == SkSchedulingPolicy::Activating
&& let SafekeeperState::Available { .. } = state
{
to_activate.push(id);
}
sk.set_availability(state);
}
locked.safekeepers = Arc::new(safekeepers);
}
for sk_id in to_activate {
// TODO this can race with set_scheduling_policy (can create disjoint DB <-> in-memory state)
tracing::info!("Activating safekeeper {sk_id}");
match self.persistence.activate_safekeeper(sk_id.0 as i64).await {
Ok(Some(())) => {}
Ok(None) => {
tracing::info!(
"safekeeper {sk_id} has been removed from db or has different scheduling policy than active or activating"
);
}
Err(e) => {
tracing::warn!("couldn't apply activation of {sk_id} to db: {e}");
continue;
}
}
if let Err(e) = self
.set_safekeeper_scheduling_policy_in_mem(sk_id, SkSchedulingPolicy::Active)
.await
{
tracing::info!("couldn't activate safekeeper {sk_id} in memory: {e}");
continue;
}
tracing::info!("Activation of safekeeper {sk_id} done");
}
locked.safekeepers = Arc::new(safekeepers);
}
}
}
@@ -1408,6 +1476,7 @@ impl Service {
match result.result {
Ok(()) => {
tenant.consecutive_errors_count = 0;
tenant.apply_observed_deltas(deltas);
tenant.waiter.advance(result.sequence);
}
@@ -1426,6 +1495,8 @@ impl Service {
}
}
tenant.consecutive_errors_count = tenant.consecutive_errors_count.saturating_add(1);
// Ordering: populate last_error before advancing error_seq,
// so that waiters will see the correct error after waiting.
tenant.set_last_error(result.sequence, e);
@@ -8026,7 +8097,7 @@ impl Service {
/// Returns how many reconciliation tasks were started, or `1` if no reconciles were
/// spawned but some _would_ have been spawned if `reconciler_concurrency` units were
/// available. A return value of 0 indicates that everything is fully reconciled already.
fn reconcile_all(&self) -> usize {
fn reconcile_all(&self) -> ReconcileAllResult {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
let pageservers = nodes.clone();
@@ -8034,13 +8105,16 @@ impl Service {
// This function is an efficient place to update lazy statistics, since we are walking
// all tenants.
let mut pending_reconciles = 0;
let mut keep_failing_reconciles = 0;
let mut az_violations = 0;
// If we find any tenants to drop from memory, stash them to offload after
// we're done traversing the map of tenants.
let mut drop_detached_tenants = Vec::new();
let mut reconciles_spawned = 0;
let mut spawned_reconciles = 0;
let mut has_delayed_reconciles = false;
for shard in tenants.values_mut() {
// Accumulate scheduling statistics
if let (Some(attached), Some(preferred)) =
@@ -8060,18 +8134,32 @@ impl Service {
// If there is something delayed, then record it so that
// callers like reconcile_all_now do not incorrectly get the impression
// that the system is in a quiescent state.
reconciles_spawned = std::cmp::max(1, reconciles_spawned);
has_delayed_reconciles = true;
pending_reconciles += 1;
continue;
}
// Eventual consistency: if an earlier reconcile job failed, and the shard is still
// dirty, spawn another one
let consecutive_errors_count = shard.consecutive_errors_count;
if self
.maybe_reconcile_shard(shard, &pageservers, ReconcilerPriority::Normal)
.is_some()
{
reconciles_spawned += 1;
spawned_reconciles += 1;
// Count shards that are keep-failing. We still want to reconcile them
// to avoid a situation where a shard is stuck.
// But we don't want to consider them when deciding to run optimizations.
if consecutive_errors_count >= MAX_CONSECUTIVE_RECONCILIATION_ERRORS {
tracing::warn!(
tenant_id=%shard.tenant_shard_id.tenant_id,
shard_id=%shard.tenant_shard_id.shard_slug(),
"Shard reconciliation is keep-failing: {} errors",
consecutive_errors_count
);
keep_failing_reconciles += 1;
}
} else if shard.delayed_reconcile {
// Shard wanted to reconcile but for some reason couldn't.
pending_reconciles += 1;
@@ -8110,7 +8198,16 @@ impl Service {
.storage_controller_pending_reconciles
.set(pending_reconciles as i64);
reconciles_spawned
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_keep_failing_reconciles
.set(keep_failing_reconciles as i64);
ReconcileAllResult::new(
spawned_reconciles,
keep_failing_reconciles,
has_delayed_reconciles,
)
}
/// `optimize` in this context means identifying shards which have valid scheduled locations, but
@@ -8783,13 +8880,13 @@ impl Service {
/// also wait for any generated Reconcilers to complete. Calling this until it returns zero should
/// put the system into a quiescent state where future background reconciliations won't do anything.
pub(crate) async fn reconcile_all_now(&self) -> Result<usize, ReconcileWaitError> {
let reconciles_spawned = self.reconcile_all();
let reconciles_spawned = if reconciles_spawned == 0 {
let reconcile_all_result = self.reconcile_all();
let mut spawned_reconciles = reconcile_all_result.spawned_reconciles;
if reconcile_all_result.can_run_optimizations() {
// Only optimize when we are otherwise idle
self.optimize_all().await
} else {
reconciles_spawned
};
let optimization_reconciles = self.optimize_all().await;
spawned_reconciles += optimization_reconciles;
}
let waiters = {
let mut waiters = Vec::new();
@@ -8826,11 +8923,11 @@ impl Service {
tracing::info!(
"{} reconciles in reconcile_all, {} waiters",
reconciles_spawned,
spawned_reconciles,
waiter_count
);
Ok(std::cmp::max(waiter_count, reconciles_spawned))
Ok(std::cmp::max(waiter_count, spawned_reconciles))
}
async fn stop_reconciliations(&self, reason: StopReconciliationsReason) {

View File

@@ -236,40 +236,30 @@ impl Service {
F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
T: Sync + Send + 'static,
{
let target_sk_count = safekeepers.len();
if target_sk_count == 0 {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"timeline configured without any safekeepers"
)));
}
if target_sk_count < self.config.timeline_safekeeper_count {
tracing::warn!(
"running a quorum operation with {} safekeepers, which is less than configured {} safekeepers per timeline",
target_sk_count,
self.config.timeline_safekeeper_count
);
}
let results = self
.tenant_timeline_safekeeper_op(safekeepers, op, timeout)
.await?;
// Now check if quorum was reached in results.
let target_sk_count = safekeepers.len();
let quorum_size = match target_sk_count {
0 => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"timeline configured without any safekeepers",
)));
}
1 | 2 => {
#[cfg(feature = "testing")]
{
// In test settings, it is allowed to have one or two safekeepers
target_sk_count
}
#[cfg(not(feature = "testing"))]
{
// The region is misconfigured: we need at least three safekeepers to be configured
// in order to schedule work to them
tracing::warn!(
"couldn't find at least 3 safekeepers for timeline, found: {:?}",
target_sk_count
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"couldn't find at least 3 safekeepers to put timeline to"
)));
}
}
_ => target_sk_count / 2 + 1,
};
let quorum_size = target_sk_count / 2 + 1;
let success_count = results.iter().filter(|res| res.is_ok()).count();
if success_count < quorum_size {
// Failure
@@ -815,7 +805,7 @@ impl Service {
Safekeeper::from_persistence(
crate::persistence::SafekeeperPersistence::from_upsert(
record,
SkSchedulingPolicy::Pause,
SkSchedulingPolicy::Activating,
),
CancellationToken::new(),
use_https,
@@ -856,27 +846,36 @@ impl Service {
.await?;
let node_id = NodeId(id as u64);
// After the change has been persisted successfully, update the in-memory state
{
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
let sk = safekeepers
.get_mut(&node_id)
.ok_or(DatabaseError::Logical("Not found".to_string()))?;
sk.set_scheduling_policy(scheduling_policy);
self.set_safekeeper_scheduling_policy_in_mem(node_id, scheduling_policy)
.await
}
match scheduling_policy {
SkSchedulingPolicy::Active => {
locked
.safekeeper_reconcilers
.start_reconciler(node_id, self);
}
SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => {
locked.safekeeper_reconcilers.stop_reconciler(node_id);
}
pub(crate) async fn set_safekeeper_scheduling_policy_in_mem(
self: &Arc<Service>,
node_id: NodeId,
scheduling_policy: SkSchedulingPolicy,
) -> Result<(), DatabaseError> {
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
let sk = safekeepers
.get_mut(&node_id)
.ok_or(DatabaseError::Logical("Not found".to_string()))?;
sk.set_scheduling_policy(scheduling_policy);
match scheduling_policy {
SkSchedulingPolicy::Active => {
locked
.safekeeper_reconcilers
.start_reconciler(node_id, self);
}
SkSchedulingPolicy::Decomissioned
| SkSchedulingPolicy::Pause
| SkSchedulingPolicy::Activating => {
locked.safekeeper_reconcilers.stop_reconciler(node_id);
}
locked.safekeepers = Arc::new(safekeepers);
}
locked.safekeepers = Arc::new(safekeepers);
Ok(())
}

View File

@@ -131,6 +131,15 @@ pub(crate) struct TenantShard {
#[serde(serialize_with = "read_last_error")]
pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
/// Number of consecutive reconciliation errors that have occurred for this shard.
///
/// When this count reaches MAX_CONSECUTIVE_RECONCILIATION_ERRORS, the tenant shard
/// will be counted as keep-failing in `reconcile_all` calculations. This allows
/// optimizations to run even when some shards keep failing.
///
/// The counter is reset to 0 after a successful reconciliation.
pub(crate) consecutive_errors_count: usize,
/// If we have a pending compute notification that for some reason we weren't able to send,
/// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
/// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
@@ -594,6 +603,7 @@ impl TenantShard {
waiter: Arc::new(SeqWait::new(Sequence(0))),
error_waiter: Arc::new(SeqWait::new(Sequence(0))),
last_error: Arc::default(),
consecutive_errors_count: 0,
pending_compute_notification: false,
scheduling_policy: ShardSchedulingPolicy::default(),
preferred_node: None,
@@ -1859,6 +1869,7 @@ impl TenantShard {
waiter: Arc::new(SeqWait::new(Sequence::initial())),
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
last_error: Arc::default(),
consecutive_errors_count: 0,
pending_compute_notification: false,
delayed_reconcile: false,
scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),

View File

@@ -989,6 +989,102 @@ def test_storage_controller_compute_hook_retry(
)
@run_only_on_default_postgres("postgres behavior is not relevant")
def test_storage_controller_compute_hook_keep_failing(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address: ListenAddress,
):
neon_env_builder.num_pageservers = 4
neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
(host, port) = httpserver_listen_address
neon_env_builder.control_plane_hooks_api = f"http://{host}:{port}"
# Set up CP handler for compute notifications
status_by_tenant: dict[TenantId, int] = {}
def handler(request: Request):
notify_request = request.json
assert notify_request is not None
status = status_by_tenant[TenantId(notify_request["tenant_id"])]
log.info(f"Notify request[{status}]: {notify_request}")
return Response(status=status)
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
# Run neon environment
env = neon_env_builder.init_configs()
env.start()
# Create two tenants:
# - The first tenant is banned by CP and contains only one shard
# - The second tenant is allowed by CP and contains four shards
banned_tenant = TenantId.generate()
status_by_tenant[banned_tenant] = 200 # we will ban this tenant later
env.create_tenant(banned_tenant, placement_policy='{"Attached": 1}')
shard_count = 4
allowed_tenant = TenantId.generate()
status_by_tenant[allowed_tenant] = 200
env.create_tenant(allowed_tenant, shard_count=shard_count, placement_policy='{"Attached": 1}')
# Find the pageserver of the banned tenant
banned_tenant_ps = env.get_tenant_pageserver(banned_tenant)
assert banned_tenant_ps is not None
alive_pageservers = [p for p in env.pageservers if p.id != banned_tenant_ps.id]
# Stop pageserver and ban tenant to trigger failed reconciliation
status_by_tenant[banned_tenant] = 423
banned_tenant_ps.stop()
env.storage_controller.allowed_errors.append(NOTIFY_BLOCKED_LOG)
env.storage_controller.allowed_errors.extend(NOTIFY_FAILURE_LOGS)
env.storage_controller.allowed_errors.append(".*Shard reconciliation is keep-failing.*")
env.storage_controller.node_configure(banned_tenant_ps.id, {"availability": "Offline"})
# Migrate all allowed tenant shards to the first alive pageserver
# to trigger storage controller optimizations due to affinity rules
for shard_number in range(shard_count):
env.storage_controller.tenant_shard_migrate(
TenantShardId(allowed_tenant, shard_number, shard_count),
alive_pageservers[0].id,
config=StorageControllerMigrationConfig(prewarm=False, override_scheduler=True),
)
# Make some reconcile_all calls to trigger optimizations
# RECONCILE_COUNT must be greater than storcon's MAX_CONSECUTIVE_RECONCILIATION_ERRORS
RECONCILE_COUNT = 12
for i in range(RECONCILE_COUNT):
try:
n = env.storage_controller.reconcile_all()
log.info(f"Reconciliation attempt {i} finished with success: {n}")
except StorageControllerApiException as e:
assert "Control plane tenant busy" in str(e)
log.info(f"Reconciliation attempt {i} finished with failure")
banned_descr = env.storage_controller.tenant_describe(banned_tenant)
assert banned_descr["shards"][0]["is_pending_compute_notification"] is True
time.sleep(2)
# Check that the allowed tenant shards are optimized due to affinity rules
locations = alive_pageservers[0].http_client().tenant_list_locations()["tenant_shards"]
not_optimized_shard_count = 0
for loc in locations:
tsi = TenantShardId.parse(loc[0])
if tsi.tenant_id != allowed_tenant:
continue
if loc[1]["mode"] == "AttachedSingle":
not_optimized_shard_count += 1
log.info(f"Shard {tsi} seen in mode {loc[1]['mode']}")
assert not_optimized_shard_count < shard_count, "At least one shard should be optimized"
# Unban the tenant and run reconciliations
status_by_tenant[banned_tenant] = 200
env.storage_controller.reconcile_all()
banned_descr = env.storage_controller.tenant_describe(banned_tenant)
assert banned_descr["shards"][0]["is_pending_compute_notification"] is False
@run_only_on_default_postgres("this test doesn't start an endpoint")
def test_storage_controller_compute_hook_revert(
httpserver: HTTPServer,
@@ -3530,18 +3626,21 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
# some small tests for the scheduling policy querying and returning APIs
newest_info = target.get_safekeeper(inserted["id"])
assert newest_info
assert newest_info["scheduling_policy"] == "Pause"
target.safekeeper_scheduling_policy(inserted["id"], "Active")
newest_info = target.get_safekeeper(inserted["id"])
assert newest_info
assert newest_info["scheduling_policy"] == "Active"
# Ensure idempotency
target.safekeeper_scheduling_policy(inserted["id"], "Active")
newest_info = target.get_safekeeper(inserted["id"])
assert newest_info
assert newest_info["scheduling_policy"] == "Active"
# change back to paused again
assert (
newest_info["scheduling_policy"] == "Activating"
or newest_info["scheduling_policy"] == "Active"
)
target.safekeeper_scheduling_policy(inserted["id"], "Pause")
newest_info = target.get_safekeeper(inserted["id"])
assert newest_info
assert newest_info["scheduling_policy"] == "Pause"
# Ensure idempotency
target.safekeeper_scheduling_policy(inserted["id"], "Pause")
newest_info = target.get_safekeeper(inserted["id"])
assert newest_info
assert newest_info["scheduling_policy"] == "Pause"
# change back to active again
target.safekeeper_scheduling_policy(inserted["id"], "Active")
def storcon_heartbeat():
assert env.storage_controller.log_contains(
@@ -3554,6 +3653,57 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
def test_safekeeper_activating_to_active(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_configs()
env.start()
fake_id = 5
target = env.storage_controller
assert target.get_safekeeper(fake_id) is None
start_sks = target.get_safekeepers()
sk_0 = env.safekeepers[0]
body = {
"active": True,
"id": fake_id,
"created_at": "2023-10-25T09:11:25Z",
"updated_at": "2024-08-28T11:32:43Z",
"region_id": "aws-eu-central-1",
"host": "localhost",
"port": sk_0.port.pg,
"http_port": sk_0.port.http,
"https_port": None,
"version": 5957,
"availability_zone_id": "eu-central-1a",
}
target.on_safekeeper_deploy(fake_id, body)
inserted = target.get_safekeeper(fake_id)
assert inserted is not None
assert target.get_safekeepers() == start_sks + [inserted]
assert eq_safekeeper_records(body, inserted)
def safekeeper_is_active():
newest_info = target.get_safekeeper(inserted["id"])
assert newest_info
assert newest_info["scheduling_policy"] == "Active"
wait_until(safekeeper_is_active)
target.safekeeper_scheduling_policy(inserted["id"], "Activating")
wait_until(safekeeper_is_active)
# Now decommission it
target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
compared = [dict(a), dict(b)]

View File

@@ -1889,6 +1889,31 @@ def test_timeline_detach_with_aux_files_with_detach_v1(
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn1).keys()) == set([])
def test_detach_ancestors_with_no_writes(
neon_env_builder: NeonEnvBuilder,
):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main", tenant_id=env.initial_tenant)
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
endpoint.safe_psql(
"SELECT pg_create_logical_replication_slot('test_slot_parent_1', 'pgoutput')"
)
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
endpoint.stop()
for i in range(0, 5):
if i == 0:
ancestor_name = "main"
else:
ancestor_name = f"b{i}"
tlid = env.create_branch(f"b{i + 1}", ancestor_branch_name=ancestor_name)
client = env.pageserver.http_client()
client.detach_ancestor(tenant_id=env.initial_tenant, timeline_id=tlid)
# TODO:
# - branch near existing L1 boundary, image layers?
# - investigate: why are layers started at uneven lsn? not just after branching, but in general.

View File

@@ -2740,3 +2740,85 @@ def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilde
raise Exception("Uneviction did not happen on source safekeeper yet")
wait_until(unevicted)
def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
"""
Test that the timeline disk usage circuit breaker works as expected. We test that:
1. The circuit breaker kicks in when the timeline's disk usage exceeds the configured limit,
and it causes writes to hang.
2. The hanging writes unblock when the issue resolves (by restarting the safekeeper in the
test to simulate a more realistic production troubleshooting scenario).
3. We can continue to write as normal after the issue resolves.
4. There is no data corruption throughout the test.
"""
# Set up environment with a very small disk usage limit (1KB)
neon_env_builder.num_safekeepers = 1
remote_storage_kind = s3_storage()
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
# Set a very small disk usage limit (1KB)
neon_env_builder.safekeeper_extra_opts = ["--max-timeline-disk-usage-bytes=1024"]
env = neon_env_builder.init_start()
# Create a timeline and endpoint
env.create_branch("test_timeline_disk_usage_limit")
endpoint = env.endpoints.create_start("test_timeline_disk_usage_limit")
# Get the safekeeper
sk = env.safekeepers[0]
# Inject a failpoint to stop WAL backup
with sk.http_client() as http_cli:
http_cli.configure_failpoints([("backup-lsn-range-pausable", "pause")])
# Write some data that will exceed the 1KB limit. While the failpoint is active, this operation
# will hang as Postgres encounters safekeeper-returned errors and retries.
def run_hanging_insert():
with closing(endpoint.connect()) as bg_conn:
with bg_conn.cursor() as bg_cur:
# This should generate more than 1KB of WAL
bg_cur.execute("create table t(key int, value text)")
bg_cur.execute("insert into t select generate_series(1,2000), 'payload'")
# Start the inserts in a background thread
bg_thread = threading.Thread(target=run_hanging_insert)
bg_thread.start()
# Wait for the error message to appear in the compute log
def error_logged():
return endpoint.log_contains("WAL storage utilization exceeds configured limit") is not None
wait_until(error_logged)
log.info("Found expected error message in compute log, resuming.")
# Sanity check that the hanging insert is indeed still hanging. Otherwise means the circuit breaker we
# implemented didn't work as expected.
time.sleep(2)
assert bg_thread.is_alive(), (
"The hanging insert somehow unblocked without resolving the disk usage issue!"
)
log.info("Restarting the safekeeper to resume WAL backup.")
# Restart the safekeeper with defaults to both clear the failpoint and resume the larger disk usage limit.
for sk in env.safekeepers:
sk.stop().start(extra_opts=[])
# The hanging insert will now complete. Join the background thread so that we can
# verify that the insert completed successfully.
bg_thread.join(timeout=120)
assert not bg_thread.is_alive(), "Hanging insert did not complete after safekeeper restart"
log.info("Hanging insert unblocked.")
# Verify we can continue to write as normal
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("insert into t select generate_series(2001,3000), 'payload'")
# Sanity check data correctness
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("select count(*) from t")
# 2000 rows from first insert + 1000 from last insert
assert cur.fetchone() == (3000,)