mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 13:10:38 +00:00
Compare commits
42 Commits
vlad/asdas
...
max_vacuum
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9fb17dba04 | ||
|
|
adde0ecfe0 | ||
|
|
19accfee4e | ||
|
|
e579bc0819 | ||
|
|
c9e6dd45d3 | ||
|
|
bf9fc77061 | ||
|
|
a004d27fca | ||
|
|
a46253766b | ||
|
|
5b69b32dc5 | ||
|
|
e03c3c9893 | ||
|
|
bbb2fa7cdd | ||
|
|
778787d8e9 | ||
|
|
90b51dcf16 | ||
|
|
a85aa03d18 | ||
|
|
cdaed4d79c | ||
|
|
ea0b22a9b0 | ||
|
|
392a58bdce | ||
|
|
e0891ec8c8 | ||
|
|
97f7188a07 | ||
|
|
aae3876318 | ||
|
|
dae55badf3 | ||
|
|
4273309962 | ||
|
|
4a0c2aebe0 | ||
|
|
891cb5a9a8 | ||
|
|
f5832329ac | ||
|
|
6216df7765 | ||
|
|
5de896e7d8 | ||
|
|
25eefdeb1f | ||
|
|
28929d9cfa | ||
|
|
9b4b4bbf6f | ||
|
|
1a0f545c16 | ||
|
|
7dcdbaa25e | ||
|
|
0497b99f3a | ||
|
|
9882ac8e06 | ||
|
|
0789160ffa | ||
|
|
9c32604aa9 | ||
|
|
b02aafdfda | ||
|
|
e823b92947 | ||
|
|
aea5cfe21e | ||
|
|
9ce193082a | ||
|
|
75c84c846a | ||
|
|
57535c039c |
17
.github/workflows/periodic_pagebench.yml
vendored
17
.github/workflows/periodic_pagebench.yml
vendored
@@ -43,10 +43,6 @@ jobs:
|
||||
AWS_DEFAULT_REGION : "eu-central-1"
|
||||
AWS_INSTANCE_ID : "i-02a59a3bf86bc7e74"
|
||||
steps:
|
||||
# we don't need the neon source code because we run everything remotely
|
||||
# however we still need the local github actions to run the allure step below
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Show my own (github runner) external IP address - usefull for IP allowlisting
|
||||
run: curl https://ifconfig.me
|
||||
|
||||
@@ -94,12 +90,10 @@ jobs:
|
||||
set +x
|
||||
status=$(echo $response | jq -r '.status')
|
||||
echo "Test status: $status"
|
||||
if [[ "$status" == "failure" ]]; then
|
||||
echo "Test failed"
|
||||
exit 1 # Fail the job step if status is failure
|
||||
elif [[ "$status" == "success" || "$status" == "null" ]]; then
|
||||
if [[ "$status" == "failure" || "$status" == "success" || "$status" == "null" ]]; then
|
||||
break
|
||||
elif [[ "$status" == "too_many_runs" ]]; then
|
||||
fi
|
||||
if [[ "$status" == "too_many_runs" ]]; then
|
||||
echo "Too many runs already running"
|
||||
echo "too_many_runs=true" >> "$GITHUB_OUTPUT"
|
||||
exit 1
|
||||
@@ -109,7 +103,6 @@ jobs:
|
||||
done
|
||||
|
||||
- name: Retrieve Test Logs
|
||||
if: always() && steps.poll_step.outputs.too_many_runs != 'true'
|
||||
run: |
|
||||
curl -k -X 'GET' \
|
||||
"${EC2_MACHINE_URL_US}/test_log/${GITHUB_RUN_ID}" \
|
||||
@@ -118,15 +111,11 @@ jobs:
|
||||
--output "test_log_${GITHUB_RUN_ID}.gz"
|
||||
|
||||
- name: Unzip Test Log and Print it into this job's log
|
||||
if: always() && steps.poll_step.outputs.too_many_runs != 'true'
|
||||
run: |
|
||||
gzip -d "test_log_${GITHUB_RUN_ID}.gz"
|
||||
cat "test_log_${GITHUB_RUN_ID}"
|
||||
|
||||
- name: Create Allure report
|
||||
env:
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
|
||||
if: ${{ !cancelled() }}
|
||||
uses: ./.github/actions/allure-report-generate
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@ use std::{
|
||||
collections::HashMap,
|
||||
io::{BufRead, Read},
|
||||
num::{NonZeroU64, NonZeroUsize},
|
||||
str::FromStr,
|
||||
sync::atomic::AtomicUsize,
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
@@ -438,7 +437,18 @@ pub enum CompactionAlgorithm {
|
||||
Tiered,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[derive(
|
||||
Debug,
|
||||
Clone,
|
||||
Copy,
|
||||
PartialEq,
|
||||
Eq,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
strum_macros::FromRepr,
|
||||
strum_macros::EnumString,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
pub enum ImageCompressionAlgorithm {
|
||||
/// Disabled for writes, and never decompress during reading.
|
||||
/// Never set this after you've enabled compression once!
|
||||
@@ -458,31 +468,6 @@ impl ImageCompressionAlgorithm {
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for ImageCompressionAlgorithm {
|
||||
type Err = anyhow::Error;
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let mut components = s.split(['(', ')']);
|
||||
let first = components
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::anyhow!("empty string"))?;
|
||||
match first {
|
||||
"disabled-no-decompress" => Ok(ImageCompressionAlgorithm::DisabledNoDecompress),
|
||||
"disabled" => Ok(ImageCompressionAlgorithm::Disabled),
|
||||
"zstd" => {
|
||||
let level = if let Some(v) = components.next() {
|
||||
let v: i8 = v.parse()?;
|
||||
Some(v)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(ImageCompressionAlgorithm::Zstd { level })
|
||||
}
|
||||
_ => anyhow::bail!("invalid specifier '{first}'"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct CompactionAlgorithmSettings {
|
||||
pub kind: CompactionAlgorithm,
|
||||
@@ -1675,29 +1660,4 @@ mod tests {
|
||||
AuxFilePolicy::CrossValidation
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_image_compression_algorithm_parsing() {
|
||||
use ImageCompressionAlgorithm::*;
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("disabled").unwrap(),
|
||||
Disabled
|
||||
);
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("disabled-no-decompress").unwrap(),
|
||||
DisabledNoDecompress
|
||||
);
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("zstd").unwrap(),
|
||||
Zstd { level: None }
|
||||
);
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("zstd(18)").unwrap(),
|
||||
Zstd { level: Some(18) }
|
||||
);
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("zstd(-3)").unwrap(),
|
||||
Zstd { level: Some(-3) }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1456,12 +1456,10 @@ impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static LIVE_CONNECTIONS: Lazy<IntCounterPairVec> = Lazy::new(|| {
|
||||
register_int_counter_pair_vec!(
|
||||
"pageserver_live_connections_started",
|
||||
"Number of network connections that we started handling",
|
||||
"pageserver_live_connections_finished",
|
||||
"Number of network connections that we finished handling",
|
||||
pub(crate) static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_live_connections",
|
||||
"Number of live network connections",
|
||||
&["pageserver_connection_kind"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
|
||||
@@ -55,7 +55,7 @@ use crate::basebackup::BasebackupError;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::import_datadir::import_wal_from_tar;
|
||||
use crate::metrics;
|
||||
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
|
||||
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS_COUNT};
|
||||
use crate::pgdatadir_mapping::Version;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
|
||||
@@ -215,9 +215,14 @@ async fn page_service_conn_main(
|
||||
auth_type: AuthType,
|
||||
connection_ctx: RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let _guard = LIVE_CONNECTIONS
|
||||
.with_label_values(&["page_service"])
|
||||
.guard();
|
||||
// Immediately increment the gauge, then create a job to decrement it on task exit.
|
||||
// One of the pros of `defer!` is that this will *most probably*
|
||||
// get called, even in presence of panics.
|
||||
let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
|
||||
gauge.inc();
|
||||
scopeguard::defer! {
|
||||
gauge.dec();
|
||||
}
|
||||
|
||||
socket
|
||||
.set_nodelay(true)
|
||||
|
||||
@@ -365,7 +365,6 @@ pub struct Timeline {
|
||||
repartition_threshold: u64,
|
||||
|
||||
last_image_layer_creation_check_at: AtomicLsn,
|
||||
last_image_layer_creation_check_instant: std::sync::Mutex<Option<Instant>>,
|
||||
|
||||
/// Current logical size of the "datadir", at the last LSN.
|
||||
current_logical_size: LogicalSize,
|
||||
@@ -2385,7 +2384,6 @@ impl Timeline {
|
||||
)),
|
||||
repartition_threshold: 0,
|
||||
last_image_layer_creation_check_at: AtomicLsn::new(0),
|
||||
last_image_layer_creation_check_instant: Mutex::new(None),
|
||||
|
||||
last_received_wal: Mutex::new(None),
|
||||
rel_size_cache: RwLock::new(RelSizeCache {
|
||||
@@ -4466,58 +4464,6 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Predicate function which indicates whether we should check if new image layers
|
||||
/// are required. Since checking if new image layers are required is expensive in
|
||||
/// terms of CPU, we only do it in the following cases:
|
||||
/// 1. If the timeline has ingested sufficient WAL to justify the cost
|
||||
/// 2. If enough time has passed since the last check
|
||||
/// 2.1. For large tenants, we wish to perform the check more often since they
|
||||
/// suffer from the lack of image layers
|
||||
/// 2.2. For small tenants (that can mostly fit in RAM), we use a much longer interval
|
||||
fn should_check_if_image_layers_required(self: &Arc<Timeline>, lsn: Lsn) -> bool {
|
||||
const LARGE_TENANT_THRESHOLD: u64 = 2 * 1024 * 1024 * 1024;
|
||||
|
||||
let last_checks_at = self.last_image_layer_creation_check_at.load();
|
||||
let distance = lsn
|
||||
.checked_sub(last_checks_at)
|
||||
.expect("Attempt to compact with LSN going backwards");
|
||||
let min_distance =
|
||||
self.get_image_layer_creation_check_threshold() as u64 * self.get_checkpoint_distance();
|
||||
|
||||
let distance_based_decision = distance.0 >= min_distance;
|
||||
|
||||
let mut time_based_decision = false;
|
||||
let mut last_check_instant = self.last_image_layer_creation_check_instant.lock().unwrap();
|
||||
if let CurrentLogicalSize::Exact(logical_size) = self.current_logical_size.current_size() {
|
||||
let check_required_after = if Into::<u64>::into(&logical_size) >= LARGE_TENANT_THRESHOLD
|
||||
{
|
||||
self.get_checkpoint_timeout()
|
||||
} else {
|
||||
Duration::from_secs(3600 * 48)
|
||||
};
|
||||
|
||||
time_based_decision = match *last_check_instant {
|
||||
Some(last_check) => {
|
||||
let elapsed = last_check.elapsed();
|
||||
elapsed >= check_required_after
|
||||
}
|
||||
None => true,
|
||||
};
|
||||
}
|
||||
|
||||
// Do the expensive delta layer counting only if this timeline has ingested sufficient
|
||||
// WAL since the last check or a checkpoint timeout interval has elapsed since the last
|
||||
// check.
|
||||
let decision = distance_based_decision || time_based_decision;
|
||||
|
||||
if decision {
|
||||
self.last_image_layer_creation_check_at.store(lsn);
|
||||
*last_check_instant = Some(Instant::now());
|
||||
}
|
||||
|
||||
decision
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(%lsn, %mode))]
|
||||
async fn create_image_layers(
|
||||
self: &Arc<Timeline>,
|
||||
@@ -4540,7 +4486,22 @@ impl Timeline {
|
||||
// image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
|
||||
let mut start = Key::MIN;
|
||||
|
||||
let check_for_image_layers = self.should_check_if_image_layers_required(lsn);
|
||||
let check_for_image_layers = {
|
||||
let last_checks_at = self.last_image_layer_creation_check_at.load();
|
||||
let distance = lsn
|
||||
.checked_sub(last_checks_at)
|
||||
.expect("Attempt to compact with LSN going backwards");
|
||||
let min_distance = self.get_image_layer_creation_check_threshold() as u64
|
||||
* self.get_checkpoint_distance();
|
||||
|
||||
// Skip the expensive delta layer counting if this timeline has not ingested sufficient
|
||||
// WAL since the last check.
|
||||
distance.0 >= min_distance
|
||||
};
|
||||
|
||||
if check_for_image_layers {
|
||||
self.last_image_layer_creation_check_at.store(lsn);
|
||||
}
|
||||
|
||||
for partition in partitioning.parts.iter() {
|
||||
let img_range = start..partition.ranges.last().unwrap().end;
|
||||
|
||||
@@ -26,7 +26,7 @@ use tracing::{debug, error, info, trace, warn, Instrument};
|
||||
use super::TaskStateUpdate;
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
metrics::{LIVE_CONNECTIONS, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
|
||||
metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
|
||||
task_mgr::TaskKind,
|
||||
task_mgr::WALRECEIVER_RUNTIME,
|
||||
tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
|
||||
@@ -208,9 +208,14 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
.instrument(tracing::info_span!("poller")),
|
||||
);
|
||||
|
||||
let _guard = LIVE_CONNECTIONS
|
||||
.with_label_values(&["wal_receiver"])
|
||||
.guard();
|
||||
// Immediately increment the gauge, then create a job to decrement it on task exit.
|
||||
// One of the pros of `defer!` is that this will *most probably*
|
||||
// get called, even in presence of panics.
|
||||
let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
|
||||
gauge.inc();
|
||||
scopeguard::defer! {
|
||||
gauge.dec();
|
||||
}
|
||||
|
||||
let identify = identify_system(&replication_client).await?;
|
||||
info!("{identify:?}");
|
||||
|
||||
@@ -6,7 +6,6 @@ OBJS = \
|
||||
$(WIN32RES) \
|
||||
extension_server.o \
|
||||
file_cache.o \
|
||||
hll.o \
|
||||
libpagestore.o \
|
||||
neon.o \
|
||||
neon_utils.o \
|
||||
@@ -23,7 +22,7 @@ SHLIB_LINK_INTERNAL = $(libpq)
|
||||
SHLIB_LINK = -lcurl
|
||||
|
||||
EXTENSION = neon
|
||||
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql
|
||||
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql
|
||||
PGFILEDESC = "neon - cloud storage for PostgreSQL"
|
||||
|
||||
EXTRA_CLEAN = \
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include "miscadmin.h"
|
||||
#include "pagestore_client.h"
|
||||
#include "common/hashfn.h"
|
||||
#include "lib/hyperloglog.h"
|
||||
#include "pgstat.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include RELFILEINFO_HDR
|
||||
@@ -39,8 +40,6 @@
|
||||
#include "utils/dynahash.h"
|
||||
#include "utils/guc.h"
|
||||
|
||||
#include "hll.h"
|
||||
|
||||
/*
|
||||
* Local file cache is used to temporary store relations pages in local file system.
|
||||
* All blocks of all relations are stored inside one file and addressed using shared hash map.
|
||||
@@ -63,6 +62,7 @@
|
||||
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
|
||||
#define MB ((uint64)1024*1024)
|
||||
|
||||
#define HYPER_LOG_LOG_BIT_WIDTH 10
|
||||
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
|
||||
|
||||
typedef struct FileCacheEntry
|
||||
@@ -87,7 +87,8 @@ typedef struct FileCacheControl
|
||||
uint64 writes;
|
||||
dlist_head lru; /* double linked list for LRU replacement
|
||||
* algorithm */
|
||||
HyperLogLogState wss_estimation; /* estimation of working set size */
|
||||
hyperLogLogState wss_estimation; /* estimation of wroking set size */
|
||||
uint8_t hyperloglog_hashes[(1 << HYPER_LOG_LOG_BIT_WIDTH) + 1];
|
||||
} FileCacheControl;
|
||||
|
||||
static HTAB *lfc_hash;
|
||||
@@ -237,7 +238,12 @@ lfc_shmem_startup(void)
|
||||
dlist_init(&lfc_ctl->lru);
|
||||
|
||||
/* Initialize hyper-log-log structure for estimating working set size */
|
||||
initSHLL(&lfc_ctl->wss_estimation);
|
||||
initHyperLogLog(&lfc_ctl->wss_estimation, HYPER_LOG_LOG_BIT_WIDTH);
|
||||
|
||||
/* We need hashes in shared memory */
|
||||
pfree(lfc_ctl->wss_estimation.hashesArr);
|
||||
memset(lfc_ctl->hyperloglog_hashes, 0, sizeof lfc_ctl->hyperloglog_hashes);
|
||||
lfc_ctl->wss_estimation.hashesArr = lfc_ctl->hyperloglog_hashes;
|
||||
|
||||
/* Recreate file cache on restart */
|
||||
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
|
||||
@@ -539,7 +545,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
/* Approximate working set */
|
||||
tag.blockNum = blkno;
|
||||
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
|
||||
addHyperLogLog(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
|
||||
|
||||
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
|
||||
{
|
||||
@@ -980,38 +986,20 @@ local_cache_pages(PG_FUNCTION_ARGS)
|
||||
SRF_RETURN_DONE(funcctx);
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(approximate_working_set_size_seconds);
|
||||
|
||||
Datum
|
||||
approximate_working_set_size_seconds(PG_FUNCTION_ARGS)
|
||||
{
|
||||
if (lfc_size_limit != 0)
|
||||
{
|
||||
int32 dc;
|
||||
time_t duration = PG_ARGISNULL(0) ? (time_t)-1 : PG_GETARG_INT32(0);
|
||||
LWLockAcquire(lfc_lock, LW_SHARED);
|
||||
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration);
|
||||
LWLockRelease(lfc_lock);
|
||||
PG_RETURN_INT32(dc);
|
||||
}
|
||||
PG_RETURN_NULL();
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(approximate_working_set_size);
|
||||
|
||||
Datum
|
||||
approximate_working_set_size(PG_FUNCTION_ARGS)
|
||||
{
|
||||
int32 dc = -1;
|
||||
if (lfc_size_limit != 0)
|
||||
{
|
||||
int32 dc;
|
||||
bool reset = PG_GETARG_BOOL(0);
|
||||
LWLockAcquire(lfc_lock, reset ? LW_EXCLUSIVE : LW_SHARED);
|
||||
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, (time_t)-1);
|
||||
dc = (int32) estimateHyperLogLog(&lfc_ctl->wss_estimation);
|
||||
if (reset)
|
||||
memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs);
|
||||
memset(lfc_ctl->hyperloglog_hashes, 0, sizeof lfc_ctl->hyperloglog_hashes);
|
||||
LWLockRelease(lfc_lock);
|
||||
PG_RETURN_INT32(dc);
|
||||
}
|
||||
PG_RETURN_NULL();
|
||||
PG_RETURN_INT32(dc);
|
||||
}
|
||||
|
||||
193
pgxn/neon/hll.c
193
pgxn/neon/hll.c
@@ -1,193 +0,0 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* hll.c
|
||||
* Sliding HyperLogLog cardinality estimator
|
||||
*
|
||||
* Portions Copyright (c) 2014-2023, PostgreSQL Global Development Group
|
||||
*
|
||||
* Implements https://hal.science/hal-00465313/document
|
||||
*
|
||||
* Based on Hideaki Ohno's C++ implementation. This is probably not ideally
|
||||
* suited to estimating the cardinality of very large sets; in particular, we
|
||||
* have not attempted to further optimize the implementation as described in
|
||||
* the Heule, Nunkesser and Hall paper "HyperLogLog in Practice: Algorithmic
|
||||
* Engineering of a State of The Art Cardinality Estimation Algorithm".
|
||||
*
|
||||
* A sparse representation of HyperLogLog state is used, with fixed space
|
||||
* overhead.
|
||||
*
|
||||
* The copyright terms of Ohno's original version (the MIT license) follow.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/lib/hyperloglog.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright (c) 2013 Hideaki Ohno <hide.o.j55{at}gmail.com>
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
* of this software and associated documentation files (the 'Software'), to
|
||||
* deal in the Software without restriction, including without limitation the
|
||||
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
||||
* sell copies of the Software, and to permit persons to whom the Software is
|
||||
* furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in
|
||||
* all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
||||
* IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#include <math.h>
|
||||
|
||||
#include "postgres.h"
|
||||
#include "funcapi.h"
|
||||
#include "port/pg_bitutils.h"
|
||||
#include "utils/timestamp.h"
|
||||
#include "hll.h"
|
||||
|
||||
|
||||
#define POW_2_32 (4294967296.0)
|
||||
#define NEG_POW_2_32 (-4294967296.0)
|
||||
|
||||
#define ALPHA_MM ((0.7213 / (1.0 + 1.079 / HLL_N_REGISTERS)) * HLL_N_REGISTERS * HLL_N_REGISTERS)
|
||||
|
||||
/*
|
||||
* Worker for addHyperLogLog().
|
||||
*
|
||||
* Calculates the position of the first set bit in first b bits of x argument
|
||||
* starting from the first, reading from most significant to least significant
|
||||
* bits.
|
||||
*
|
||||
* Example (when considering fist 10 bits of x):
|
||||
*
|
||||
* rho(x = 0b1000000000) returns 1
|
||||
* rho(x = 0b0010000000) returns 3
|
||||
* rho(x = 0b0000000000) returns b + 1
|
||||
*
|
||||
* "The binary address determined by the first b bits of x"
|
||||
*
|
||||
* Return value "j" used to index bit pattern to watch.
|
||||
*/
|
||||
static inline uint8
|
||||
rho(uint32 x, uint8 b)
|
||||
{
|
||||
uint8 j = 1;
|
||||
|
||||
if (x == 0)
|
||||
return b + 1;
|
||||
|
||||
j = 32 - pg_leftmost_one_pos32(x);
|
||||
|
||||
if (j > b)
|
||||
return b + 1;
|
||||
|
||||
return j;
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize HyperLogLog track state
|
||||
*/
|
||||
void
|
||||
initSHLL(HyperLogLogState *cState)
|
||||
{
|
||||
memset(cState->regs, 0, sizeof(cState->regs));
|
||||
}
|
||||
|
||||
/*
|
||||
* Adds element to the estimator, from caller-supplied hash.
|
||||
*
|
||||
* It is critical that the hash value passed be an actual hash value, typically
|
||||
* generated using hash_any(). The algorithm relies on a specific bit-pattern
|
||||
* observable in conjunction with stochastic averaging. There must be a
|
||||
* uniform distribution of bits in hash values for each distinct original value
|
||||
* observed.
|
||||
*/
|
||||
void
|
||||
addSHLL(HyperLogLogState *cState, uint32 hash)
|
||||
{
|
||||
uint8 count;
|
||||
uint32 index;
|
||||
size_t i;
|
||||
size_t j;
|
||||
|
||||
TimestampTz now = GetCurrentTimestamp();
|
||||
/* Use the first "k" (registerWidth) bits as a zero based index */
|
||||
index = hash >> HLL_C_BITS;
|
||||
|
||||
/* Compute the rank of the remaining 32 - "k" (registerWidth) bits */
|
||||
count = rho(hash << HLL_BIT_WIDTH, HLL_C_BITS);
|
||||
|
||||
cState->regs[index][count] = now;
|
||||
}
|
||||
|
||||
static uint8
|
||||
getMaximum(const TimestampTz* reg, TimestampTz since)
|
||||
{
|
||||
uint8 max = 0;
|
||||
|
||||
for (size_t i = 0; i < HLL_C_BITS + 1; i++)
|
||||
{
|
||||
if (reg[i] >= since)
|
||||
{
|
||||
max = i;
|
||||
}
|
||||
}
|
||||
|
||||
return max;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Estimates cardinality, based on elements added so far
|
||||
*/
|
||||
double
|
||||
estimateSHLL(HyperLogLogState *cState, time_t duration)
|
||||
{
|
||||
double result;
|
||||
double sum = 0.0;
|
||||
size_t i;
|
||||
uint8 R[HLL_N_REGISTERS];
|
||||
/* 0 indicates uninitialized timestamp, so if we need to cover the whole range than starts with 1 */
|
||||
TimestampTz since = duration == (time_t)-1 ? 1 : GetCurrentTimestamp() - duration * USECS_PER_SEC;
|
||||
|
||||
for (i = 0; i < HLL_N_REGISTERS; i++)
|
||||
{
|
||||
R[i] = getMaximum(cState->regs[i], since);
|
||||
sum += 1.0 / pow(2.0, R[i]);
|
||||
}
|
||||
|
||||
/* result set to "raw" HyperLogLog estimate (E in the HyperLogLog paper) */
|
||||
result = ALPHA_MM / sum;
|
||||
|
||||
if (result <= (5.0 / 2.0) * HLL_N_REGISTERS)
|
||||
{
|
||||
/* Small range correction */
|
||||
int zero_count = 0;
|
||||
|
||||
for (i = 0; i < HLL_N_REGISTERS; i++)
|
||||
{
|
||||
zero_count += R[i] == 0;
|
||||
}
|
||||
|
||||
if (zero_count != 0)
|
||||
result = HLL_N_REGISTERS * log((double) HLL_N_REGISTERS /
|
||||
zero_count);
|
||||
}
|
||||
else if (result > (1.0 / 30.0) * POW_2_32)
|
||||
{
|
||||
/* Large range correction */
|
||||
result = NEG_POW_2_32 * log(1.0 - (result / POW_2_32));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -1,86 +0,0 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* hll.h
|
||||
* Sliding HyperLogLog cardinality estimator
|
||||
*
|
||||
* Portions Copyright (c) 2014-2023, PostgreSQL Global Development Group
|
||||
*
|
||||
* Implements https://hal.science/hal-00465313/document
|
||||
*
|
||||
* Based on Hideaki Ohno's C++ implementation. This is probably not ideally
|
||||
* suited to estimating the cardinality of very large sets; in particular, we
|
||||
* have not attempted to further optimize the implementation as described in
|
||||
* the Heule, Nunkesser and Hall paper "HyperLogLog in Practice: Algorithmic
|
||||
* Engineering of a State of The Art Cardinality Estimation Algorithm".
|
||||
*
|
||||
* A sparse representation of HyperLogLog state is used, with fixed space
|
||||
* overhead.
|
||||
*
|
||||
* The copyright terms of Ohno's original version (the MIT license) follow.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/lib/hyperloglog.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright (c) 2013 Hideaki Ohno <hide.o.j55{at}gmail.com>
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
* of this software and associated documentation files (the 'Software'), to
|
||||
* deal in the Software without restriction, including without limitation the
|
||||
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
||||
* sell copies of the Software, and to permit persons to whom the Software is
|
||||
* furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in
|
||||
* all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
||||
* IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#ifndef HLL_H
|
||||
#define HLL_H
|
||||
|
||||
#define HLL_BIT_WIDTH 10
|
||||
#define HLL_C_BITS (32 - HLL_BIT_WIDTH)
|
||||
#define HLL_N_REGISTERS (1 << HLL_BIT_WIDTH)
|
||||
|
||||
/*
|
||||
* HyperLogLog is an approximate technique for computing the number of distinct
|
||||
* entries in a set. Importantly, it does this by using a fixed amount of
|
||||
* memory. See the 2007 paper "HyperLogLog: the analysis of a near-optimal
|
||||
* cardinality estimation algorithm" for more.
|
||||
*
|
||||
* Instead of a single counter for every bits register, we have a timestamp
|
||||
* for every valid number of bits we can encounter. Every time we encounter
|
||||
* a certain number of bits, we update the timestamp in those registers to
|
||||
* the current timestamp.
|
||||
*
|
||||
* We can query the sketch's stored cardinality for the range of some timestamp
|
||||
* up to now: For each register, we return the highest bits bucket that has a
|
||||
* modified timestamp >= the query timestamp. This value is the number of bits
|
||||
* for this register in the normal HLL calculation.
|
||||
*
|
||||
* The memory usage is 2^B * (C + 1) * sizeof(TimetampTz), or 184kiB.
|
||||
* Usage could be halved if we decide to reduce the required time dimension
|
||||
* precision; as 32 bits in second precision should be enough for statistics.
|
||||
* However, that is not yet implemented.
|
||||
*/
|
||||
typedef struct HyperLogLogState
|
||||
{
|
||||
TimestampTz regs[HLL_N_REGISTERS][HLL_C_BITS + 1];
|
||||
} HyperLogLogState;
|
||||
|
||||
extern void initSHLL(HyperLogLogState *cState);
|
||||
extern void addSHLL(HyperLogLogState *cState, uint32 hash);
|
||||
extern double estimateSHLL(HyperLogLogState *cState, time_t dutration);
|
||||
|
||||
#endif
|
||||
@@ -1,9 +0,0 @@
|
||||
\echo Use "ALTER EXTENSION neon UPDATE TO '1.4'" to load this file. \quit
|
||||
|
||||
CREATE FUNCTION approximate_working_set_size_seconds(duration integer default null)
|
||||
RETURNS integer
|
||||
AS 'MODULE_PATHNAME', 'approximate_working_set_size_seconds'
|
||||
LANGUAGE C PARALLEL SAFE;
|
||||
|
||||
GRANT EXECUTE ON FUNCTION approximate_working_set_size_seconds(integer) TO pg_monitor;
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
DROP FUNCTION IF EXISTS approximate_working_set_size_seconds(integer) CASCADE;
|
||||
@@ -63,6 +63,8 @@ char *wal_acceptors_list = "";
|
||||
int wal_acceptor_reconnect_timeout = 1000;
|
||||
int wal_acceptor_connection_timeout = 10000;
|
||||
|
||||
int max_vacuum_defer_cleanup_age = 0;
|
||||
|
||||
/* Set to true in the walproposer bgw. */
|
||||
static bool am_walproposer;
|
||||
static WalproposerShmemState *walprop_shared;
|
||||
@@ -218,6 +220,16 @@ nwp_register_gucs(void)
|
||||
PGC_SIGHUP,
|
||||
GUC_UNIT_MS,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"neon.max_vacuum_defer_cleanup_age",
|
||||
"Restrict oldest xmin pinned by hot standby feedback to prevent bloating of master",
|
||||
NULL,
|
||||
&max_vacuum_defer_cleanup_age,
|
||||
0, 0, INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1855,6 +1867,7 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
|
||||
FullTransactionId xmin = hsFeedback.xmin;
|
||||
FullTransactionId catalog_xmin = hsFeedback.catalog_xmin;
|
||||
FullTransactionId next_xid = ReadNextFullTransactionId();
|
||||
|
||||
/*
|
||||
* Page server is updating nextXid in checkpoint each 1024 transactions,
|
||||
* so feedback xmin can be actually larger then nextXid and
|
||||
@@ -1863,8 +1876,14 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
|
||||
*/
|
||||
if (FullTransactionIdPrecedes(next_xid, xmin))
|
||||
xmin = next_xid;
|
||||
else if (max_vacuum_defer_cleanup_age != 0 && xmin.value < next_xid.value - max_vacuum_defer_cleanup_age)
|
||||
xmin.value = next_xid.value - max_vacuum_defer_cleanup_age;
|
||||
|
||||
if (FullTransactionIdPrecedes(next_xid, catalog_xmin))
|
||||
catalog_xmin = next_xid;
|
||||
else if (max_vacuum_defer_cleanup_age != 0 && catalog_xmin.value < next_xid.value - max_vacuum_defer_cleanup_age)
|
||||
catalog_xmin.value = next_xid.value - max_vacuum_defer_cleanup_age;
|
||||
|
||||
agg_hs_feedback = hsFeedback;
|
||||
elog(DEBUG2, "ProcessStandbyHSFeedback(xmin=%d, catalog_xmin=%d", XidFromFullTransactionId(hsFeedback.xmin), XidFromFullTransactionId(hsFeedback.catalog_xmin));
|
||||
ProcessStandbyHSFeedback(hsFeedback.ts,
|
||||
|
||||
@@ -7,7 +7,7 @@ OBJS = \
|
||||
neontest.o
|
||||
|
||||
EXTENSION = neon_test_utils
|
||||
DATA = neon_test_utils--1.3.sql
|
||||
DATA = neon_test_utils--1.2.sql
|
||||
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
|
||||
|
||||
PG_CONFIG = pg_config
|
||||
|
||||
@@ -45,21 +45,3 @@ CREATE FUNCTION neon_xlogflush(lsn pg_lsn DEFAULT NULL)
|
||||
RETURNS VOID
|
||||
AS 'MODULE_PATHNAME', 'neon_xlogflush'
|
||||
LANGUAGE C PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION trigger_panic()
|
||||
RETURNS VOID
|
||||
AS 'MODULE_PATHNAME', 'trigger_panic'
|
||||
LANGUAGE C PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION trigger_segfault()
|
||||
RETURNS VOID
|
||||
AS 'MODULE_PATHNAME', 'trigger_segfault'
|
||||
LANGUAGE C PARALLEL UNSAFE;
|
||||
|
||||
-- Alias for `trigger_segfault`, just because `SELECT 💣()` looks fun
|
||||
CREATE OR REPLACE FUNCTION 💣() RETURNS void
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
PERFORM trigger_segfault();
|
||||
END;
|
||||
$$;
|
||||
@@ -1,6 +1,6 @@
|
||||
# neon_test_utils extension
|
||||
comment = 'helpers for neon testing and debugging'
|
||||
default_version = '1.3'
|
||||
default_version = '1.2'
|
||||
module_pathname = '$libdir/neon_test_utils'
|
||||
relocatable = true
|
||||
trusted = true
|
||||
|
||||
@@ -42,8 +42,6 @@ PG_FUNCTION_INFO_V1(clear_buffer_cache);
|
||||
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
|
||||
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
|
||||
PG_FUNCTION_INFO_V1(neon_xlogflush);
|
||||
PG_FUNCTION_INFO_V1(trigger_panic);
|
||||
PG_FUNCTION_INFO_V1(trigger_segfault);
|
||||
|
||||
/*
|
||||
* Linkage to functions in neon module.
|
||||
@@ -491,24 +489,3 @@ neon_xlogflush(PG_FUNCTION_ARGS)
|
||||
XLogFlush(lsn);
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/*
|
||||
* Function to trigger panic.
|
||||
*/
|
||||
Datum
|
||||
trigger_panic(PG_FUNCTION_ARGS)
|
||||
{
|
||||
elog(PANIC, "neon_test_utils: panic");
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/*
|
||||
* Function to trigger a segfault.
|
||||
*/
|
||||
Datum
|
||||
trigger_segfault(PG_FUNCTION_ARGS)
|
||||
{
|
||||
int *ptr = NULL;
|
||||
*ptr = 42;
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
8
poetry.lock
generated
8
poetry.lock
generated
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aiohttp"
|
||||
@@ -734,13 +734,13 @@ typing-extensions = ">=4.1.0"
|
||||
|
||||
[[package]]
|
||||
name = "certifi"
|
||||
version = "2024.7.4"
|
||||
version = "2023.7.22"
|
||||
description = "Python package for providing Mozilla's CA Bundle."
|
||||
optional = false
|
||||
python-versions = ">=3.6"
|
||||
files = [
|
||||
{file = "certifi-2024.7.4-py3-none-any.whl", hash = "sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90"},
|
||||
{file = "certifi-2024.7.4.tar.gz", hash = "sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b"},
|
||||
{file = "certifi-2023.7.22-py3-none-any.whl", hash = "sha256:92d6037539857d8206b8f6ae472e8b77db8058fec5937a1ef3f54304089edbb9"},
|
||||
{file = "certifi-2023.7.22.tar.gz", hash = "sha256:539cc1d13202e33ca466e88b2807e29f4c13049d6d87031a3c110744495cb082"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -445,19 +445,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
.map(|res| ("WAL service main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_service_handle));
|
||||
|
||||
let timeline_housekeeping_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
|
||||
.spawn(async move {
|
||||
const TOMBSTONE_TTL: Duration = Duration::from_secs(3600 * 24);
|
||||
loop {
|
||||
tokio::time::sleep(TOMBSTONE_TTL).await;
|
||||
GlobalTimelines::housekeeping(&TOMBSTONE_TTL);
|
||||
}
|
||||
})
|
||||
.map(|res| ("Timeline map housekeeping".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(timeline_housekeeping_handle));
|
||||
|
||||
if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
|
||||
let conf_ = conf.clone();
|
||||
let wal_service_handle = current_thread_rt
|
||||
|
||||
@@ -15,19 +15,12 @@ use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
use tracing::*;
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
struct GlobalTimelinesState {
|
||||
timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
|
||||
|
||||
// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
|
||||
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
|
||||
// this map is dropped on restart.
|
||||
tombstones: HashMap<TenantTimelineId, Instant>,
|
||||
|
||||
conf: Option<SafeKeeperConf>,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
|
||||
@@ -71,17 +64,11 @@ impl GlobalTimelinesState {
|
||||
.cloned()
|
||||
.ok_or(TimelineError::NotFound(*ttid))
|
||||
}
|
||||
|
||||
fn delete(&mut self, ttid: TenantTimelineId) {
|
||||
self.timelines.remove(&ttid);
|
||||
self.tombstones.insert(ttid, Instant::now());
|
||||
}
|
||||
}
|
||||
|
||||
static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
|
||||
Mutex::new(GlobalTimelinesState {
|
||||
timelines: HashMap::new(),
|
||||
tombstones: HashMap::new(),
|
||||
conf: None,
|
||||
broker_active_set: Arc::new(TimelinesSet::default()),
|
||||
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
|
||||
@@ -211,17 +198,11 @@ impl GlobalTimelines {
|
||||
let tli = Arc::new(timeline);
|
||||
|
||||
// TODO: prevent concurrent timeline creation/loading
|
||||
{
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
|
||||
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
|
||||
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
|
||||
if state.tombstones.remove(&ttid).is_some() {
|
||||
warn!("Un-deleted timeline {ttid}");
|
||||
}
|
||||
|
||||
state.timelines.insert(ttid, tli.clone());
|
||||
}
|
||||
TIMELINES_STATE
|
||||
.lock()
|
||||
.unwrap()
|
||||
.timelines
|
||||
.insert(ttid, tli.clone());
|
||||
|
||||
tli.bootstrap(&conf, broker_active_set, partial_backup_rate_limiter);
|
||||
|
||||
@@ -248,7 +229,7 @@ impl GlobalTimelines {
|
||||
|
||||
/// Create a new timeline with the given id. If the timeline already exists, returns
|
||||
/// an existing timeline.
|
||||
pub(crate) async fn create(
|
||||
pub async fn create(
|
||||
ttid: TenantTimelineId,
|
||||
server_info: ServerInfo,
|
||||
commit_lsn: Lsn,
|
||||
@@ -260,11 +241,6 @@ impl GlobalTimelines {
|
||||
// Timeline already exists, return it.
|
||||
return Ok(timeline);
|
||||
}
|
||||
|
||||
if state.tombstones.contains_key(&ttid) {
|
||||
anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
|
||||
}
|
||||
|
||||
state.get_dependencies()
|
||||
};
|
||||
|
||||
@@ -324,19 +300,17 @@ impl GlobalTimelines {
|
||||
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
|
||||
/// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
|
||||
/// i.e. loaded in memory and not cancelled.
|
||||
pub(crate) fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
|
||||
let tli_res = {
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
state.get(&ttid)
|
||||
};
|
||||
match tli_res {
|
||||
pub fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
|
||||
let res = TIMELINES_STATE.lock().unwrap().get(&ttid);
|
||||
|
||||
match res {
|
||||
Ok(tli) => {
|
||||
if tli.is_cancelled() {
|
||||
return Err(TimelineError::Cancelled(ttid));
|
||||
}
|
||||
Ok(tli)
|
||||
}
|
||||
_ => tli_res,
|
||||
_ => res,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -365,26 +339,12 @@ impl GlobalTimelines {
|
||||
|
||||
/// Cancels timeline, then deletes the corresponding data directory.
|
||||
/// If only_local, doesn't remove WAL segments in remote storage.
|
||||
pub(crate) async fn delete(
|
||||
pub async fn delete(
|
||||
ttid: &TenantTimelineId,
|
||||
only_local: bool,
|
||||
) -> Result<TimelineDeleteForceResult> {
|
||||
let tli_res = {
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
|
||||
if state.tombstones.contains_key(ttid) {
|
||||
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
|
||||
info!("Timeline {ttid} was already deleted");
|
||||
return Ok(TimelineDeleteForceResult {
|
||||
dir_existed: false,
|
||||
was_active: false,
|
||||
});
|
||||
}
|
||||
|
||||
state.get(ttid)
|
||||
};
|
||||
|
||||
let result = match tli_res {
|
||||
let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
|
||||
match tli_res {
|
||||
Ok(timeline) => {
|
||||
let was_active = timeline.broker_active.load(Ordering::Relaxed);
|
||||
|
||||
@@ -394,6 +354,11 @@ impl GlobalTimelines {
|
||||
info!("deleting timeline {}, only_local={}", ttid, only_local);
|
||||
let dir_existed = timeline.delete(&mut shared_state, only_local).await?;
|
||||
|
||||
// Remove timeline from the map.
|
||||
// FIXME: re-enable it once we fix the issue with recreation of deleted timelines
|
||||
// https://github.com/neondatabase/neon/issues/3146
|
||||
// TIMELINES_STATE.lock().unwrap().timelines.remove(ttid);
|
||||
|
||||
Ok(TimelineDeleteForceResult {
|
||||
dir_existed,
|
||||
was_active, // TODO: we probably should remove this field
|
||||
@@ -409,14 +374,7 @@ impl GlobalTimelines {
|
||||
was_active: false,
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
// Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
|
||||
// are used to prevent still-running computes from re-creating the same timeline when they send data,
|
||||
// and to speed up repeated deletion calls by avoiding re-listing objects.
|
||||
TIMELINES_STATE.lock().unwrap().delete(*ttid);
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
|
||||
@@ -462,20 +420,19 @@ impl GlobalTimelines {
|
||||
tenant_id,
|
||||
))?;
|
||||
|
||||
// FIXME: we temporarily disabled removing timelines from the map, see `delete_force`
|
||||
// let tlis_after_delete = Self::get_all_for_tenant(*tenant_id);
|
||||
// if !tlis_after_delete.is_empty() {
|
||||
// // Some timelines were created while we were deleting them, returning error
|
||||
// // to the caller, so it can retry later.
|
||||
// bail!(
|
||||
// "failed to delete all timelines for tenant {}: some timelines were created while we were deleting them",
|
||||
// tenant_id
|
||||
// );
|
||||
// }
|
||||
|
||||
Ok(deleted)
|
||||
}
|
||||
|
||||
pub fn housekeeping(tombstone_ttl: &Duration) {
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
|
||||
// We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
|
||||
// timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
|
||||
// may recreate a deleted timeline.
|
||||
let now = Instant::now();
|
||||
state
|
||||
.tombstones
|
||||
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Serialize)]
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use futures::StreamExt;
|
||||
use pageserver::tenant::storage_layer::LayerName;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@@ -29,7 +29,7 @@ impl LargeObjectKind {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct LargeObject {
|
||||
pub key: String,
|
||||
pub size: u64,
|
||||
@@ -45,76 +45,53 @@ pub async fn find_large_objects(
|
||||
bucket_config: BucketConfig,
|
||||
min_size: u64,
|
||||
ignore_deltas: bool,
|
||||
concurrency: usize,
|
||||
) -> anyhow::Result<LargeObjectListing> {
|
||||
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
|
||||
let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
|
||||
|
||||
let objects_stream = tenants.map_ok(|tenant_shard_id| {
|
||||
let mut tenant_root = target.tenant_root(&tenant_shard_id);
|
||||
let s3_client = s3_client.clone();
|
||||
async move {
|
||||
let mut objects = Vec::new();
|
||||
let mut total_objects_ctr = 0u64;
|
||||
// We want the objects and not just common prefixes
|
||||
tenant_root.delimiter.clear();
|
||||
let mut continuation_token = None;
|
||||
loop {
|
||||
let fetch_response =
|
||||
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
|
||||
.await?;
|
||||
for obj in fetch_response.contents().iter().filter(|o| {
|
||||
if let Some(obj_size) = o.size {
|
||||
min_size as i64 <= obj_size
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}) {
|
||||
let key = obj.key().expect("couldn't get key").to_owned();
|
||||
let kind = LargeObjectKind::from_key(&key);
|
||||
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
|
||||
continue;
|
||||
}
|
||||
objects.push(LargeObject {
|
||||
key,
|
||||
size: obj.size.unwrap() as u64,
|
||||
kind,
|
||||
})
|
||||
}
|
||||
total_objects_ctr += fetch_response.contents().len() as u64;
|
||||
match fetch_response.next_continuation_token {
|
||||
Some(new_token) => continuation_token = Some(new_token),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
Ok((tenant_shard_id, objects, total_objects_ctr))
|
||||
}
|
||||
});
|
||||
let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
|
||||
|
||||
let mut tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
|
||||
let mut objects = Vec::new();
|
||||
|
||||
let mut tenant_ctr = 0u64;
|
||||
let mut object_ctr = 0u64;
|
||||
while let Some(res) = objects_stream.next().await {
|
||||
let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
|
||||
objects.extend_from_slice(&objects_slice);
|
||||
while let Some(tenant_shard_id) = tenants.next().await {
|
||||
let tenant_shard_id = tenant_shard_id?;
|
||||
let mut tenant_root = target.tenant_root(&tenant_shard_id);
|
||||
// We want the objects and not just common prefixes
|
||||
tenant_root.delimiter.clear();
|
||||
let mut continuation_token = None;
|
||||
loop {
|
||||
let fetch_response =
|
||||
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
|
||||
.await?;
|
||||
for obj in fetch_response.contents().iter().filter(|o| {
|
||||
if let Some(obj_size) = o.size {
|
||||
min_size as i64 <= obj_size
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}) {
|
||||
let key = obj.key().expect("couldn't get key").to_owned();
|
||||
let kind = LargeObjectKind::from_key(&key);
|
||||
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
|
||||
continue;
|
||||
}
|
||||
objects.push(LargeObject {
|
||||
key,
|
||||
size: obj.size.unwrap() as u64,
|
||||
kind,
|
||||
})
|
||||
}
|
||||
object_ctr += fetch_response.contents().len() as u64;
|
||||
match fetch_response.next_continuation_token {
|
||||
Some(new_token) => continuation_token = Some(new_token),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
object_ctr += total_objects_ctr;
|
||||
tenant_ctr += 1;
|
||||
if tenant_ctr % 100 == 0 {
|
||||
if tenant_ctr % 50 == 0 {
|
||||
tracing::info!(
|
||||
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
|
||||
objects.len()
|
||||
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let bucket_name = target.bucket_name();
|
||||
tracing::info!(
|
||||
"Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
|
||||
objects.len()
|
||||
);
|
||||
Ok(LargeObjectListing { objects })
|
||||
}
|
||||
|
||||
@@ -78,8 +78,6 @@ enum Command {
|
||||
min_size: u64,
|
||||
#[arg(short, long, default_value_t = false)]
|
||||
ignore_deltas: bool,
|
||||
#[arg(long = "concurrency", short = 'j', default_value_t = 64)]
|
||||
concurrency: usize,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -212,15 +210,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
Command::FindLargeObjects {
|
||||
min_size,
|
||||
ignore_deltas,
|
||||
concurrency,
|
||||
} => {
|
||||
let summary = find_large_objects::find_large_objects(
|
||||
bucket_config,
|
||||
min_size,
|
||||
ignore_deltas,
|
||||
concurrency,
|
||||
)
|
||||
.await?;
|
||||
let summary =
|
||||
find_large_objects::find_large_objects(bucket_config, min_size, ignore_deltas)
|
||||
.await?;
|
||||
println!("{}", serde_json::to_string(&summary).unwrap());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -943,8 +943,6 @@ class NeonEnvBuilder:
|
||||
# if the test threw an exception, don't check for errors
|
||||
# as a failing assertion would cause the cleanup below to fail
|
||||
ps_assert_metric_no_errors=(exc_type is None),
|
||||
# do not fail on endpoint errors to allow the rest of cleanup to proceed
|
||||
fail_on_endpoint_errors=False,
|
||||
)
|
||||
cleanup_error = None
|
||||
|
||||
@@ -1216,11 +1214,11 @@ class NeonEnv:
|
||||
for f in futs:
|
||||
f.result()
|
||||
|
||||
def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
|
||||
def stop(self, immediate=False, ps_assert_metric_no_errors=False):
|
||||
"""
|
||||
After this method returns, there should be no child processes running.
|
||||
"""
|
||||
self.endpoints.stop_all(fail_on_endpoint_errors)
|
||||
self.endpoints.stop_all()
|
||||
|
||||
# Stop storage controller before pageservers: we don't want it to spuriously
|
||||
# detect a pageserver "failure" during test teardown
|
||||
@@ -3901,17 +3899,9 @@ class EndpointFactory:
|
||||
pageserver_id=pageserver_id,
|
||||
)
|
||||
|
||||
def stop_all(self, fail_on_error=True) -> "EndpointFactory":
|
||||
exception = None
|
||||
def stop_all(self) -> "EndpointFactory":
|
||||
for ep in self.endpoints:
|
||||
try:
|
||||
ep.stop()
|
||||
except Exception as e:
|
||||
log.error(f"Failed to stop endpoint {ep.endpoint_id}: {e}")
|
||||
exception = e
|
||||
|
||||
if fail_on_error and exception is not None:
|
||||
raise exception
|
||||
ep.stop()
|
||||
|
||||
return self
|
||||
|
||||
|
||||
@@ -1,23 +0,0 @@
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"sql_func",
|
||||
[
|
||||
"trigger_panic",
|
||||
"trigger_segfault",
|
||||
"💣", # calls `trigger_segfault` internally
|
||||
],
|
||||
)
|
||||
def test_endpoint_crash(neon_env_builder: NeonEnvBuilder, sql_func: str):
|
||||
"""
|
||||
Test that triggering crash from neon_test_utils crashes the endpoint
|
||||
"""
|
||||
env = neon_env_builder.init_start()
|
||||
env.neon_cli.create_branch("test_endpoint_crash")
|
||||
endpoint = env.endpoints.create_start("test_endpoint_crash")
|
||||
|
||||
endpoint.safe_psql("CREATE EXTENSION neon_test_utils;")
|
||||
with pytest.raises(Exception, match="This probably means the server terminated abnormally"):
|
||||
endpoint.safe_psql(f"SELECT {sql_func}();")
|
||||
@@ -1,4 +1,3 @@
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from fixtures.log_helper import log
|
||||
@@ -73,46 +72,3 @@ WITH (fillfactor='100');
|
||||
blocks = query_scalar(cur, "select approximate_working_set_size(true)")
|
||||
log.info(f"working set size after some index access of a few select pages only {blocks}")
|
||||
assert blocks < 10
|
||||
|
||||
|
||||
def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
config_lines=[
|
||||
"autovacuum = off",
|
||||
"shared_buffers=1MB",
|
||||
"neon.max_file_cache_size=256MB",
|
||||
"neon.file_cache_size_limit=245MB",
|
||||
],
|
||||
)
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("create extension neon version '1.4'")
|
||||
cur.execute(
|
||||
"create table t(pk integer primary key, count integer default 0, payload text default repeat('?', 128))"
|
||||
)
|
||||
cur.execute("insert into t (pk) values (generate_series(1,1000000))")
|
||||
time.sleep(2)
|
||||
before_10k = time.monotonic()
|
||||
cur.execute("select sum(count) from t where pk between 10000 and 20000")
|
||||
time.sleep(2)
|
||||
before_1k = time.monotonic()
|
||||
cur.execute("select sum(count) from t where pk between 1000 and 2000")
|
||||
after = time.monotonic()
|
||||
|
||||
cur.execute(f"select approximate_working_set_size_seconds({int(after - before_1k + 1)})")
|
||||
estimation_1k = cur.fetchall()[0][0]
|
||||
log.info(f"Working set size for selecting 1k records {estimation_1k}")
|
||||
|
||||
cur.execute(f"select approximate_working_set_size_seconds({int(after - before_10k + 1)})")
|
||||
estimation_10k = cur.fetchall()[0][0]
|
||||
log.info(f"Working set size for selecting 10k records {estimation_10k}")
|
||||
|
||||
cur.execute("select pg_table_size('t')")
|
||||
size = cur.fetchall()[0][0] // 8192
|
||||
log.info(f"Table size {size} blocks")
|
||||
|
||||
assert estimation_1k >= 20 and estimation_1k <= 40
|
||||
assert estimation_10k >= 200 and estimation_10k <= 400
|
||||
|
||||
@@ -50,7 +50,7 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
|
||||
# Ensure that the default version is also updated in the neon.control file
|
||||
assert cur.fetchone() == ("1.3",)
|
||||
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
|
||||
all_versions = ["1.4", "1.3", "1.2", "1.1", "1.0"]
|
||||
all_versions = ["1.3", "1.2", "1.1", "1.0"]
|
||||
current_version = "1.3"
|
||||
for idx, begin_version in enumerate(all_versions):
|
||||
for target_version in all_versions[idx + 1 :]:
|
||||
|
||||
@@ -16,8 +16,6 @@ from fixtures.pageserver.utils import (
|
||||
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, S3Storage, s3_storage
|
||||
from fixtures.utils import wait_until
|
||||
from fixtures.workload import Workload
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
# A tenant configuration that is convenient for generating uploads and deletions
|
||||
# without a large amount of postgres traffic.
|
||||
@@ -61,7 +59,7 @@ def evict_random_layers(
|
||||
|
||||
|
||||
@pytest.mark.parametrize("seed", [1, 2, 3])
|
||||
def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver, seed: int):
|
||||
def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
|
||||
"""
|
||||
Issue many location configuration changes, ensure that tenants
|
||||
remain readable & we don't get any unexpected errors. We should
|
||||
@@ -75,20 +73,6 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
remote_storage_kind=s3_storage(),
|
||||
)
|
||||
neon_env_builder.control_plane_compute_hook_api = (
|
||||
f"http://{make_httpserver.host}:{make_httpserver.port}/notify-attach"
|
||||
)
|
||||
|
||||
def ignore_notify(request: Request):
|
||||
# This test does all its own compute configuration (by passing explicit pageserver ID to Workload functions),
|
||||
# so we send controller notifications to /dev/null to prevent it fighting the test for control of the compute.
|
||||
log.info(f"Ignoring storage controller compute notification: {request.json}")
|
||||
return Response(status=200)
|
||||
|
||||
make_httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(
|
||||
ignore_notify
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
|
||||
|
||||
pageservers = env.pageservers
|
||||
@@ -115,15 +99,6 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
|
||||
workload.init(env.pageservers[0].id)
|
||||
workload.write_rows(256, env.pageservers[0].id)
|
||||
|
||||
# Discourage the storage controller from interfering with the changes we will make directly on the pageserver
|
||||
env.storage_controller.tenant_policy_update(
|
||||
tenant_id,
|
||||
{
|
||||
"scheduling": "Stop",
|
||||
},
|
||||
)
|
||||
env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy Stop.*")
|
||||
|
||||
# We use a fixed seed to make the test reproducible: we want a randomly
|
||||
# chosen order, but not to change the order every time we run the test.
|
||||
rng = random.Random(seed)
|
||||
|
||||
@@ -54,4 +54,4 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
|
||||
pcur.execute(f"INSERT into t values ({n_records}, 0)")
|
||||
n_records += 1
|
||||
with sub.cursor() as scur:
|
||||
wait_until(60, 0.5, check_that_changes_propagated)
|
||||
wait_until(10, 0.5, check_that_changes_propagated)
|
||||
|
||||
@@ -67,9 +67,8 @@ def test_tenant_delete_smoke(
|
||||
|
||||
# first try to delete non existing tenant
|
||||
tenant_id = TenantId.generate()
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[".*NotFound.*", ".*simulated failure.*", ".*failed to delete .+ objects.*"]
|
||||
)
|
||||
env.pageserver.allowed_errors.append(".*NotFound.*")
|
||||
env.pageserver.allowed_errors.append(".*simulated failure.*")
|
||||
|
||||
# Check that deleting a non-existent tenant gives the expected result: this is a loop because we
|
||||
# may need to retry on some remote storage errors injected by the test harness
|
||||
|
||||
Reference in New Issue
Block a user