Compare commits

..

17 Commits

Author SHA1 Message Date
Christian Schwarz
8522f429eb port forward dd722fdaf6 (flush recording arcs outside SmgrOpTimerInner) 2025-02-07 11:49:10 +01:00
Christian Schwarz
6e1cbce715 Revert "experiment: clone the metrics but don't measure or update the metrics"
This reverts commit 949fbc15a0.
2025-02-07 11:28:14 +01:00
Christian Schwarz
949fbc15a0 experiment: clone the metrics but don't measure or update the metrics 2025-02-07 11:13:59 +01:00
Christian Schwarz
98dd19ef53 Revert "what if we Box the SmgrOpTimerInner"
This reverts commit a0e8b1617b.
2025-02-07 11:08:47 +01:00
Christian Schwarz
a0e8b1617b what if we Box the SmgrOpTimerInner 2025-02-07 11:02:30 +01:00
Christian Schwarz
1686d9e733 perf(page_service): dont .instrument(span.clone()) the response flush (#10686)
On my AX102 Hetzner box, removing this line removes about 20us from the
`latency_mean` result in

`test_pageserver_characterize_latencies_with_1_client_and_throughput_with_many_clients_one_tenant`.

If the same 20us can be removed in the nightly benchmark run, this will
be a ~10% improvement because there, mean latencies are about ~220us.

This span was added during batching refactors, we didn't have it before,
and I don't think it's terribly useful.

refs
- https://github.com/neondatabase/cloud/issues/21759
2025-02-06 08:33:37 +00:00
Erik Grinaker
abcd00181c pageserver: set a concurrency limit for LocalFS (#10676)
## Problem

The local filesystem backend for remote storage doesn't set a
concurrency limit. While it can't/won't enforce a concurrency limit
itself, this also bounds the upload queue concurrency. Some tests create
thousands of uploads, which slows down the quadratic scheduling of the
upload queue, and there is no point spawning that many Tokio tasks.

Resolves #10409.

## Summary of changes

Set a concurrency limit of 100 for the LocalFS backend.

Before: `test_layer_map[release-pg17].test_query: 68.338 s`
After: `test_layer_map[release-pg17].test_query: 5.209 s`
2025-02-06 07:24:36 +00:00
Konstantin Knizhnik
01f0be03b5 Fix bugs in lfc_cache_containsv (#10682)
## Problem

Incorrect manipulations with iteration index in `lfc_cache_containsv`

## Summary of changes

```
-		int		this_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
+		int		this_chunk = Min(nblocks - i, BLOCKS_PER_CHUNK - chunk_offs);		int		this_chunk = ```
 -		if (i + 1 >= nblocks)
+		if (i >= nblocks)
```

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-02-06 07:00:00 +00:00
Konstantin Knizhnik
81cd30e4d6 Use #ifdef instead of #if USE_ASSERT_CHECKING (#10683)
## Problem

USE_ASSERT _CHECKING is defined as empty entity. but it is checked using
#if

## Summary of changes

Replace `#if USE_ASSERT _CHECKING` with `#ifdef USE_ASSERT _CHECKING` as
done in other places in Postgres

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-02-06 05:47:56 +00:00
Konstantin Knizhnik
7fc6953da4 Is neon superuser (#10625)
## Problem

is_neon_superuser() fiunction is public in pg14/pg15
but statically defined in publicationcmd.c in pg16/pg17

## Summary of changes

Make this function public for all Postgres version.
It is intended to be used not only in  publicationcmd.c

See
https://github.com/neondatabase/postgres/pull/573
https://github.com/neondatabase/postgres/pull/576

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-02-06 05:42:14 +00:00
Christian Schwarz
77f9e74d86 pgxn: include socket send & recv queue size in slow response logs (#10673)
# Problem

When we see an apparent slow request, one possible cause is that the
client is failing to consume responses, but we don't have a clear way to
see that.

# Solution

- Log the socket queue depths on slow/stuck connections, so that we have
an indication of whether the compute is keeping up with processing the
connection's responses.

refs
- slack https://neondb.slack.com/archives/C036U0GRMRB/p1738652644396329
- refs https://github.com/neondatabase/cloud/issues/23515
- refs https://github.com/neondatabase/cloud/issues/23486
2025-02-06 01:14:29 +00:00
Alex Chi Z.
0ceeec9be3 fix(pageserver): schedule compaction immediately if pending (#10684)
## Problem

The code is intended to reschedule compaction immediately if there are
pending tasks. We set the duration to 0 before if there are pending
tasks, but this will go through the `if period == Duration::ZERO {`
branch and sleep for another 10 seconds.

## Summary of changes

Set duration to 1 so that it doesn't sleep for too long.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-02-05 22:11:50 +00:00
Alex Chi Z.
733a57247b fix(pageserver): disallow gc-compaction produce l0 layer (#10679)
## Problem

Any compaction should never produce l0 layers. This never happened in my
experiments, but would be good to guard it early.

## Summary of changes

Disallow gc-compaction to produce l0 layers.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-02-05 20:44:28 +00:00
Heikki Linnakangas
6699a30a49 Make it easy to build only a subset of extensions into compute image (#10655)
The full build of all extensions takes a long time. When working locally
on parts that don't need extensions, you can iterate more quickly by
skipping the unnecessary extensions.

This adds a build argument to the dockerfile to specify extensions to
build. There are three options:

- EXTENSIONS=all (default)
- EXTENSIONS=minimal: Build only a few extensions that are listed in
shared_preload_libraries in the default neon config.
- EXTENSIONS=none: Build no extensions (except for the mandatory 'neon'
extension).
2025-02-05 18:07:51 +00:00
Alex Chi Z.
133b89a83d feat(pageserver): continue from last incomplete image layer creation (#10660)
## Problem

close https://github.com/neondatabase/neon/issues/10651

## Summary of changes

* Image layer creation starts from the next partition of the last
processed partition if the previous attempt was not complete.
* Add tests.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-02-05 17:35:39 +00:00
Arseny Sher
fba22a7123 Record more timings in test_layer_map (#10670)
## Problem

It it is not very clear how much time take different operations.

## Summary of changes

Record more timings.

ref https://github.com/neondatabase/neon/issues/10409
2025-02-05 17:00:26 +00:00
John Spray
14e05276a3 storcon: fix a case where optimise could get stuck on unschedulable node (#10648)
## Problem

When a shard has two secondary locations, but one of them is on a node
with MaySchedule::No, the optimiser would get stuck, because it couldn't
decide which secondary to remove.

This is generally okay if a node is offline, but if a node is in Pause
mode for a long period of time, it's a problem.

Closes: https://github.com/neondatabase/neon/issues/10646

## Summary of changes

- Instead of insisting on finding a node in the wrong AZ to remove, find
an available node in the _right_ AZ, and remove all the others. This
ensures that if there is one live suitable node, then other
offline/paused nodes cannot hold things up.
2025-02-05 16:05:12 +00:00
18 changed files with 466 additions and 164 deletions

View File

@@ -85,6 +85,10 @@ ARG DEBIAN_VERSION=bookworm
ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
ARG ALPINE_CURL_VERSION=8.11.1
# By default, build all PostgreSQL extensions. For quick local testing when you don't
# care about the extensions, pass EXTENSIONS=none or EXTENSIONS=minimal
ARG EXTENSIONS=all
#########################################################################################
#
# Layer "build-deps"
@@ -1484,12 +1488,35 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
#########################################################################################
#
# Layer "all-extensions"
# Layer "extensions-none"
#
#########################################################################################
FROM build-deps AS extensions-none
RUN mkdir /usr/local/pgsql
#########################################################################################
#
# Layer "extensions-minimal"
#
# This subset of extensions includes the extensions that we have in
# shared_preload_libraries by default.
#
#########################################################################################
FROM build-deps AS extensions-minimal
COPY --from=pgrag-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=timescaledb-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_cron-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
#########################################################################################
#
# Layer "extensions-all"
# Bundle together all the extensions
#
#########################################################################################
FROM build-deps AS all-extensions
ARG PG_VERSION
FROM build-deps AS extensions-all
# Public extensions
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
@@ -1531,7 +1558,13 @@ COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=neon-ext-build /usr/local/pgsql/ /usr/local/pgsql/
#########################################################################################
#
# Layer "neon-pg-ext-build"
# Includes Postgres and all the extensions chosen by EXTENSIONS arg.
#
#########################################################################################
FROM extensions-${EXTENSIONS} AS neon-pg-ext-build
#########################################################################################
#
@@ -1614,7 +1647,8 @@ RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 30
#
#########################################################################################
FROM neon-ext-build AS postgres-cleanup-layer
COPY --from=all-extensions /usr/local/pgsql /usr/local/pgsql
COPY --from=neon-pg-ext-build /usr/local/pgsql /usr/local/pgsql
# Remove binaries from /bin/ that we won't use (or would manually copy & install otherwise)
RUN cd /usr/local/pgsql/bin && rm -f ecpg raster2pgsql shp2pgsql pgtopo_export pgtopo_import pgsql2shp

View File

@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use crate::{
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT,
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
};
/// External backup storage configuration, enough for creating a client for that storage.
@@ -45,11 +45,11 @@ impl RemoteStorageKind {
impl RemoteStorageConfig {
/// Helper to fetch the configured concurrency limit.
pub fn concurrency_limit(&self) -> Option<usize> {
pub fn concurrency_limit(&self) -> usize {
match &self.storage {
RemoteStorageKind::LocalFs { .. } => None,
RemoteStorageKind::AwsS3(c) => Some(c.concurrency_limit.into()),
RemoteStorageKind::AzureContainer(c) => Some(c.concurrency_limit.into()),
RemoteStorageKind::LocalFs { .. } => DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT,
RemoteStorageKind::AwsS3(c) => c.concurrency_limit.into(),
RemoteStorageKind::AzureContainer(c) => c.concurrency_limit.into(),
}
}
}

View File

@@ -65,6 +65,12 @@ pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
/// Here, a limit of max 20k concurrent connections was noted.
/// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
/// Set this limit analogously to the S3 limit.
///
/// The local filesystem backend doesn't enforce a concurrency limit itself, but this also bounds
/// the upload queue concurrency. Some tests create thousands of uploads, which slows down the
/// quadratic scheduling of the upload queue, and there is no point spawning so many Tokio tasks.
pub const DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT: usize = 100;
/// No limits on the client side, which currenltly means 1000 for AWS S3.
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;

View File

@@ -1253,9 +1253,6 @@ pub(crate) struct SmgrOpTimerInner {
global_batch_wait_time: Histogram,
per_timeline_batch_wait_time: Histogram,
global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,
throttling: Arc<tenant_throttling::Pagestream>,
timings: SmgrOpTimerState,
@@ -1366,20 +1363,18 @@ impl SmgrOpTimer {
/// The first callers receives Some, subsequent ones None.
///
/// See [`SmgrOpTimerState`] for more context.
pub(crate) fn observe_execution_end_flush_start(
&mut self,
at: Instant,
) -> Option<SmgrOpFlushInProgress> {
pub(crate) fn observe_execution_end_flush_start(&mut self, at: Instant) {
// NB: unlike the other observe_* methods, this one take()s.
#[allow(clippy::question_mark)] // maintain similar code pattern.
let Some(mut inner) = self.0.take() else {
return None;
// NB: this take() isn't needed anymore, maybe we can simplify
return;
};
let SmgrOpTimerState::Executing {
execution_started_at,
} = &inner.timings
else {
return None;
return;
};
// update metrics
let execution = at - *execution_started_at;
@@ -1394,36 +1389,9 @@ impl SmgrOpTimer {
// state transition
inner.timings = SmgrOpTimerState::Flushing;
// return the flush in progress object which
// will do the remaining metrics updates
let SmgrOpTimerInner {
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
..
} = inner;
Some(SmgrOpFlushInProgress {
flush_started_at: at,
global_micros: global_flush_in_progress_micros,
per_timeline_micros: per_timeline_flush_in_progress_micros,
})
}
}
/// The last stage of request processing is serializing and flushing the request
/// into the TCP connection. We want to make slow flushes observable
/// _while they are occuring_, so this struct provides a wrapper method [`Self::measure`]
/// to periodically bump the metric.
///
/// If in the future we decide that we're not interested in live updates, we can
/// add another `observe_*` method to [`SmgrOpTimer`], follow the existing pattern there,
/// and remove this struct from the code base.
pub(crate) struct SmgrOpFlushInProgress {
flush_started_at: Instant,
global_micros: IntCounter,
per_timeline_micros: IntCounter,
}
impl Drop for SmgrOpTimer {
fn drop(&mut self) {
// In case of early drop, update any of the remaining metrics with
@@ -1442,42 +1410,6 @@ impl Drop for SmgrOpTimer {
}
}
impl SmgrOpFlushInProgress {
pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
where
Fut: std::future::Future<Output = O>,
{
let mut fut = std::pin::pin!(fut);
// Whenever observe_guard gets called, or dropped,
// it adds the time elapsed since its last call to metrics.
// Last call is tracked in `now`.
let mut observe_guard = scopeguard::guard(
|| {
let now = Instant::now();
let elapsed = now - self.flush_started_at;
self.global_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.per_timeline_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.flush_started_at = now;
},
|mut observe| {
observe();
},
);
loop {
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
Ok(v) => return v,
Err(_timeout) => {
(*observe_guard)();
}
}
}
}
}
#[derive(
Debug,
Clone,
@@ -1513,6 +1445,56 @@ pub(crate) struct SmgrQueryTimePerTimeline {
throttling: Arc<tenant_throttling::Pagestream>,
}
impl SmgrQueryTimePerTimeline {
pub(crate) async fn record_flush_in_progress<Fut, O>(
shard: &crate::tenant::timeline::handle::WeakHandle<
crate::page_service::TenantManagerTypes,
>,
start_at: Instant,
mut fut: Fut,
) -> O
where
Fut: std::future::Future<Output = O>,
{
let mut fut = std::pin::pin!(fut);
// Whenever observe_guard gets called, or dropped,
// it adds the time elapsed since its last call to metrics.
// Last call is tracked in `now`.
let mut base = start_at;
let mut observe_guard = scopeguard::guard(
|| {
let Ok(upgraded) = shard.upgrade() else {
return;
};
let now = Instant::now();
let elapsed = now - base;
upgraded
.query_metrics
.global_flush_in_progress_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
upgraded
.query_metrics
.per_timeline_flush_in_progress_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
base = now;
},
|mut observe| {
observe();
},
);
loop {
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
Ok(v) => return v,
Err(_timeout) => {
(*observe_guard)();
}
}
}
}
}
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
// it's a counter, but, name is prepared to extend it to a histogram of queue depth
@@ -1797,10 +1779,6 @@ impl SmgrQueryTimePerTimeline {
SmgrOpTimer(Some(SmgrOpTimerInner {
global_execution_latency_histo: self.global_latency[op as usize].clone(),
per_timeline_execution_latency_histo: per_timeline_latency_histo,
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
per_timeline_flush_in_progress_micros: self
.per_timeline_flush_in_progress_micros
.clone(),
global_batch_wait_time: self.global_batch_wait_time.clone(),
per_timeline_batch_wait_time: self.per_timeline_batch_wait_time.clone(),
throttling: self.throttling.clone(),

View File

@@ -1063,9 +1063,10 @@ impl PageServerHandler {
};
// invoke handler function
let (handler_results, span): (
let (handler_results, span, shard): (
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
_,
_,
) = match batch {
BatchedFeMessage::Exists {
span,
@@ -1082,6 +1083,7 @@ impl PageServerHandler {
.map(|msg| (msg, timer))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
span,
Some(shard),
)
}
BatchedFeMessage::Nblocks {
@@ -1099,6 +1101,7 @@ impl PageServerHandler {
.map(|msg| (msg, timer))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
span,
Some(shard),
)
}
BatchedFeMessage::GetPage {
@@ -1126,6 +1129,7 @@ impl PageServerHandler {
res
},
span,
Some(shard),
)
}
BatchedFeMessage::DbSize {
@@ -1143,6 +1147,7 @@ impl PageServerHandler {
.map(|msg| (msg, timer))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
span,
Some(shard),
)
}
BatchedFeMessage::GetSlruSegment {
@@ -1160,6 +1165,7 @@ impl PageServerHandler {
.map(|msg| (msg, timer))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
span,
Some(shard),
)
}
#[cfg(feature = "testing")]
@@ -1181,12 +1187,13 @@ impl PageServerHandler {
res
},
span,
Some(shard),
)
}
BatchedFeMessage::RespondError { span, error } => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
(vec![Err(error)], span)
(vec![Err(error)], span, None)
}
};
@@ -1194,7 +1201,7 @@ impl PageServerHandler {
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
for handler_result in handler_results {
let (response_msg, timer) = match handler_result {
let (response_msg, mut timer) = match handler_result {
Err(e) => match &e.err {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
@@ -1250,20 +1257,24 @@ impl PageServerHandler {
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
let flushing_timer = timer.map(|mut timer| {
timer
.observe_execution_end_flush_start(Instant::now())
.expect("we are the first caller")
});
let start_flushing_at = Instant::now();
if let Some(timer) = &mut timer {
timer.observe_execution_end_flush_start(start_flushing_at);
}
// what we want to do
let flush_fut = pgb_writer.flush();
// metric for how long flushing takes
let flush_fut = match flushing_timer {
Some(flushing_timer) => {
futures::future::Either::Left(flushing_timer.measure(flush_fut))
}
None => futures::future::Either::Right(flush_fut),
let flush_fut = if let Some(shard) = &shard {
// don't hold upgraded handle while flushing!
futures::future::Either::Left(
metrics::SmgrQueryTimePerTimeline::record_flush_in_progress(
shard,
start_flushing_at,
flush_fut,
),
)
} else {
futures::future::Either::Right(flush_fut)
};
// do it while respecting cancellation
let _: () = async move {
@@ -1280,8 +1291,6 @@ impl PageServerHandler {
}
Ok(())
}
// and log the info! line inside the request span
.instrument(span.clone())
.await?;
}
Ok(())

View File

@@ -437,8 +437,7 @@ impl RemoteTimelineClient {
.conf
.remote_storage_config
.as_ref()
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
.map_or(0, |r| r.concurrency_limit());
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
self.update_remote_physical_size_gauge(Some(index_part));
@@ -461,8 +460,7 @@ impl RemoteTimelineClient {
.conf
.remote_storage_config
.as_ref()
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
.map_or(0, |r| r.concurrency_limit());
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
self.update_remote_physical_size_gauge(None);
@@ -484,8 +482,7 @@ impl RemoteTimelineClient {
.conf
.remote_storage_config
.as_ref()
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
.map_or(0, |r| r.concurrency_limit());
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;

View File

@@ -211,7 +211,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
error_run_count = 0;
// schedule the next compaction immediately in case there is a pending compaction task
sleep_duration = if let CompactionOutcome::Pending = outcome {
Duration::ZERO
Duration::from_secs(1)
} else {
period
};

View File

@@ -192,7 +192,12 @@ pub enum ImageLayerCreationMode {
#[derive(Clone, Debug, Default)]
pub enum LastImageLayerCreationStatus {
Incomplete, // TODO: record the last key being processed
Incomplete {
/// The last key of the partition (exclusive) that was processed in the last
/// image layer creation attempt. We will continue from this key in the next
/// attempt.
last_key: Key,
},
Complete,
#[default]
Initial,
@@ -321,7 +326,7 @@ pub struct Timeline {
// `Timeline` doesn't write these metrics itself, but it manages the lifetime. Code
// in `crate::page_service` writes these metrics.
pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
pub(crate) query_metrics: Arc<crate::metrics::SmgrQueryTimePerTimeline>,
directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
@@ -2512,11 +2517,11 @@ impl Timeline {
metrics,
query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
query_metrics: Arc::new(crate::metrics::SmgrQueryTimePerTimeline::new(
&tenant_shard_id,
&timeline_id,
resources.pagestream_throttle_metrics,
),
)),
directory_metrics: array::from_fn(|_| AtomicU64::new(0)),
@@ -4346,7 +4351,7 @@ impl Timeline {
Ok(result)
}
// Is it time to create a new image layer for the given partition?
// Is it time to create a new image layer for the given partition? True if we want to generate.
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
let threshold = self.get_image_creation_threshold();
@@ -4658,6 +4663,11 @@ impl Timeline {
) -> Result<(Vec<ResidentLayer>, LastImageLayerCreationStatus), CreateImageLayersError> {
let timer = self.metrics.create_images_time_histo.start_timer();
if partitioning.parts.is_empty() {
warn!("no partitions to create image layers for");
return Ok((vec![], LastImageLayerCreationStatus::Complete));
}
// We need to avoid holes between generated image layers.
// Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
// image layer with hole between them. In this case such layer can not be utilized by GC.
@@ -4669,28 +4679,65 @@ impl Timeline {
// image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
let mut start = Key::MIN;
let check_for_image_layers = if let LastImageLayerCreationStatus::Incomplete = last_status {
info!(
"resuming image layer creation: last_status={:?}",
last_status
);
true
} else {
self.should_check_if_image_layers_required(lsn)
};
let check_for_image_layers =
if let LastImageLayerCreationStatus::Incomplete { last_key } = last_status {
info!(
"resuming image layer creation: last_status=incomplete, continue from {}",
last_key
);
true
} else {
self.should_check_if_image_layers_required(lsn)
};
let mut batch_image_writer = BatchLayerWriter::new(self.conf).await?;
let mut all_generated = true;
let mut partition_processed = 0;
let total_partitions = partitioning.parts.len();
let mut total_partitions = partitioning.parts.len();
let mut last_partition_processed = None;
let mut partition_parts = partitioning.parts.clone();
for partition in partitioning.parts.iter() {
if let LastImageLayerCreationStatus::Incomplete { last_key } = last_status {
// We need to skip the partitions that have already been processed.
let mut found = false;
for (i, partition) in partition_parts.iter().enumerate() {
if last_key <= partition.end().unwrap() {
// ```plain
// |------|--------|----------|------|
// ^last_key
// ^start from this partition
// ```
// Why `i+1` instead of `i`?
// It is possible that the user did some writes after the previous image layer creation attempt so that
// a relation grows in size, and the last_key is now in the middle of the partition. In this case, we
// still want to skip this partition, so that we can make progress and avoid generating image layers over
// the same partition. Doing a mod to ensure we don't end up with an empty vec.
if i + 1 >= total_partitions {
// In general, this case should not happen -- if last_key is on the last partition, the previous
// iteration of image layer creation should return a complete status.
break; // with found=false
}
partition_parts = partition_parts.split_off(i + 1); // Remove the first i + 1 elements
total_partitions = partition_parts.len();
// Update the start key to the partition start.
start = partition_parts[0].start().unwrap();
found = true;
break;
}
}
if !found {
// Last key is within the last partition, or larger than all partitions.
return Ok((vec![], LastImageLayerCreationStatus::Complete));
}
}
for partition in partition_parts.iter() {
if self.cancel.is_cancelled() {
return Err(CreateImageLayersError::Cancelled);
}
partition_processed += 1;
let img_range = start..partition.ranges.last().unwrap().end;
let compact_metadata = partition.overlaps(&Key::metadata_key_range());
if compact_metadata {
@@ -4725,6 +4772,8 @@ impl Timeline {
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
is_delta: false,
}) {
// TODO: this can be processed with the BatchLayerWriter::finish_with_discard
// in the future.
tracing::info!(
"Skipping image layer at {lsn} {}..{}, already exists",
img_range.start,
@@ -4805,8 +4854,6 @@ impl Timeline {
}
}
partition_processed += 1;
if let ImageLayerCreationMode::Try = mode {
// We have at least made some progress
if batch_image_writer.pending_layer_num() >= 1 {
@@ -4822,8 +4869,10 @@ impl Timeline {
* self.get_compaction_threshold();
if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold {
tracing::info!(
"preempt image layer generation at {start} at {lsn}: too many L0 layers {num_of_l0_layers}",
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}",
partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers
);
last_partition_processed = Some(partition.clone());
all_generated = false;
break;
}
@@ -4868,7 +4917,14 @@ impl Timeline {
if all_generated {
LastImageLayerCreationStatus::Complete
} else {
LastImageLayerCreationStatus::Incomplete
LastImageLayerCreationStatus::Incomplete {
last_key: if let Some(last_partition_processed) = last_partition_processed {
last_partition_processed.end().unwrap_or(Key::MIN)
} else {
// This branch should be unreachable, but in case it happens, we can just return the start key.
Key::MIN
},
}
},
))
}

View File

@@ -33,6 +33,7 @@ use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::gc_block::GcBlock;
use crate::tenant::layer_map::LayerMap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::batch_split_writer::{
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
@@ -438,6 +439,11 @@ impl KeyHistoryRetention {
if dry_run {
return true;
}
if LayerMap::is_l0(&key.key_range, key.is_delta) {
// gc-compaction should not produce L0 deltas, otherwise it will break the layer order.
// We should ignore such layers.
return true;
}
let layer_generation;
{
let guard = tline.layers.read().await;
@@ -748,7 +754,7 @@ impl Timeline {
.store(Arc::new(outcome.clone()));
self.upload_new_image_layers(image_layers)?;
if let LastImageLayerCreationStatus::Incomplete = outcome {
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
// Yield and do not do any other kind of compaction.
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
return Ok(CompactionOutcome::Pending);

View File

@@ -220,8 +220,10 @@ lfc_maybe_disabled(void)
static bool
lfc_ensure_opened(void)
{
bool enabled = !lfc_maybe_disabled();
/* Open cache file if not done yet */
if (lfc_desc <= 0)
if (lfc_desc <= 0 && enabled)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR);
@@ -231,7 +233,7 @@ lfc_ensure_opened(void)
return false;
}
}
return true;
return enabled;
}
static void
@@ -336,11 +338,10 @@ lfc_change_limit_hook(int newval, void *extra)
{
uint32 new_size = SIZE_MB_TO_CHUNKS(newval);
if (!lfc_ctl || !is_normal_backend())
if (!is_normal_backend())
return;
/* Open LFC file only if LFC was enabled or we are going to reenable it */
if ((newval > 0 || LFC_ENABLED()) && !lfc_ensure_opened())
if (!lfc_ensure_opened())
return;
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

View File

@@ -36,6 +36,11 @@
#include "pagestore_client.h"
#include "walproposer.h"
#ifdef __linux__
#include <sys/ioctl.h>
#include <linux/sockios.h>
#endif
#define PageStoreTrace DEBUG5
#define MIN_RECONNECT_INTERVAL_USEC 1000
@@ -728,11 +733,36 @@ retry:
INSTR_TIME_SUBTRACT(since_last_log, last_log_ts);
if (INSTR_TIME_GET_MILLISEC(since_last_log) >= LOG_INTERVAL_MS)
{
int sndbuf = -1;
int recvbuf = -1;
#ifdef __linux__
int socketfd;
#endif
since_start = now;
INSTR_TIME_SUBTRACT(since_start, start_ts);
neon_shard_log(shard_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses)",
#ifdef __linux__
/*
* get kernel's send and recv queue size via ioctl
* https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27
*/
socketfd = PQsocket(pageserver_conn);
if (socketfd != -1) {
int ioctl_err;
ioctl_err = ioctl(socketfd, SIOCOUTQ, &sndbuf);
if (ioctl_err!= 0) {
sndbuf = -errno;
}
ioctl_err = ioctl(socketfd, FIONREAD, &recvbuf);
if (ioctl_err != 0) {
recvbuf = -errno;
}
}
#endif
neon_shard_log(shard_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket sndbuf=%d recvbuf=%d)",
INSTR_TIME_GET_DOUBLE(since_start),
shard->nrequests_sent, shard->nresponses_received);
shard->nrequests_sent, shard->nresponses_received, sndbuf, recvbuf);
last_log_ts = now;
logged = true;
}

View File

@@ -363,7 +363,6 @@ compact_prefetch_buffers(void)
target_slot->buftag = source_slot->buftag;
target_slot->shard_no = source_slot->shard_no;
target_slot->status = source_slot->status;
target_slot->flags = source_slot->flags;
target_slot->response = source_slot->response;
target_slot->reqid = source_slot->reqid;
target_slot->request_lsns = source_slot->request_lsns;
@@ -1118,7 +1117,6 @@ Retry:
slot->buftag = hashkey.buftag;
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = ring_index;
slot->flags = 0;
min_ring_index = Min(min_ring_index, ring_index);
@@ -2847,13 +2845,12 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
}
tag.blockNum = blocknum;
for (int i = 0; i < PG_IOV_MAX / 8; i++)
lfc_present[i] = ~(lfc_present[i]);
ring_index = prefetch_register_bufferv(tag, NULL, iterblocks,
lfc_present, true);
nblocks -= iterblocks;
blocknum += iterblocks;

View File

@@ -707,6 +707,7 @@ impl TenantShard {
if let Some(node_id) = self.intent.get_attached() {
// Populate secondary by demoting the attached node
self.intent.demote_attached(scheduler, *node_id);
modified = true;
} else if self.intent.secondary.is_empty() {
// Populate secondary by scheduling a fresh node
@@ -979,24 +980,51 @@ impl TenantShard {
),
)
})
.collect::<Vec<_>>();
.collect::<HashMap<_, _>>();
if secondary_scores.iter().any(|score| score.1.is_none()) {
// Don't have full list of scores, so can't make a good decision about which to drop unless
// there is an obvious one in the wrong AZ
for secondary in self.intent.get_secondary() {
if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
// Trivial case: if we only have one secondary, drop that one
if self.intent.get_secondary().len() == 1 {
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(
*self.intent.get_secondary().first().unwrap(),
),
});
}
// Try to find a "good" secondary to keep, without relying on scores (one or more nodes is in a state
// where its score can't be calculated), and drop the others. This enables us to make progress in
// most cases, even if some nodes are offline or have scheduling=pause set.
debug_assert!(self.intent.attached.is_some()); // We should not make it here unless attached -- this
// logic presumes we are in a mode where we want secondaries to be in non-home AZ
if let Some(retain_secondary) = self.intent.get_secondary().iter().find(|n| {
let in_home_az = scheduler.get_node_az(n) == self.intent.preferred_az_id;
let is_available = secondary_scores
.get(n)
.expect("Built from same list of nodes")
.is_some();
is_available && !in_home_az
}) {
// Great, we found one to retain. Pick some other to drop.
if let Some(victim) = self
.intent
.get_secondary()
.iter()
.find(|n| n != &retain_secondary)
{
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(*secondary),
action: ScheduleOptimizationAction::RemoveSecondary(*victim),
});
}
}
// Fall through: we didn't identify one to remove. This ought to be rare.
tracing::warn!("Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
self.intent.get_secondary()
);
self.intent.get_secondary()
);
} else {
let victim = secondary_scores
.iter()
@@ -1005,7 +1033,7 @@ impl TenantShard {
.0;
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(victim),
action: ScheduleOptimizationAction::RemoveSecondary(*victim),
});
}
}
@@ -2379,6 +2407,110 @@ pub(crate) mod tests {
Ok(())
}
/// Test how the optimisation code behaves with an extra secondary
#[test]
fn optimize_removes_secondary() -> anyhow::Result<()> {
let az_a_tag = AvailabilityZone("az-a".to_string());
let az_b_tag = AvailabilityZone("az-b".to_string());
let mut nodes = make_test_nodes(
4,
&[
az_a_tag.clone(),
az_b_tag.clone(),
az_a_tag.clone(),
az_b_tag.clone(),
],
);
let mut scheduler = Scheduler::new(nodes.values());
let mut schedule_context = ScheduleContext::default();
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_a.intent.preferred_az_id = Some(az_a_tag.clone());
shard_a
.schedule(&mut scheduler, &mut schedule_context)
.unwrap();
// Attached on node 1, secondary on node 2
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(2)]);
// Initially optimiser is idle
assert_eq!(
shard_a.optimize_attachment(&mut scheduler, &schedule_context),
None
);
assert_eq!(
shard_a.optimize_secondary(&mut scheduler, &schedule_context),
None
);
// A spare secondary in the home AZ: it should be removed -- this is the situation when we're midway through a graceful migration, after cutting over
// to our new location
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
})
);
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
// A spare secondary in the non-home AZ, and one of them is offline
shard_a.intent.push_secondary(&mut scheduler, NodeId(4));
nodes
.get_mut(&NodeId(4))
.unwrap()
.set_availability(NodeAvailability::Offline);
scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(4))
})
);
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
// A spare secondary when should have none
shard_a.policy = PlacementPolicy::Attached(0);
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
})
);
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
assert_eq!(shard_a.intent.get_secondary(), &vec![]);
// Check that in secondary mode, we preserve the secondary in the preferred AZ
let mut schedule_context = ScheduleContext::default(); // Fresh context, we're about to call schedule()
shard_a.policy = PlacementPolicy::Secondary;
shard_a
.schedule(&mut scheduler, &mut schedule_context)
.unwrap();
assert_eq!(shard_a.intent.get_attached(), &None);
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
assert_eq!(
shard_a.optimize_attachment(&mut scheduler, &schedule_context),
None
);
assert_eq!(
shard_a.optimize_secondary(&mut scheduler, &schedule_context),
None
);
shard_a.intent.clear(&mut scheduler);
Ok(())
}
// Optimize til quiescent: this emulates what Service::optimize_all does, when
// called repeatedly in the background.
// Returns the applied optimizations

View File

@@ -34,16 +34,20 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
cur.execute("set log_statement = 'all'")
cur.execute("create table t(x integer)")
for _ in range(n_iters):
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
with zenbenchmark.record_duration(f"insert into t values (generate_series(1,{n_records}))"):
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
time.sleep(1)
cur.execute("vacuum t")
with zenbenchmark.record_duration("vacuum t"):
cur.execute("vacuum t")
with zenbenchmark.record_duration("test_query"):
with zenbenchmark.record_duration("SELECT count(*) from t"):
cur.execute("SELECT count(*) from t")
assert cur.fetchone() == (n_iters * n_records,)
flush_ep_to_pageserver(env, endpoint, tenant, timeline)
env.pageserver.http_client().timeline_checkpoint(
tenant, timeline, compact=False, wait_until_uploaded=True
)
with zenbenchmark.record_duration("flush_ep_to_pageserver"):
flush_ep_to_pageserver(env, endpoint, tenant, timeline)
with zenbenchmark.record_duration("timeline_checkpoint"):
env.pageserver.http_client().timeline_checkpoint(
tenant, timeline, compact=False, wait_until_uploaded=True
)

View File

@@ -29,6 +29,21 @@ AGGRESSIVE_COMPACTION_TENANT_CONF = {
# "lsn_lease_length": "0s", -- TODO: would cause branch creation errors, should fix later
}
PREEMPT_COMPACTION_TENANT_CONF = {
"gc_period": "5s",
"compaction_period": "5s",
# Small checkpoint distance to create many layers
"checkpoint_distance": 1024**2,
# Compact small layers
"compaction_target_size": 1024**2,
"image_creation_threshold": 1,
"image_creation_preempt_threshold": 1,
# compact more frequently
"compaction_threshold": 3,
"compaction_upper_limit": 6,
"lsn_lease_length": "0s",
}
@skip_in_debug_build("only run with release build")
@pytest.mark.parametrize(
@@ -36,7 +51,8 @@ AGGRESSIVE_COMPACTION_TENANT_CONF = {
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
)
def test_pageserver_compaction_smoke(
neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol
neon_env_builder: NeonEnvBuilder,
wal_receiver_protocol: PageserverWalReceiverProtocol,
):
"""
This is a smoke test that compaction kicks in. The workload repeatedly churns
@@ -54,7 +70,8 @@ def test_pageserver_compaction_smoke(
page_cache_size=10
"""
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESSIVE_COMPACTION_TENANT_CONF)
conf = AGGRESSIVE_COMPACTION_TENANT_CONF.copy()
env = neon_env_builder.init_start(initial_tenant_conf=conf)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
@@ -113,6 +130,41 @@ page_cache_size=10
assert vectored_average < 8
@skip_in_debug_build("only run with release build")
def test_pageserver_compaction_preempt(
neon_env_builder: NeonEnvBuilder,
):
# Ideally we should be able to do unit tests for this, but we need real Postgres
# WALs in order to do unit testing...
conf = PREEMPT_COMPACTION_TENANT_CONF.copy()
env = neon_env_builder.init_start(initial_tenant_conf=conf)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
row_count = 200000
churn_rounds = 10
ps_http = env.pageserver.http_client()
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageserver.id)
log.info("Writing initial data ...")
workload.write_rows(row_count, env.pageserver.id)
for i in range(1, churn_rounds + 1):
log.info(f"Running churn round {i}/{churn_rounds} ...")
workload.churn_rows(row_count, env.pageserver.id, upload=False)
workload.validate(env.pageserver.id)
ps_http.timeline_compact(tenant_id, timeline_id, wait_until_uploaded=True)
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)
# ensure image layer creation gets preempted and then resumed
env.pageserver.assert_log_contains("resuming image layer creation")
@skip_in_debug_build("only run with release build")
@pytest.mark.parametrize(
"with_branches",

View File

@@ -1,11 +1,11 @@
{
"v17": [
"17.2",
"f0ffc8279dbcbbc439981a4fd001a9687e5d665d"
"8dfd5a7030d3e8a98b60265ebe045788892ac7f3"
],
"v16": [
"16.6",
"3cf7ce1afab75027716d14223f95ddb300754162"
"86d9ea96ebb9088eac62f57f1f5ace68e70e0d1c"
],
"v15": [
"15.10",