mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
Compare commits
1 Commits
jcsp/tests
...
arpad/blob
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
229157e323 |
@@ -135,7 +135,7 @@ runs:
|
||||
fi
|
||||
if [[ "${{ inputs.run_in_parallel }}" == "true" ]]; then
|
||||
# -n sets the number of parallel processes that pytest-xdist will run
|
||||
EXTRA_PARAMS="-n10 $EXTRA_PARAMS"
|
||||
EXTRA_PARAMS="-n12 $EXTRA_PARAMS"
|
||||
|
||||
# --dist=loadgroup points tests marked with @pytest.mark.xdist_group
|
||||
# to the same worker to make @pytest.mark.order work with xdist
|
||||
|
||||
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -771,7 +771,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_core"
|
||||
version = "0.21.0"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#66e77bdd87bf87e773acf3b0c84b532c1124367d"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=arpad%2Fblob_batch#e34e9d277391a394dce6bd016db8d50991462f72"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.22.1",
|
||||
@@ -800,7 +800,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_identity"
|
||||
version = "0.21.0"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#66e77bdd87bf87e773acf3b0c84b532c1124367d"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=arpad%2Fblob_batch#e34e9d277391a394dce6bd016db8d50991462f72"
|
||||
dependencies = [
|
||||
"async-lock",
|
||||
"async-trait",
|
||||
@@ -819,7 +819,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_storage"
|
||||
version = "0.21.0"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#66e77bdd87bf87e773acf3b0c84b532c1124367d"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=arpad%2Fblob_batch#e34e9d277391a394dce6bd016db8d50991462f72"
|
||||
dependencies = [
|
||||
"RustyXML",
|
||||
"async-lock",
|
||||
@@ -837,7 +837,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_storage_blobs"
|
||||
version = "0.21.0"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#66e77bdd87bf87e773acf3b0c84b532c1124367d"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=arpad%2Fblob_batch#e34e9d277391a394dce6bd016db8d50991462f72"
|
||||
dependencies = [
|
||||
"RustyXML",
|
||||
"azure_core",
|
||||
@@ -857,7 +857,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_svc_blobstorage"
|
||||
version = "0.21.0"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#66e77bdd87bf87e773acf3b0c84b532c1124367d"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=arpad%2Fblob_batch#e34e9d277391a394dce6bd016db8d50991462f72"
|
||||
dependencies = [
|
||||
"azure_core",
|
||||
"bytes",
|
||||
|
||||
@@ -213,10 +213,10 @@ postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", br
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
|
||||
## Azure SDK crates
|
||||
azure_core = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
|
||||
azure_identity = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
azure_storage = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
azure_core = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "arpad/blob_batch", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
|
||||
azure_identity = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "arpad/blob_batch", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
azure_storage = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "arpad/blob_batch", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "arpad/blob_batch", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
|
||||
## Local libraries
|
||||
compute_api = { version = "0.1", path = "./libs/compute_api/" }
|
||||
|
||||
@@ -556,7 +556,7 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
let op = async {
|
||||
// TODO batch requests are not supported by the SDK
|
||||
// https://github.com/Azure/azure-sdk-for-rust/issues/1068
|
||||
for path in paths {
|
||||
for path_chunk in paths.chunks(256) {
|
||||
#[derive(Debug)]
|
||||
enum AzureOrTimeout {
|
||||
AzureError(azure_core::Error),
|
||||
@@ -572,13 +572,20 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
let max_retries = 5;
|
||||
backoff::retry(
|
||||
|| async {
|
||||
let blob_client = self.client.blob_client(self.relative_path_to_name(path));
|
||||
let mut batch_client = self.client.blob_batch();
|
||||
for path in path_chunk {
|
||||
batch_client = match batch_client.delete(self.relative_path_to_name(path)) {
|
||||
Ok(batch_client) => batch_client,
|
||||
Err(e) => return Err(AzureOrTimeout::AzureError(e)),
|
||||
};
|
||||
}
|
||||
|
||||
let request = blob_client.delete().into_future();
|
||||
let request = batch_client.into_future();
|
||||
|
||||
let res = tokio::time::timeout(self.timeout, request).await;
|
||||
|
||||
match res {
|
||||
// TODO: validate that all deletions were successful
|
||||
Ok(Ok(_v)) => Ok(()),
|
||||
Ok(Err(azure_err)) => {
|
||||
if let Some(http_err) = azure_err.as_http_error() {
|
||||
@@ -624,10 +631,6 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
res
|
||||
}
|
||||
|
||||
fn max_keys_per_delete(&self) -> usize {
|
||||
super::MAX_KEYS_PER_DELETE_AZURE
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
|
||||
@@ -70,14 +70,7 @@ pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
|
||||
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
|
||||
|
||||
/// As defined in S3 docs
|
||||
///
|
||||
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
|
||||
pub const MAX_KEYS_PER_DELETE_S3: usize = 1000;
|
||||
|
||||
/// As defined in Azure docs
|
||||
///
|
||||
/// <https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch>
|
||||
pub const MAX_KEYS_PER_DELETE_AZURE: usize = 256;
|
||||
pub const MAX_KEYS_PER_DELETE: usize = 1000;
|
||||
|
||||
const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
|
||||
|
||||
@@ -347,14 +340,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()>;
|
||||
|
||||
/// Returns the maximum number of keys that a call to [`Self::delete_objects`] can delete without chunking
|
||||
///
|
||||
/// The value returned is only an optimization hint, One can pass larger number of objects to
|
||||
/// `delete_objects` as well.
|
||||
///
|
||||
/// The value is guaranteed to be >= 1.
|
||||
fn max_keys_per_delete(&self) -> usize;
|
||||
|
||||
/// Deletes all objects matching the given prefix.
|
||||
///
|
||||
/// NB: this uses NoDelimiter and will match partial prefixes. For example, the prefix /a/b will
|
||||
@@ -548,16 +533,6 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
}
|
||||
}
|
||||
|
||||
/// [`RemoteStorage::max_keys_per_delete`]
|
||||
pub fn max_keys_per_delete(&self) -> usize {
|
||||
match self {
|
||||
Self::LocalFs(s) => s.max_keys_per_delete(),
|
||||
Self::AwsS3(s) => s.max_keys_per_delete(),
|
||||
Self::AzureBlob(s) => s.max_keys_per_delete(),
|
||||
Self::Unreliable(s) => s.max_keys_per_delete(),
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`RemoteStorage::delete_prefix`]
|
||||
pub async fn delete_prefix(
|
||||
&self,
|
||||
|
||||
@@ -573,10 +573,6 @@ impl RemoteStorage for LocalFs {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn max_keys_per_delete(&self) -> usize {
|
||||
super::MAX_KEYS_PER_DELETE_S3
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
|
||||
@@ -48,7 +48,7 @@ use crate::{
|
||||
metrics::{start_counting_cancelled_wait, start_measuring_requests},
|
||||
support::PermitCarrying,
|
||||
ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
|
||||
RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE_S3,
|
||||
RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
|
||||
REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
};
|
||||
|
||||
@@ -355,7 +355,7 @@ impl S3Bucket {
|
||||
let kind = RequestKind::Delete;
|
||||
let mut cancel = std::pin::pin!(cancel.cancelled());
|
||||
|
||||
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
|
||||
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
let req = self
|
||||
@@ -832,10 +832,6 @@ impl RemoteStorage for S3Bucket {
|
||||
self.delete_oids(&permit, &delete_objects, cancel).await
|
||||
}
|
||||
|
||||
fn max_keys_per_delete(&self) -> usize {
|
||||
MAX_KEYS_PER_DELETE_S3
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
|
||||
let paths = std::array::from_ref(path);
|
||||
self.delete_objects(paths, cancel).await
|
||||
|
||||
@@ -203,10 +203,6 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn max_keys_per_delete(&self) -> usize {
|
||||
self.inner.max_keys_per_delete()
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use remote_storage::RemotePath;
|
||||
use remote_storage::TimeoutOrCancel;
|
||||
use remote_storage::MAX_KEYS_PER_DELETE;
|
||||
use std::time::Duration;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
@@ -130,8 +131,7 @@ impl Deleter {
|
||||
}
|
||||
|
||||
pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
|
||||
let max_keys_per_delete = self.remote_storage.max_keys_per_delete();
|
||||
self.accumulator.reserve(max_keys_per_delete);
|
||||
self.accumulator.reserve(MAX_KEYS_PER_DELETE);
|
||||
|
||||
loop {
|
||||
if self.cancel.is_cancelled() {
|
||||
@@ -156,14 +156,14 @@ impl Deleter {
|
||||
|
||||
match msg {
|
||||
DeleterMessage::Delete(mut list) => {
|
||||
while !list.is_empty() || self.accumulator.len() == max_keys_per_delete {
|
||||
if self.accumulator.len() == max_keys_per_delete {
|
||||
while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
|
||||
if self.accumulator.len() == MAX_KEYS_PER_DELETE {
|
||||
self.flush().await?;
|
||||
// If we have received this number of keys, proceed with attempting to execute
|
||||
assert_eq!(self.accumulator.len(), 0);
|
||||
}
|
||||
|
||||
let available_slots = max_keys_per_delete - self.accumulator.len();
|
||||
let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
|
||||
let take_count = std::cmp::min(available_slots, list.len());
|
||||
for path in list.drain(list.len() - take_count..) {
|
||||
self.accumulator.push(path);
|
||||
|
||||
@@ -4506,12 +4506,7 @@ impl Tenant {
|
||||
// - this timeline was created while we were finding cutoffs
|
||||
// - lsn for timestamp search fails for this timeline repeatedly
|
||||
if let Some(cutoffs) = gc_cutoffs.get(&timeline.timeline_id) {
|
||||
let original_cutoffs = target.cutoffs.clone();
|
||||
// GC cutoffs should never go back
|
||||
target.cutoffs = GcCutoffs {
|
||||
space: Lsn(cutoffs.space.0.max(original_cutoffs.space.0)),
|
||||
time: Lsn(cutoffs.time.0.max(original_cutoffs.time.0)),
|
||||
}
|
||||
target.cutoffs = cutoffs.clone();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,6 @@
|
||||
#include "libpq/pqformat.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
#include "portability/instr_time.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/ipc.h"
|
||||
@@ -119,11 +118,6 @@ typedef struct
|
||||
*/
|
||||
PSConnectionState state;
|
||||
PGconn *conn;
|
||||
|
||||
/* request / response counters for debugging */
|
||||
uint64 nrequests_sent;
|
||||
uint64 nresponses_received;
|
||||
|
||||
/*---
|
||||
* WaitEventSet containing:
|
||||
* - WL_SOCKET_READABLE on 'conn'
|
||||
@@ -634,8 +628,6 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
}
|
||||
|
||||
shard->state = PS_Connected;
|
||||
shard->nrequests_sent = 0;
|
||||
shard->nresponses_received = 0;
|
||||
}
|
||||
/* FALLTHROUGH */
|
||||
case PS_Connected:
|
||||
@@ -664,27 +656,6 @@ call_PQgetCopyData(shardno_t shard_no, char **buffer)
|
||||
int ret;
|
||||
PageServer *shard = &page_servers[shard_no];
|
||||
PGconn *pageserver_conn = shard->conn;
|
||||
instr_time now,
|
||||
start_ts,
|
||||
since_start,
|
||||
last_log_ts,
|
||||
since_last_log;
|
||||
bool logged = false;
|
||||
|
||||
/*
|
||||
* As a debugging aid, if we don't get a response for a long time, print a
|
||||
* log message.
|
||||
*
|
||||
* 10 s is a very generous threshold, normally we expect a response in a
|
||||
* few milliseconds. We have metrics to track latencies in normal ranges,
|
||||
* but in the cases that take exceptionally long, it's useful to log the
|
||||
* exact timestamps.
|
||||
*/
|
||||
#define LOG_INTERVAL_US UINT64CONST(10 * 1000000)
|
||||
|
||||
INSTR_TIME_SET_CURRENT(now);
|
||||
start_ts = last_log_ts = now;
|
||||
INSTR_TIME_SET_ZERO(since_last_log);
|
||||
|
||||
retry:
|
||||
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
|
||||
@@ -692,12 +663,9 @@ retry:
|
||||
if (ret == 0)
|
||||
{
|
||||
WaitEvent event;
|
||||
long timeout;
|
||||
|
||||
timeout = Min(0, LOG_INTERVAL_US - INSTR_TIME_GET_MICROSEC(since_last_log));
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
(void) WaitEventSetWait(shard->wes_read, timeout, &event, 1,
|
||||
(void) WaitEventSetWait(shard->wes_read, -1L, &event, 1,
|
||||
WAIT_EVENT_NEON_PS_READ);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
@@ -716,40 +684,9 @@ retry:
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Print a message to the log if a long time has passed with no
|
||||
* response.
|
||||
*/
|
||||
INSTR_TIME_SET_CURRENT(now);
|
||||
since_last_log = now;
|
||||
INSTR_TIME_SUBTRACT(since_last_log, last_log_ts);
|
||||
if (INSTR_TIME_GET_MICROSEC(since_last_log) >= LOG_INTERVAL_US)
|
||||
{
|
||||
since_start = now;
|
||||
INSTR_TIME_SUBTRACT(since_start, start_ts);
|
||||
neon_shard_log(shard_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses)",
|
||||
INSTR_TIME_GET_DOUBLE(since_start),
|
||||
shard->nrequests_sent, shard->nresponses_received);
|
||||
last_log_ts = now;
|
||||
logged = true;
|
||||
}
|
||||
|
||||
goto retry;
|
||||
}
|
||||
|
||||
/*
|
||||
* If we logged earlier that the response is taking a long time, log
|
||||
* another message when the response is finally received.
|
||||
*/
|
||||
if (logged)
|
||||
{
|
||||
INSTR_TIME_SET_CURRENT(now);
|
||||
since_start = now;
|
||||
INSTR_TIME_SUBTRACT(since_start, start_ts);
|
||||
neon_shard_log(shard_no, LOG, "received response from pageserver after %0.3f s",
|
||||
INSTR_TIME_GET_DOUBLE(since_start));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -849,7 +786,6 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
|
||||
* PGRES_POLLING_WRITING state. It's kinda dirty to disconnect at this
|
||||
* point, but on the grand scheme of things it's only a small issue.
|
||||
*/
|
||||
shard->nrequests_sent++;
|
||||
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
@@ -942,7 +878,6 @@ pageserver_receive(shardno_t shard_no)
|
||||
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc);
|
||||
}
|
||||
|
||||
shard->nresponses_received++;
|
||||
return (NeonResponse *) resp;
|
||||
}
|
||||
|
||||
|
||||
@@ -423,11 +423,7 @@ readahead_buffer_resize(int newsize, void *extra)
|
||||
* ensuring we have received all but the last n requests (n = newsize).
|
||||
*/
|
||||
if (MyPState->n_requests_inflight > newsize)
|
||||
{
|
||||
Assert(MyPState->ring_unused >= MyPState->n_requests_inflight - newsize);
|
||||
prefetch_wait_for(MyPState->ring_unused - (MyPState->n_requests_inflight - newsize));
|
||||
Assert(MyPState->n_requests_inflight <= newsize);
|
||||
}
|
||||
prefetch_wait_for(MyPState->ring_unused - newsize);
|
||||
|
||||
/* construct the new PrefetchState, and copy over the memory contexts */
|
||||
newPState = MemoryContextAllocZero(TopMemoryContext, newprfs_size);
|
||||
@@ -442,6 +438,7 @@ readahead_buffer_resize(int newsize, void *extra)
|
||||
newPState->ring_last = newsize;
|
||||
newPState->ring_unused = newsize;
|
||||
newPState->ring_receive = newsize;
|
||||
newPState->ring_flush = newsize;
|
||||
newPState->max_shard_no = MyPState->max_shard_no;
|
||||
memcpy(newPState->shard_bitmap, MyPState->shard_bitmap, sizeof(MyPState->shard_bitmap));
|
||||
|
||||
@@ -492,7 +489,6 @@ readahead_buffer_resize(int newsize, void *extra)
|
||||
}
|
||||
newPState->n_unused -= 1;
|
||||
}
|
||||
newPState->ring_flush = newPState->ring_receive;
|
||||
|
||||
MyNeonCounters->getpage_prefetches_buffered =
|
||||
MyPState->n_responses_buffered;
|
||||
@@ -502,7 +498,6 @@ readahead_buffer_resize(int newsize, void *extra)
|
||||
for (; end >= MyPState->ring_last && end != UINT64_MAX; end -= 1)
|
||||
{
|
||||
PrefetchRequest *slot = GetPrfSlot(end);
|
||||
Assert(slot->status != PRFS_REQUESTED);
|
||||
if (slot->status == PRFS_RECEIVED)
|
||||
{
|
||||
pfree(slot->response);
|
||||
|
||||
@@ -115,8 +115,7 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
IpAddr::V6(ip) => IpNet::V6(Ipv6Net::new_assert(ip, 64).trunc()),
|
||||
};
|
||||
if !self.limiter.lock().unwrap().check(subnet_key, 1) {
|
||||
// log only the subnet part of the IP address to know which subnet is rate limited
|
||||
tracing::warn!("Rate limit exceeded. Skipping cancellation message, {subnet_key}");
|
||||
tracing::debug!("Rate limit exceeded. Skipping cancellation message");
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.cancellation_requests_total
|
||||
|
||||
@@ -163,36 +163,32 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
|
||||
let do_handshake = handshake(ctx, stream, tls, record_handshake_error);
|
||||
|
||||
let (mut stream, params) = match tokio::time::timeout(config.handshake_timeout, do_handshake)
|
||||
.await??
|
||||
{
|
||||
HandshakeData::Startup(stream, params) => (stream, params),
|
||||
HandshakeData::Cancel(cancel_key_data) => {
|
||||
// spawn a task to cancel the session, but don't wait for it
|
||||
cancellations.spawn({
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session_id = ctx.session_id();
|
||||
let peer_ip = ctx.peer_addr();
|
||||
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?session_id);
|
||||
cancel_span.follows_from(tracing::Span::current());
|
||||
async move {
|
||||
drop(
|
||||
cancellation_handler_clone
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
session_id,
|
||||
peer_ip,
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.instrument(cancel_span)
|
||||
.await,
|
||||
);
|
||||
}
|
||||
});
|
||||
let (mut stream, params) =
|
||||
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
|
||||
HandshakeData::Startup(stream, params) => (stream, params),
|
||||
HandshakeData::Cancel(cancel_key_data) => {
|
||||
// spawn a task to cancel the session, but don't wait for it
|
||||
cancellations.spawn({
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session_id = ctx.session_id();
|
||||
let peer_ip = ctx.peer_addr();
|
||||
async move {
|
||||
drop(
|
||||
cancellation_handler_clone
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
session_id,
|
||||
peer_ip,
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.await,
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
drop(pause);
|
||||
|
||||
ctx.set_db_options(params.clone());
|
||||
|
||||
@@ -272,36 +272,32 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
|
||||
let do_handshake = handshake(ctx, stream, mode.handshake_tls(tls), record_handshake_error);
|
||||
|
||||
let (mut stream, params) = match tokio::time::timeout(config.handshake_timeout, do_handshake)
|
||||
.await??
|
||||
{
|
||||
HandshakeData::Startup(stream, params) => (stream, params),
|
||||
HandshakeData::Cancel(cancel_key_data) => {
|
||||
// spawn a task to cancel the session, but don't wait for it
|
||||
cancellations.spawn({
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session_id = ctx.session_id();
|
||||
let peer_ip = ctx.peer_addr();
|
||||
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?session_id);
|
||||
cancel_span.follows_from(tracing::Span::current());
|
||||
async move {
|
||||
drop(
|
||||
cancellation_handler_clone
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
session_id,
|
||||
peer_ip,
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.instrument(cancel_span)
|
||||
.await,
|
||||
);
|
||||
}
|
||||
});
|
||||
let (mut stream, params) =
|
||||
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
|
||||
HandshakeData::Startup(stream, params) => (stream, params),
|
||||
HandshakeData::Cancel(cancel_key_data) => {
|
||||
// spawn a task to cancel the session, but don't wait for it
|
||||
cancellations.spawn({
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session_id = ctx.session_id();
|
||||
let peer_ip = ctx.peer_addr();
|
||||
async move {
|
||||
drop(
|
||||
cancellation_handler_clone
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
session_id,
|
||||
peer_ip,
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.await,
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
drop(pause);
|
||||
|
||||
ctx.set_db_options(params.clone());
|
||||
|
||||
@@ -13,7 +13,6 @@ use crate::cache::project_info::ProjectInfoCache;
|
||||
use crate::cancellation::{CancelMap, CancellationHandler};
|
||||
use crate::intern::{ProjectIdInt, RoleNameInt};
|
||||
use crate::metrics::{Metrics, RedisErrors, RedisEventsCount};
|
||||
use tracing::Instrument;
|
||||
|
||||
const CPLANE_CHANNEL_NAME: &str = "neondb-proxy-ws-updates";
|
||||
pub(crate) const PROXY_CHANNEL_NAME: &str = "neondb-proxy-to-proxy-updates";
|
||||
@@ -144,8 +143,6 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
let peer_addr = cancel_session
|
||||
.peer_addr
|
||||
.unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED));
|
||||
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?cancel_session.session_id);
|
||||
cancel_span.follows_from(tracing::Span::current());
|
||||
// This instance of cancellation_handler doesn't have a RedisPublisherClient so it can't publish the message.
|
||||
match self
|
||||
.cancellation_handler
|
||||
@@ -155,7 +152,6 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
peer_addr,
|
||||
cancel_session.peer_addr.is_some(),
|
||||
)
|
||||
.instrument(cancel_span)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {}
|
||||
|
||||
@@ -459,10 +459,12 @@ pub async fn get_timeline_objects(
|
||||
Ok(list.keys)
|
||||
}
|
||||
|
||||
const MAX_KEYS_PER_DELETE: usize = 1000;
|
||||
|
||||
/// Drain a buffer of keys into DeleteObjects requests
|
||||
///
|
||||
/// If `drain` is true, drains keys completely; otherwise stops when <
|
||||
/// `max_keys_per_delete`` keys are left.
|
||||
/// MAX_KEYS_PER_DELETE keys are left.
|
||||
/// `num_deleted` returns number of deleted keys.
|
||||
async fn do_delete(
|
||||
remote_client: &GenericRemoteStorage,
|
||||
@@ -472,10 +474,9 @@ async fn do_delete(
|
||||
progress_tracker: &mut DeletionProgressTracker,
|
||||
) -> anyhow::Result<()> {
|
||||
let cancel = CancellationToken::new();
|
||||
let max_keys_per_delete = remote_client.max_keys_per_delete();
|
||||
while (!keys.is_empty() && drain) || (keys.len() >= max_keys_per_delete) {
|
||||
while (!keys.is_empty() && drain) || (keys.len() >= MAX_KEYS_PER_DELETE) {
|
||||
let request_keys =
|
||||
keys.split_off(keys.len() - (std::cmp::min(max_keys_per_delete, keys.len())));
|
||||
keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
|
||||
|
||||
let request_keys: Vec<RemotePath> = request_keys.into_iter().map(|o| o.key).collect();
|
||||
|
||||
@@ -616,7 +617,7 @@ pub async fn purge_garbage(
|
||||
}
|
||||
|
||||
objects_to_delete.append(&mut object_list);
|
||||
if objects_to_delete.len() >= remote_client.max_keys_per_delete() {
|
||||
if objects_to_delete.len() >= MAX_KEYS_PER_DELETE {
|
||||
do_delete(
|
||||
&remote_client,
|
||||
&mut objects_to_delete,
|
||||
|
||||
@@ -86,8 +86,6 @@ enum Command {
|
||||
/// For safekeeper node_kind only, json list of timelines and their lsn info
|
||||
#[arg(long, default_value = None)]
|
||||
timeline_lsns: Option<String>,
|
||||
#[arg(long, default_value_t = false)]
|
||||
verbose: bool,
|
||||
},
|
||||
TenantSnapshot {
|
||||
#[arg(long = "tenant-id")]
|
||||
@@ -168,7 +166,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
dump_db_connstr,
|
||||
dump_db_table,
|
||||
timeline_lsns,
|
||||
verbose,
|
||||
} => {
|
||||
if let NodeKind::Safekeeper = node_kind {
|
||||
let db_or_list = match (timeline_lsns, dump_db_connstr) {
|
||||
@@ -206,7 +203,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
tenant_ids,
|
||||
json,
|
||||
post_to_storcon,
|
||||
verbose,
|
||||
cli.exit_code,
|
||||
)
|
||||
.await
|
||||
@@ -317,7 +313,6 @@ pub async fn run_cron_job(
|
||||
Vec::new(),
|
||||
true,
|
||||
post_to_storcon,
|
||||
false, // default to non-verbose mode
|
||||
exit_code,
|
||||
)
|
||||
.await?;
|
||||
@@ -367,13 +362,12 @@ pub async fn scan_pageserver_metadata_cmd(
|
||||
tenant_shard_ids: Vec<TenantShardId>,
|
||||
json: bool,
|
||||
post_to_storcon: bool,
|
||||
verbose: bool,
|
||||
exit_code: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
if controller_client.is_none() && post_to_storcon {
|
||||
return Err(anyhow!("Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"));
|
||||
}
|
||||
match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids, verbose).await {
|
||||
match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids).await {
|
||||
Err(e) => {
|
||||
tracing::error!("Failed: {e}");
|
||||
Err(e)
|
||||
|
||||
@@ -21,12 +21,8 @@ pub struct MetadataSummary {
|
||||
tenant_count: usize,
|
||||
timeline_count: usize,
|
||||
timeline_shard_count: usize,
|
||||
/// Tenant-shard timeline (key) mapping to errors. The key has to be a string because it will be serialized to a JSON.
|
||||
/// The key is generated using `TenantShardTimelineId::to_string()`.
|
||||
with_errors: HashMap<String, Vec<String>>,
|
||||
/// Tenant-shard timeline (key) mapping to warnings. The key has to be a string because it will be serialized to a JSON.
|
||||
/// The key is generated using `TenantShardTimelineId::to_string()`.
|
||||
with_warnings: HashMap<String, Vec<String>>,
|
||||
with_errors: HashSet<TenantShardTimelineId>,
|
||||
with_warnings: HashSet<TenantShardTimelineId>,
|
||||
with_orphans: HashSet<TenantShardTimelineId>,
|
||||
indices_by_version: HashMap<usize, usize>,
|
||||
|
||||
@@ -56,12 +52,7 @@ impl MetadataSummary {
|
||||
}
|
||||
}
|
||||
|
||||
fn update_analysis(
|
||||
&mut self,
|
||||
id: &TenantShardTimelineId,
|
||||
analysis: &TimelineAnalysis,
|
||||
verbose: bool,
|
||||
) {
|
||||
fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) {
|
||||
if analysis.is_healthy() {
|
||||
self.healthy_tenant_shards.insert(id.tenant_shard_id);
|
||||
} else {
|
||||
@@ -70,17 +61,11 @@ impl MetadataSummary {
|
||||
}
|
||||
|
||||
if !analysis.errors.is_empty() {
|
||||
let entry = self.with_errors.entry(id.to_string()).or_default();
|
||||
if verbose {
|
||||
entry.extend(analysis.errors.iter().cloned());
|
||||
}
|
||||
self.with_errors.insert(*id);
|
||||
}
|
||||
|
||||
if !analysis.warnings.is_empty() {
|
||||
let entry = self.with_warnings.entry(id.to_string()).or_default();
|
||||
if verbose {
|
||||
entry.extend(analysis.warnings.iter().cloned());
|
||||
}
|
||||
self.with_warnings.insert(*id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,7 +120,6 @@ Index versions: {version_summary}
|
||||
pub async fn scan_pageserver_metadata(
|
||||
bucket_config: BucketConfig,
|
||||
tenant_ids: Vec<TenantShardId>,
|
||||
verbose: bool,
|
||||
) -> anyhow::Result<MetadataSummary> {
|
||||
let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
|
||||
|
||||
@@ -180,7 +164,6 @@ pub async fn scan_pageserver_metadata(
|
||||
mut tenant_objects: TenantObjectListing,
|
||||
timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
|
||||
highest_shard_count: ShardCount,
|
||||
verbose: bool,
|
||||
) {
|
||||
summary.tenant_count += 1;
|
||||
|
||||
@@ -220,7 +203,7 @@ pub async fn scan_pageserver_metadata(
|
||||
Some(data),
|
||||
)
|
||||
.await;
|
||||
summary.update_analysis(&ttid, &analysis, verbose);
|
||||
summary.update_analysis(&ttid, &analysis);
|
||||
|
||||
timeline_ids.insert(ttid.timeline_id);
|
||||
} else {
|
||||
@@ -288,6 +271,10 @@ pub async fn scan_pageserver_metadata(
|
||||
summary.update_data(&data);
|
||||
|
||||
match tenant_id {
|
||||
None => {
|
||||
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
|
||||
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
|
||||
}
|
||||
Some(prev_tenant_id) => {
|
||||
if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
|
||||
// New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
|
||||
@@ -300,7 +287,6 @@ pub async fn scan_pageserver_metadata(
|
||||
tenant_objects,
|
||||
timelines,
|
||||
highest_shard_count,
|
||||
verbose,
|
||||
)
|
||||
.instrument(info_span!("analyze-tenant", tenant = %prev_tenant_id))
|
||||
.await;
|
||||
@@ -310,10 +296,6 @@ pub async fn scan_pageserver_metadata(
|
||||
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
|
||||
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
|
||||
}
|
||||
}
|
||||
|
||||
match &data.blob_data {
|
||||
@@ -344,7 +326,6 @@ pub async fn scan_pageserver_metadata(
|
||||
tenant_objects,
|
||||
tenant_timeline_results,
|
||||
highest_shard_count,
|
||||
verbose,
|
||||
)
|
||||
.instrument(info_span!("analyze-tenant", tenant = %tenant_id))
|
||||
.await;
|
||||
|
||||
@@ -4556,7 +4556,6 @@ class StorageScrubber:
|
||||
def __init__(self, env: NeonEnv, log_dir: Path):
|
||||
self.env = env
|
||||
self.log_dir = log_dir
|
||||
self.allowed_errors: list[str] = []
|
||||
|
||||
def scrubber_cli(
|
||||
self, args: list[str], timeout, extra_env: dict[str, str] | None = None
|
||||
@@ -4634,70 +4633,19 @@ class StorageScrubber:
|
||||
if timeline_lsns is not None:
|
||||
args.append("--timeline-lsns")
|
||||
args.append(json.dumps(timeline_lsns))
|
||||
if node_kind == NodeKind.PAGESERVER:
|
||||
args.append("--verbose")
|
||||
stdout = self.scrubber_cli(args, timeout=30, extra_env=extra_env)
|
||||
|
||||
try:
|
||||
summary = json.loads(stdout)
|
||||
healthy = self._check_run_healthy(summary)
|
||||
# summary does not contain "with_warnings" if node_kind is the safekeeper
|
||||
no_warnings = "with_warnings" not in summary or not summary["with_warnings"]
|
||||
healthy = not summary["with_errors"] and no_warnings
|
||||
return healthy, summary
|
||||
except:
|
||||
log.error("Failed to decode JSON output from `scan-metadata`. Dumping stdout:")
|
||||
log.error(stdout)
|
||||
raise
|
||||
|
||||
def _check_line_allowed(self, line: str) -> bool:
|
||||
for a in self.allowed_errors:
|
||||
try:
|
||||
if re.match(a, line):
|
||||
return True
|
||||
except re.error:
|
||||
log.error(f"Invalid regex: '{a}'")
|
||||
raise
|
||||
return False
|
||||
|
||||
def _check_line_list_allowed(self, lines: list[str]) -> bool:
|
||||
for line in lines:
|
||||
if not self._check_line_allowed(line):
|
||||
return False
|
||||
return True
|
||||
|
||||
def _check_run_healthy(self, summary: dict[str, Any]) -> bool:
|
||||
# summary does not contain "with_warnings" if node_kind is the safekeeper
|
||||
healthy = True
|
||||
with_warnings = summary.get("with_warnings", None)
|
||||
if with_warnings is not None:
|
||||
if isinstance(with_warnings, list):
|
||||
if len(with_warnings) > 0:
|
||||
# safekeeper scan_metadata output is a list of tenants
|
||||
healthy = False
|
||||
else:
|
||||
for _, warnings in with_warnings.items():
|
||||
assert (
|
||||
len(warnings) > 0
|
||||
), "with_warnings value should not be empty, running without verbose mode?"
|
||||
if not self._check_line_list_allowed(warnings):
|
||||
healthy = False
|
||||
break
|
||||
if not healthy:
|
||||
return healthy
|
||||
with_errors = summary.get("with_errors", None)
|
||||
if with_errors is not None:
|
||||
if isinstance(with_errors, list):
|
||||
if len(with_errors) > 0:
|
||||
# safekeeper scan_metadata output is a list of tenants
|
||||
healthy = False
|
||||
else:
|
||||
for _, errors in with_errors.items():
|
||||
assert (
|
||||
len(errors) > 0
|
||||
), "with_errors value should not be empty, running without verbose mode?"
|
||||
if not self._check_line_list_allowed(errors):
|
||||
healthy = False
|
||||
break
|
||||
return healthy
|
||||
|
||||
def tenant_snapshot(self, tenant_id: TenantId, output_path: Path):
|
||||
stdout = self.scrubber_cli(
|
||||
["tenant-snapshot", "--tenant-id", str(tenant_id), "--output-path", str(output_path)],
|
||||
|
||||
@@ -1,124 +0,0 @@
|
||||
import threading
|
||||
import time
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
BTREE_NUM_CYCLEID_PAGES = """
|
||||
WITH raw_pages AS (
|
||||
SELECT blkno, get_raw_page_at_lsn('t_uidx', 'main', blkno, NULL, NULL) page
|
||||
FROM generate_series(1, pg_relation_size('t_uidx'::regclass) / 8192) blkno
|
||||
),
|
||||
parsed_pages AS (
|
||||
/* cycle ID is the last 2 bytes of the btree page */
|
||||
SELECT blkno, SUBSTRING(page FROM 8191 FOR 2) as cycle_id
|
||||
FROM raw_pages
|
||||
)
|
||||
SELECT count(*),
|
||||
encode(cycle_id, 'hex')
|
||||
FROM parsed_pages
|
||||
WHERE encode(cycle_id, 'hex') != '0000'
|
||||
GROUP BY encode(cycle_id, 'hex');
|
||||
"""
|
||||
|
||||
|
||||
def test_nbtree_pagesplit_cycleid(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
ses1 = endpoint.connect().cursor()
|
||||
ses1.execute("ALTER SYSTEM SET autovacuum = off;")
|
||||
ses1.execute("ALTER SYSTEM SET enable_seqscan = off;")
|
||||
ses1.execute("ALTER SYSTEM SET full_page_writes = off;")
|
||||
ses1.execute("SELECT pg_reload_conf();")
|
||||
ses1.execute("CREATE EXTENSION neon_test_utils;")
|
||||
# prepare a large index
|
||||
ses1.execute("CREATE TABLE t(id integer GENERATED ALWAYS AS IDENTITY, txt text);")
|
||||
ses1.execute("CREATE UNIQUE INDEX t_uidx ON t(id);")
|
||||
ses1.execute("INSERT INTO t (txt) SELECT i::text FROM generate_series(1, 2035) i;")
|
||||
|
||||
ses1.execute("SELECT neon_xlogflush();")
|
||||
ses1.execute(BTREE_NUM_CYCLEID_PAGES)
|
||||
pages = ses1.fetchall()
|
||||
assert (
|
||||
len(pages) == 0
|
||||
), f"0 back splits with cycle ID expected, real {len(pages)} first {pages[0]}"
|
||||
# Delete enough tuples to clear the first index page.
|
||||
# (there are up to 407 rows per 8KiB page; 406 for non-rightmost leafs.
|
||||
ses1.execute("DELETE FROM t WHERE id <= 406;")
|
||||
# Make sure the page is cleaned up
|
||||
ses1.execute("VACUUM (FREEZE, INDEX_CLEANUP ON) t;")
|
||||
|
||||
# Do another delete-then-indexcleanup cycle, to move the pages from
|
||||
# "dead" to "reusable"
|
||||
ses1.execute("DELETE FROM t WHERE id <= 446;")
|
||||
ses1.execute("VACUUM (FREEZE, INDEX_CLEANUP ON) t;")
|
||||
|
||||
# Make sure the vacuum we're about to trigger in s3 has cleanup work to do
|
||||
ses1.execute("DELETE FROM t WHERE id <= 610;")
|
||||
|
||||
# Flush wal, for checking purposes
|
||||
ses1.execute("SELECT neon_xlogflush();")
|
||||
ses1.execute(BTREE_NUM_CYCLEID_PAGES)
|
||||
pages = ses1.fetchall()
|
||||
assert len(pages) == 0, f"No back splits with cycle ID expected, got batches of {pages} instead"
|
||||
|
||||
ses2 = endpoint.connect().cursor()
|
||||
ses3 = endpoint.connect().cursor()
|
||||
|
||||
# Session 2 pins a btree page, which prevents vacuum from processing that
|
||||
# page, thus allowing us to reliably split pages while a concurrent vacuum
|
||||
# is running.
|
||||
ses2.execute("BEGIN;")
|
||||
ses2.execute(
|
||||
"DECLARE foo NO SCROLL CURSOR FOR SELECT row_number() over () FROM t ORDER BY id ASC"
|
||||
)
|
||||
ses2.execute("FETCH FROM foo;") # pins the leaf page with id 611
|
||||
wait_evt = threading.Event()
|
||||
|
||||
# Session 3 runs the VACUUM command. Note that this will block, and
|
||||
# therefore must run on another thread.
|
||||
# We rely on this running quickly enough to hit the pinned page from
|
||||
# session 2 by the time we start other work again in session 1, but
|
||||
# technically there is a race where the thread (and/or PostgreSQL process)
|
||||
# don't get to that pinned page with vacuum until >2s after evt.set() was
|
||||
# called, and session 1 thus might already have split pages.
|
||||
def vacuum_freeze_t(ses3, evt: threading.Event):
|
||||
# Begin parallel vacuum that should hit the index
|
||||
evt.set()
|
||||
# this'll hang until s2 fetches enough new data from its cursor.
|
||||
# this is technically a race with the time.sleep(2) below, but if this
|
||||
# command doesn't hit
|
||||
ses3.execute("VACUUM (FREEZE, INDEX_CLEANUP on, DISABLE_PAGE_SKIPPING on) t;")
|
||||
|
||||
ses3t = threading.Thread(target=vacuum_freeze_t, args=(ses3, wait_evt))
|
||||
ses3t.start()
|
||||
wait_evt.wait()
|
||||
# Make extra sure we got the thread started and vacuum is stuck, by waiting
|
||||
# some time even after wait_evt got set. This isn't truly reliable (it is
|
||||
# possible
|
||||
time.sleep(2)
|
||||
|
||||
# Insert 2 pages worth of new data.
|
||||
# This should reuse the one empty page, plus another page at the end of
|
||||
# the index relation; with split ordering
|
||||
# old_blk -> blkno=1 -> old_blk + 1.
|
||||
# As this is run while vacuum in session 3 is happening, these splits
|
||||
# should receive cycle IDs where applicable.
|
||||
ses1.execute("INSERT INTO t (txt) SELECT i::text FROM generate_series(1, 812) i;")
|
||||
# unpin the btree page, allowing s3's vacuum to complete
|
||||
ses2.execute("FETCH ALL FROM foo;")
|
||||
ses2.execute("ROLLBACK;")
|
||||
# flush WAL to make sure PS is up-to-date
|
||||
ses1.execute("SELECT neon_xlogflush();")
|
||||
# check that our expectations are correct
|
||||
ses1.execute(BTREE_NUM_CYCLEID_PAGES)
|
||||
pages = ses1.fetchall()
|
||||
assert (
|
||||
len(pages) == 1 and pages[0][0] == 3
|
||||
), f"3 page splits with cycle ID expected; actual {pages}"
|
||||
|
||||
# final cleanup
|
||||
ses3t.join()
|
||||
ses1.close()
|
||||
ses2.close()
|
||||
ses3.close()
|
||||
@@ -572,10 +572,4 @@ def test_scrubber_scan_pageserver_metadata(
|
||||
unhealthy = env.storage_controller.metadata_health_list_unhealthy()["unhealthy_tenant_shards"]
|
||||
assert len(unhealthy) == 1 and unhealthy[0] == str(tenant_shard_id)
|
||||
|
||||
healthy, _ = env.storage_scrubber.scan_metadata()
|
||||
assert not healthy
|
||||
env.storage_scrubber.allowed_errors.append(".*not present in remote storage.*")
|
||||
healthy, _ = env.storage_scrubber.scan_metadata()
|
||||
assert healthy
|
||||
|
||||
neon_env_builder.disable_scrub_on_exit() # We already ran scrubber, no need to do an extra run
|
||||
neon_env_builder.disable_scrub_on_exit()
|
||||
|
||||
@@ -4,7 +4,7 @@ import time
|
||||
from contextlib import closing
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, fork_at_current_lsn
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, fork_at_current_lsn
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
@@ -292,76 +292,3 @@ def test_vm_bit_clear_on_heap_lock_blackbox(neon_env_builder: NeonEnvBuilder):
|
||||
tup = cur.fetchall()
|
||||
log.info(f"tuple = {tup}")
|
||||
cur.execute("commit transaction")
|
||||
|
||||
|
||||
def test_check_visibility_map(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
"""
|
||||
Runs pgbench across a few databases on a sharded tenant, then performs a visibility map
|
||||
consistency check. Regression test for https://github.com/neondatabase/neon/issues/9914.
|
||||
"""
|
||||
|
||||
# Use a large number of shards with small stripe sizes, to ensure the visibility
|
||||
# map will end up on non-zero shards.
|
||||
SHARD_COUNT = 8
|
||||
STRIPE_SIZE = 32 # in 8KB pages
|
||||
PGBENCH_RUNS = 4
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_shard_count=SHARD_COUNT, initial_tenant_shard_stripe_size=STRIPE_SIZE
|
||||
)
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
"shared_buffers = 64MB",
|
||||
],
|
||||
)
|
||||
|
||||
# Run pgbench in 4 different databases, to exercise different shards.
|
||||
dbnames = [f"pgbench{i}" for i in range(PGBENCH_RUNS)]
|
||||
for i, dbname in enumerate(dbnames):
|
||||
log.info(f"pgbench run {i+1}/{PGBENCH_RUNS}")
|
||||
endpoint.safe_psql(f"create database {dbname}")
|
||||
connstr = endpoint.connstr(dbname=dbname)
|
||||
# pgbench -i will automatically vacuum the tables. This creates the visibility map.
|
||||
pg_bin.run(["pgbench", "-i", "-s", "10", connstr])
|
||||
# Freeze the tuples to set the initial frozen bit.
|
||||
endpoint.safe_psql("vacuum freeze", dbname=dbname)
|
||||
# Run pgbench.
|
||||
pg_bin.run(["pgbench", "-c", "32", "-j", "8", "-T", "10", connstr])
|
||||
|
||||
# Restart the endpoint to flush the compute page cache. We want to make sure we read VM pages
|
||||
# from storage, not cache.
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
|
||||
# Check that the visibility map matches the heap contents for pg_accounts (the main table).
|
||||
for dbname in dbnames:
|
||||
log.info(f"Checking visibility map for {dbname}")
|
||||
with endpoint.cursor(dbname=dbname) as cur:
|
||||
cur.execute("create extension pg_visibility")
|
||||
|
||||
cur.execute("select count(*) from pg_check_visible('pgbench_accounts')")
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
assert row[0] == 0, f"{row[0]} inconsistent VM pages (visible)"
|
||||
|
||||
cur.execute("select count(*) from pg_check_frozen('pgbench_accounts')")
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
assert row[0] == 0, f"{row[0]} inconsistent VM pages (frozen)"
|
||||
|
||||
# Vacuum and freeze the tables, and check that the visibility map is still accurate.
|
||||
for dbname in dbnames:
|
||||
log.info(f"Vacuuming and checking visibility map for {dbname}")
|
||||
with endpoint.cursor(dbname=dbname) as cur:
|
||||
cur.execute("vacuum freeze")
|
||||
|
||||
cur.execute("select count(*) from pg_check_visible('pgbench_accounts')")
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
assert row[0] == 0, f"{row[0]} inconsistent VM pages (visible)"
|
||||
|
||||
cur.execute("select count(*) from pg_check_frozen('pgbench_accounts')")
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
assert row[0] == 0, f"{row[0]} inconsistent VM pages (frozen)"
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 13ff324150...373f9decad
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 8736b10c1d...972e325e62
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 81428621f7...dff6615a8e
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: 01fa3c4866...a10d95be67
8
vendor/revisions.json
vendored
8
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.2",
|
||||
"01fa3c48664ca030cfb69bb4a350aa9df4691d88"
|
||||
"a10d95be67265e0f10a422ba0457f5a7af01de71"
|
||||
],
|
||||
"v16": [
|
||||
"16.6",
|
||||
"81428621f7c04aed03671cf80a928e0a36d92505"
|
||||
"dff6615a8e48a10bb17a03fa3c00635f1ace7a92"
|
||||
],
|
||||
"v15": [
|
||||
"15.10",
|
||||
"8736b10c1d93d11b9c0489872dd529c4c0f5338f"
|
||||
"972e325e62b455957adbbdd8580e31275bb5b8c9"
|
||||
],
|
||||
"v14": [
|
||||
"14.15",
|
||||
"13ff324150fceaac72920e01742addc053db9462"
|
||||
"373f9decad933d2d46f321231032ae8b0da81acd"
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user