mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 15:10:44 +00:00
Compare commits
17 Commits
lfc_bug_fi
...
problame/l
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8522f429eb | ||
|
|
6e1cbce715 | ||
|
|
949fbc15a0 | ||
|
|
98dd19ef53 | ||
|
|
a0e8b1617b | ||
|
|
1686d9e733 | ||
|
|
abcd00181c | ||
|
|
01f0be03b5 | ||
|
|
81cd30e4d6 | ||
|
|
7fc6953da4 | ||
|
|
77f9e74d86 | ||
|
|
0ceeec9be3 | ||
|
|
733a57247b | ||
|
|
6699a30a49 | ||
|
|
133b89a83d | ||
|
|
fba22a7123 | ||
|
|
14e05276a3 |
@@ -85,6 +85,10 @@ ARG DEBIAN_VERSION=bookworm
|
||||
ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
|
||||
ARG ALPINE_CURL_VERSION=8.11.1
|
||||
|
||||
# By default, build all PostgreSQL extensions. For quick local testing when you don't
|
||||
# care about the extensions, pass EXTENSIONS=none or EXTENSIONS=minimal
|
||||
ARG EXTENSIONS=all
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "build-deps"
|
||||
@@ -1484,12 +1488,35 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "all-extensions"
|
||||
# Layer "extensions-none"
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS extensions-none
|
||||
|
||||
RUN mkdir /usr/local/pgsql
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "extensions-minimal"
|
||||
#
|
||||
# This subset of extensions includes the extensions that we have in
|
||||
# shared_preload_libraries by default.
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS extensions-minimal
|
||||
|
||||
COPY --from=pgrag-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=timescaledb-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_cron-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "extensions-all"
|
||||
# Bundle together all the extensions
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS all-extensions
|
||||
ARG PG_VERSION
|
||||
FROM build-deps AS extensions-all
|
||||
|
||||
# Public extensions
|
||||
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
@@ -1531,7 +1558,13 @@ COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
COPY --from=neon-ext-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "neon-pg-ext-build"
|
||||
# Includes Postgres and all the extensions chosen by EXTENSIONS arg.
|
||||
#
|
||||
#########################################################################################
|
||||
FROM extensions-${EXTENSIONS} AS neon-pg-ext-build
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -1614,7 +1647,8 @@ RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 30
|
||||
#
|
||||
#########################################################################################
|
||||
FROM neon-ext-build AS postgres-cleanup-layer
|
||||
COPY --from=all-extensions /usr/local/pgsql /usr/local/pgsql
|
||||
|
||||
COPY --from=neon-pg-ext-build /usr/local/pgsql /usr/local/pgsql
|
||||
|
||||
# Remove binaries from /bin/ that we won't use (or would manually copy & install otherwise)
|
||||
RUN cd /usr/local/pgsql/bin && rm -f ecpg raster2pgsql shp2pgsql pgtopo_export pgtopo_import pgsql2shp
|
||||
|
||||
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT,
|
||||
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
|
||||
DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
|
||||
};
|
||||
|
||||
/// External backup storage configuration, enough for creating a client for that storage.
|
||||
@@ -45,11 +45,11 @@ impl RemoteStorageKind {
|
||||
|
||||
impl RemoteStorageConfig {
|
||||
/// Helper to fetch the configured concurrency limit.
|
||||
pub fn concurrency_limit(&self) -> Option<usize> {
|
||||
pub fn concurrency_limit(&self) -> usize {
|
||||
match &self.storage {
|
||||
RemoteStorageKind::LocalFs { .. } => None,
|
||||
RemoteStorageKind::AwsS3(c) => Some(c.concurrency_limit.into()),
|
||||
RemoteStorageKind::AzureContainer(c) => Some(c.concurrency_limit.into()),
|
||||
RemoteStorageKind::LocalFs { .. } => DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT,
|
||||
RemoteStorageKind::AwsS3(c) => c.concurrency_limit.into(),
|
||||
RemoteStorageKind::AzureContainer(c) => c.concurrency_limit.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,6 +65,12 @@ pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
|
||||
/// Here, a limit of max 20k concurrent connections was noted.
|
||||
/// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
|
||||
pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
|
||||
/// Set this limit analogously to the S3 limit.
|
||||
///
|
||||
/// The local filesystem backend doesn't enforce a concurrency limit itself, but this also bounds
|
||||
/// the upload queue concurrency. Some tests create thousands of uploads, which slows down the
|
||||
/// quadratic scheduling of the upload queue, and there is no point spawning so many Tokio tasks.
|
||||
pub const DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT: usize = 100;
|
||||
/// No limits on the client side, which currently means 1000 for AWS S3.
|
||||
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
|
||||
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
|
||||
|
||||
@@ -1253,9 +1253,6 @@ pub(crate) struct SmgrOpTimerInner {
|
||||
global_batch_wait_time: Histogram,
|
||||
per_timeline_batch_wait_time: Histogram,
|
||||
|
||||
global_flush_in_progress_micros: IntCounter,
|
||||
per_timeline_flush_in_progress_micros: IntCounter,
|
||||
|
||||
throttling: Arc<tenant_throttling::Pagestream>,
|
||||
|
||||
timings: SmgrOpTimerState,
|
||||
@@ -1366,20 +1363,18 @@ impl SmgrOpTimer {
|
||||
/// The first caller receives Some, subsequent ones None.
|
||||
///
|
||||
/// See [`SmgrOpTimerState`] for more context.
|
||||
pub(crate) fn observe_execution_end_flush_start(
|
||||
&mut self,
|
||||
at: Instant,
|
||||
) -> Option<SmgrOpFlushInProgress> {
|
||||
pub(crate) fn observe_execution_end_flush_start(&mut self, at: Instant) {
|
||||
// NB: unlike the other observe_* methods, this one take()s.
|
||||
#[allow(clippy::question_mark)] // maintain similar code pattern.
|
||||
let Some(mut inner) = self.0.take() else {
|
||||
return None;
|
||||
// NB: this take() isn't needed anymore, maybe we can simplify
|
||||
return;
|
||||
};
|
||||
let SmgrOpTimerState::Executing {
|
||||
execution_started_at,
|
||||
} = &inner.timings
|
||||
else {
|
||||
return None;
|
||||
return;
|
||||
};
|
||||
// update metrics
|
||||
let execution = at - *execution_started_at;
|
||||
@@ -1394,36 +1389,9 @@ impl SmgrOpTimer {
|
||||
|
||||
// state transition
|
||||
inner.timings = SmgrOpTimerState::Flushing;
|
||||
|
||||
// return the flush in progress object which
|
||||
// will do the remaining metrics updates
|
||||
let SmgrOpTimerInner {
|
||||
global_flush_in_progress_micros,
|
||||
per_timeline_flush_in_progress_micros,
|
||||
..
|
||||
} = inner;
|
||||
Some(SmgrOpFlushInProgress {
|
||||
flush_started_at: at,
|
||||
global_micros: global_flush_in_progress_micros,
|
||||
per_timeline_micros: per_timeline_flush_in_progress_micros,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// The last stage of request processing is serializing and flushing the request
|
||||
/// into the TCP connection. We want to make slow flushes observable
|
||||
/// _while they are occurring_, so this struct provides a wrapper method [`Self::measure`]
|
||||
/// to periodically bump the metric.
|
||||
///
|
||||
/// If in the future we decide that we're not interested in live updates, we can
|
||||
/// add another `observe_*` method to [`SmgrOpTimer`], follow the existing pattern there,
|
||||
/// and remove this struct from the code base.
|
||||
pub(crate) struct SmgrOpFlushInProgress {
|
||||
flush_started_at: Instant,
|
||||
global_micros: IntCounter,
|
||||
per_timeline_micros: IntCounter,
|
||||
}
|
||||
|
||||
impl Drop for SmgrOpTimer {
|
||||
fn drop(&mut self) {
|
||||
// In case of early drop, update any of the remaining metrics with
|
||||
@@ -1442,42 +1410,6 @@ impl Drop for SmgrOpTimer {
|
||||
}
|
||||
}
|
||||
|
||||
impl SmgrOpFlushInProgress {
|
||||
pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
|
||||
where
|
||||
Fut: std::future::Future<Output = O>,
|
||||
{
|
||||
let mut fut = std::pin::pin!(fut);
|
||||
|
||||
// Whenever observe_guard gets called, or dropped,
|
||||
// it adds the time elapsed since its last call to metrics.
|
||||
// Last call is tracked in `self.flush_started_at`.
|
||||
let mut observe_guard = scopeguard::guard(
|
||||
|| {
|
||||
let now = Instant::now();
|
||||
let elapsed = now - self.flush_started_at;
|
||||
self.global_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
self.per_timeline_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
self.flush_started_at = now;
|
||||
},
|
||||
|mut observe| {
|
||||
observe();
|
||||
},
|
||||
);
|
||||
|
||||
loop {
|
||||
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
|
||||
Ok(v) => return v,
|
||||
Err(_timeout) => {
|
||||
(*observe_guard)();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Debug,
|
||||
Clone,
|
||||
@@ -1513,6 +1445,56 @@ pub(crate) struct SmgrQueryTimePerTimeline {
|
||||
throttling: Arc<tenant_throttling::Pagestream>,
|
||||
}
|
||||
|
||||
impl SmgrQueryTimePerTimeline {
|
||||
pub(crate) async fn record_flush_in_progress<Fut, O>(
|
||||
shard: &crate::tenant::timeline::handle::WeakHandle<
|
||||
crate::page_service::TenantManagerTypes,
|
||||
>,
|
||||
start_at: Instant,
|
||||
mut fut: Fut,
|
||||
) -> O
|
||||
where
|
||||
Fut: std::future::Future<Output = O>,
|
||||
{
|
||||
let mut fut = std::pin::pin!(fut);
|
||||
|
||||
// Whenever observe_guard gets called, or dropped,
|
||||
// it adds the time elapsed since its last call to metrics.
|
||||
// Last call is tracked in `base`.
|
||||
let mut base = start_at;
|
||||
let mut observe_guard = scopeguard::guard(
|
||||
|| {
|
||||
let Ok(upgraded) = shard.upgrade() else {
|
||||
return;
|
||||
};
|
||||
let now = Instant::now();
|
||||
let elapsed = now - base;
|
||||
upgraded
|
||||
.query_metrics
|
||||
.global_flush_in_progress_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
upgraded
|
||||
.query_metrics
|
||||
.per_timeline_flush_in_progress_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
base = now;
|
||||
},
|
||||
|mut observe| {
|
||||
observe();
|
||||
},
|
||||
);
|
||||
|
||||
loop {
|
||||
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
|
||||
Ok(v) => return v,
|
||||
Err(_timeout) => {
|
||||
(*observe_guard)();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
// it's a counter, but, name is prepared to extend it to a histogram of queue depth
|
||||
@@ -1797,10 +1779,6 @@ impl SmgrQueryTimePerTimeline {
|
||||
SmgrOpTimer(Some(SmgrOpTimerInner {
|
||||
global_execution_latency_histo: self.global_latency[op as usize].clone(),
|
||||
per_timeline_execution_latency_histo: per_timeline_latency_histo,
|
||||
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
|
||||
per_timeline_flush_in_progress_micros: self
|
||||
.per_timeline_flush_in_progress_micros
|
||||
.clone(),
|
||||
global_batch_wait_time: self.global_batch_wait_time.clone(),
|
||||
per_timeline_batch_wait_time: self.per_timeline_batch_wait_time.clone(),
|
||||
throttling: self.throttling.clone(),
|
||||
|
||||
@@ -1063,9 +1063,10 @@ impl PageServerHandler {
|
||||
};
|
||||
|
||||
// invoke handler function
|
||||
let (handler_results, span): (
|
||||
let (handler_results, span, shard): (
|
||||
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
|
||||
_,
|
||||
_,
|
||||
) = match batch {
|
||||
BatchedFeMessage::Exists {
|
||||
span,
|
||||
@@ -1082,6 +1083,7 @@ impl PageServerHandler {
|
||||
.map(|msg| (msg, timer))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
|
||||
span,
|
||||
Some(shard),
|
||||
)
|
||||
}
|
||||
BatchedFeMessage::Nblocks {
|
||||
@@ -1099,6 +1101,7 @@ impl PageServerHandler {
|
||||
.map(|msg| (msg, timer))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
|
||||
span,
|
||||
Some(shard),
|
||||
)
|
||||
}
|
||||
BatchedFeMessage::GetPage {
|
||||
@@ -1126,6 +1129,7 @@ impl PageServerHandler {
|
||||
res
|
||||
},
|
||||
span,
|
||||
Some(shard),
|
||||
)
|
||||
}
|
||||
BatchedFeMessage::DbSize {
|
||||
@@ -1143,6 +1147,7 @@ impl PageServerHandler {
|
||||
.map(|msg| (msg, timer))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
|
||||
span,
|
||||
Some(shard),
|
||||
)
|
||||
}
|
||||
BatchedFeMessage::GetSlruSegment {
|
||||
@@ -1160,6 +1165,7 @@ impl PageServerHandler {
|
||||
.map(|msg| (msg, timer))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
|
||||
span,
|
||||
Some(shard),
|
||||
)
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -1181,12 +1187,13 @@ impl PageServerHandler {
|
||||
res
|
||||
},
|
||||
span,
|
||||
Some(shard),
|
||||
)
|
||||
}
|
||||
BatchedFeMessage::RespondError { span, error } => {
|
||||
// We've already decided to respond with an error, so we don't need to
|
||||
// call the handler.
|
||||
(vec![Err(error)], span)
|
||||
(vec![Err(error)], span, None)
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1194,7 +1201,7 @@ impl PageServerHandler {
|
||||
// Some handler errors cause exit from pagestream protocol.
|
||||
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
|
||||
for handler_result in handler_results {
|
||||
let (response_msg, timer) = match handler_result {
|
||||
let (response_msg, mut timer) = match handler_result {
|
||||
Err(e) => match &e.err {
|
||||
PageStreamError::Shutdown => {
|
||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||
@@ -1250,20 +1257,24 @@ impl PageServerHandler {
|
||||
// The timer's underlying metric is used for a storage-internal latency SLO and
|
||||
// we don't want to include latency in it that we can't control.
|
||||
// And as pointed out above, in this case, we don't control the time that flush will take.
|
||||
let flushing_timer = timer.map(|mut timer| {
|
||||
timer
|
||||
.observe_execution_end_flush_start(Instant::now())
|
||||
.expect("we are the first caller")
|
||||
});
|
||||
let start_flushing_at = Instant::now();
|
||||
if let Some(timer) = &mut timer {
|
||||
timer.observe_execution_end_flush_start(start_flushing_at);
|
||||
}
|
||||
|
||||
// what we want to do
|
||||
let flush_fut = pgb_writer.flush();
|
||||
// metric for how long flushing takes
|
||||
let flush_fut = match flushing_timer {
|
||||
Some(flushing_timer) => {
|
||||
futures::future::Either::Left(flushing_timer.measure(flush_fut))
|
||||
}
|
||||
None => futures::future::Either::Right(flush_fut),
|
||||
let flush_fut = if let Some(shard) = &shard {
|
||||
// don't hold upgraded handle while flushing!
|
||||
futures::future::Either::Left(
|
||||
metrics::SmgrQueryTimePerTimeline::record_flush_in_progress(
|
||||
shard,
|
||||
start_flushing_at,
|
||||
flush_fut,
|
||||
),
|
||||
)
|
||||
} else {
|
||||
futures::future::Either::Right(flush_fut)
|
||||
};
|
||||
// do it while respecting cancellation
|
||||
let _: () = async move {
|
||||
@@ -1280,8 +1291,6 @@ impl PageServerHandler {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
// and log the info! line inside the request span
|
||||
.instrument(span.clone())
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -437,8 +437,7 @@ impl RemoteTimelineClient {
|
||||
.conf
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
.and_then(|r| r.concurrency_limit())
|
||||
.unwrap_or(0);
|
||||
.map_or(0, |r| r.concurrency_limit());
|
||||
let mut upload_queue = self.upload_queue.lock().unwrap();
|
||||
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
|
||||
self.update_remote_physical_size_gauge(Some(index_part));
|
||||
@@ -461,8 +460,7 @@ impl RemoteTimelineClient {
|
||||
.conf
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
.and_then(|r| r.concurrency_limit())
|
||||
.unwrap_or(0);
|
||||
.map_or(0, |r| r.concurrency_limit());
|
||||
let mut upload_queue = self.upload_queue.lock().unwrap();
|
||||
upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
|
||||
self.update_remote_physical_size_gauge(None);
|
||||
@@ -484,8 +482,7 @@ impl RemoteTimelineClient {
|
||||
.conf
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
.and_then(|r| r.concurrency_limit())
|
||||
.unwrap_or(0);
|
||||
.map_or(0, |r| r.concurrency_limit());
|
||||
|
||||
let mut upload_queue = self.upload_queue.lock().unwrap();
|
||||
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
|
||||
|
||||
@@ -211,7 +211,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
error_run_count = 0;
|
||||
// schedule the next compaction after a short (1 second) delay in case there is a pending compaction task
|
||||
sleep_duration = if let CompactionOutcome::Pending = outcome {
|
||||
Duration::ZERO
|
||||
Duration::from_secs(1)
|
||||
} else {
|
||||
period
|
||||
};
|
||||
|
||||
@@ -192,7 +192,12 @@ pub enum ImageLayerCreationMode {
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub enum LastImageLayerCreationStatus {
|
||||
Incomplete, // TODO: record the last key being processed
|
||||
Incomplete {
|
||||
/// The last key of the partition (exclusive) that was processed in the last
|
||||
/// image layer creation attempt. We will continue from this key in the next
|
||||
/// attempt.
|
||||
last_key: Key,
|
||||
},
|
||||
Complete,
|
||||
#[default]
|
||||
Initial,
|
||||
@@ -321,7 +326,7 @@ pub struct Timeline {
|
||||
|
||||
// `Timeline` doesn't write these metrics itself, but it manages the lifetime. Code
|
||||
// in `crate::page_service` writes these metrics.
|
||||
pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
|
||||
pub(crate) query_metrics: Arc<crate::metrics::SmgrQueryTimePerTimeline>,
|
||||
|
||||
directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
|
||||
|
||||
@@ -2512,11 +2517,11 @@ impl Timeline {
|
||||
|
||||
metrics,
|
||||
|
||||
query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
|
||||
query_metrics: Arc::new(crate::metrics::SmgrQueryTimePerTimeline::new(
|
||||
&tenant_shard_id,
|
||||
&timeline_id,
|
||||
resources.pagestream_throttle_metrics,
|
||||
),
|
||||
)),
|
||||
|
||||
directory_metrics: array::from_fn(|_| AtomicU64::new(0)),
|
||||
|
||||
@@ -4346,7 +4351,7 @@ impl Timeline {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
// Is it time to create a new image layer for the given partition?
|
||||
// Is it time to create a new image layer for the given partition? True if we want to generate.
|
||||
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
|
||||
@@ -4658,6 +4663,11 @@ impl Timeline {
|
||||
) -> Result<(Vec<ResidentLayer>, LastImageLayerCreationStatus), CreateImageLayersError> {
|
||||
let timer = self.metrics.create_images_time_histo.start_timer();
|
||||
|
||||
if partitioning.parts.is_empty() {
|
||||
warn!("no partitions to create image layers for");
|
||||
return Ok((vec![], LastImageLayerCreationStatus::Complete));
|
||||
}
|
||||
|
||||
// We need to avoid holes between generated image layers.
|
||||
// Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
|
||||
// image layer with hole between them. In this case such layer can not be utilized by GC.
|
||||
@@ -4669,28 +4679,65 @@ impl Timeline {
|
||||
// image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
|
||||
let mut start = Key::MIN;
|
||||
|
||||
let check_for_image_layers = if let LastImageLayerCreationStatus::Incomplete = last_status {
|
||||
info!(
|
||||
"resuming image layer creation: last_status={:?}",
|
||||
last_status
|
||||
);
|
||||
true
|
||||
} else {
|
||||
self.should_check_if_image_layers_required(lsn)
|
||||
};
|
||||
let check_for_image_layers =
|
||||
if let LastImageLayerCreationStatus::Incomplete { last_key } = last_status {
|
||||
info!(
|
||||
"resuming image layer creation: last_status=incomplete, continue from {}",
|
||||
last_key
|
||||
);
|
||||
true
|
||||
} else {
|
||||
self.should_check_if_image_layers_required(lsn)
|
||||
};
|
||||
|
||||
let mut batch_image_writer = BatchLayerWriter::new(self.conf).await?;
|
||||
|
||||
let mut all_generated = true;
|
||||
|
||||
let mut partition_processed = 0;
|
||||
let total_partitions = partitioning.parts.len();
|
||||
let mut total_partitions = partitioning.parts.len();
|
||||
let mut last_partition_processed = None;
|
||||
let mut partition_parts = partitioning.parts.clone();
|
||||
|
||||
for partition in partitioning.parts.iter() {
|
||||
if let LastImageLayerCreationStatus::Incomplete { last_key } = last_status {
|
||||
// We need to skip the partitions that have already been processed.
|
||||
let mut found = false;
|
||||
for (i, partition) in partition_parts.iter().enumerate() {
|
||||
if last_key <= partition.end().unwrap() {
|
||||
// ```plain
|
||||
// |------|--------|----------|------|
|
||||
// ^last_key
|
||||
// ^start from this partition
|
||||
// ```
|
||||
// Why `i+1` instead of `i`?
|
||||
// It is possible that the user did some writes after the previous image layer creation attempt so that
|
||||
// a relation grows in size, and the last_key is now in the middle of the partition. In this case, we
|
||||
// still want to skip this partition, so that we can make progress and avoid generating image layers over
|
||||
// the same partition. The `i + 1 >= total_partitions` check below ensures we don't end up with an empty vec.
|
||||
if i + 1 >= total_partitions {
|
||||
// In general, this case should not happen -- if last_key is on the last partition, the previous
|
||||
// iteration of image layer creation should return a complete status.
|
||||
break; // with found=false
|
||||
}
|
||||
partition_parts = partition_parts.split_off(i + 1); // Remove the first i + 1 elements
|
||||
total_partitions = partition_parts.len();
|
||||
// Update the start key to the partition start.
|
||||
start = partition_parts[0].start().unwrap();
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
// Last key is within the last partition, or larger than all partitions.
|
||||
return Ok((vec![], LastImageLayerCreationStatus::Complete));
|
||||
}
|
||||
}
|
||||
|
||||
for partition in partition_parts.iter() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CreateImageLayersError::Cancelled);
|
||||
}
|
||||
|
||||
partition_processed += 1;
|
||||
let img_range = start..partition.ranges.last().unwrap().end;
|
||||
let compact_metadata = partition.overlaps(&Key::metadata_key_range());
|
||||
if compact_metadata {
|
||||
@@ -4725,6 +4772,8 @@ impl Timeline {
|
||||
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
|
||||
is_delta: false,
|
||||
}) {
|
||||
// TODO: this can be processed with the BatchLayerWriter::finish_with_discard
|
||||
// in the future.
|
||||
tracing::info!(
|
||||
"Skipping image layer at {lsn} {}..{}, already exists",
|
||||
img_range.start,
|
||||
@@ -4805,8 +4854,6 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
partition_processed += 1;
|
||||
|
||||
if let ImageLayerCreationMode::Try = mode {
|
||||
// We have at least made some progress
|
||||
if batch_image_writer.pending_layer_num() >= 1 {
|
||||
@@ -4822,8 +4869,10 @@ impl Timeline {
|
||||
* self.get_compaction_threshold();
|
||||
if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold {
|
||||
tracing::info!(
|
||||
"preempt image layer generation at {start} at {lsn}: too many L0 layers {num_of_l0_layers}",
|
||||
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}",
|
||||
partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers
|
||||
);
|
||||
last_partition_processed = Some(partition.clone());
|
||||
all_generated = false;
|
||||
break;
|
||||
}
|
||||
@@ -4868,7 +4917,14 @@ impl Timeline {
|
||||
if all_generated {
|
||||
LastImageLayerCreationStatus::Complete
|
||||
} else {
|
||||
LastImageLayerCreationStatus::Incomplete
|
||||
LastImageLayerCreationStatus::Incomplete {
|
||||
last_key: if let Some(last_partition_processed) = last_partition_processed {
|
||||
last_partition_processed.end().unwrap_or(Key::MIN)
|
||||
} else {
|
||||
// This branch should be unreachable, but in case it happens, we can just return the start key.
|
||||
Key::MIN
|
||||
},
|
||||
}
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ use crate::page_cache;
|
||||
use crate::statvfs::Statvfs;
|
||||
use crate::tenant::checks::check_valid_layermap;
|
||||
use crate::tenant::gc_block::GcBlock;
|
||||
use crate::tenant::layer_map::LayerMap;
|
||||
use crate::tenant::remote_timeline_client::WaitCompletionError;
|
||||
use crate::tenant::storage_layer::batch_split_writer::{
|
||||
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
|
||||
@@ -438,6 +439,11 @@ impl KeyHistoryRetention {
|
||||
if dry_run {
|
||||
return true;
|
||||
}
|
||||
if LayerMap::is_l0(&key.key_range, key.is_delta) {
|
||||
// gc-compaction should not produce L0 deltas, otherwise it will break the layer order.
|
||||
// We should ignore such layers.
|
||||
return true;
|
||||
}
|
||||
let layer_generation;
|
||||
{
|
||||
let guard = tline.layers.read().await;
|
||||
@@ -748,7 +754,7 @@ impl Timeline {
|
||||
.store(Arc::new(outcome.clone()));
|
||||
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
if let LastImageLayerCreationStatus::Incomplete = outcome {
|
||||
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
|
||||
// Yield and do not do any other kind of compaction.
|
||||
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
|
||||
return Ok(CompactionOutcome::Pending);
|
||||
|
||||
@@ -220,8 +220,10 @@ lfc_maybe_disabled(void)
|
||||
static bool
|
||||
lfc_ensure_opened(void)
|
||||
{
|
||||
bool enabled = !lfc_maybe_disabled();
|
||||
|
||||
/* Open cache file if not done yet */
|
||||
if (lfc_desc <= 0)
|
||||
if (lfc_desc <= 0 && enabled)
|
||||
{
|
||||
lfc_desc = BasicOpenFile(lfc_path, O_RDWR);
|
||||
|
||||
@@ -231,7 +233,7 @@ lfc_ensure_opened(void)
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
return enabled;
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -336,11 +338,10 @@ lfc_change_limit_hook(int newval, void *extra)
|
||||
{
|
||||
uint32 new_size = SIZE_MB_TO_CHUNKS(newval);
|
||||
|
||||
if (!lfc_ctl || !is_normal_backend())
|
||||
if (!is_normal_backend())
|
||||
return;
|
||||
|
||||
/* Open LFC file only if LFC was enabled or we are going to reenable it */
|
||||
if ((newval > 0 || LFC_ENABLED()) && !lfc_ensure_opened())
|
||||
if (!lfc_ensure_opened())
|
||||
return;
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
@@ -36,6 +36,11 @@
|
||||
#include "pagestore_client.h"
|
||||
#include "walproposer.h"
|
||||
|
||||
#ifdef __linux__
|
||||
#include <sys/ioctl.h>
|
||||
#include <linux/sockios.h>
|
||||
#endif
|
||||
|
||||
#define PageStoreTrace DEBUG5
|
||||
|
||||
#define MIN_RECONNECT_INTERVAL_USEC 1000
|
||||
@@ -728,11 +733,36 @@ retry:
|
||||
INSTR_TIME_SUBTRACT(since_last_log, last_log_ts);
|
||||
if (INSTR_TIME_GET_MILLISEC(since_last_log) >= LOG_INTERVAL_MS)
|
||||
{
|
||||
int sndbuf = -1;
|
||||
int recvbuf = -1;
|
||||
#ifdef __linux__
|
||||
int socketfd;
|
||||
#endif
|
||||
|
||||
since_start = now;
|
||||
INSTR_TIME_SUBTRACT(since_start, start_ts);
|
||||
neon_shard_log(shard_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses)",
|
||||
|
||||
#ifdef __linux__
|
||||
/*
|
||||
* get kernel's send and recv queue size via ioctl
|
||||
* https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27
|
||||
*/
|
||||
socketfd = PQsocket(pageserver_conn);
|
||||
if (socketfd != -1) {
|
||||
int ioctl_err;
|
||||
ioctl_err = ioctl(socketfd, SIOCOUTQ, &sndbuf);
|
||||
if (ioctl_err!= 0) {
|
||||
sndbuf = -errno;
|
||||
}
|
||||
ioctl_err = ioctl(socketfd, FIONREAD, &recvbuf);
|
||||
if (ioctl_err != 0) {
|
||||
recvbuf = -errno;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
neon_shard_log(shard_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket sndbuf=%d recvbuf=%d)",
|
||||
INSTR_TIME_GET_DOUBLE(since_start),
|
||||
shard->nrequests_sent, shard->nresponses_received);
|
||||
shard->nrequests_sent, shard->nresponses_received, sndbuf, recvbuf);
|
||||
last_log_ts = now;
|
||||
logged = true;
|
||||
}
|
||||
|
||||
@@ -363,7 +363,6 @@ compact_prefetch_buffers(void)
|
||||
target_slot->buftag = source_slot->buftag;
|
||||
target_slot->shard_no = source_slot->shard_no;
|
||||
target_slot->status = source_slot->status;
|
||||
target_slot->flags = source_slot->flags;
|
||||
target_slot->response = source_slot->response;
|
||||
target_slot->reqid = source_slot->reqid;
|
||||
target_slot->request_lsns = source_slot->request_lsns;
|
||||
@@ -1118,7 +1117,6 @@ Retry:
|
||||
slot->buftag = hashkey.buftag;
|
||||
slot->shard_no = get_shard_number(&tag);
|
||||
slot->my_ring_index = ring_index;
|
||||
slot->flags = 0;
|
||||
|
||||
min_ring_index = Min(min_ring_index, ring_index);
|
||||
|
||||
@@ -2847,13 +2845,12 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
}
|
||||
|
||||
tag.blockNum = blocknum;
|
||||
|
||||
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
lfc_present[i] = ~(lfc_present[i]);
|
||||
|
||||
ring_index = prefetch_register_bufferv(tag, NULL, iterblocks,
|
||||
lfc_present, true);
|
||||
|
||||
nblocks -= iterblocks;
|
||||
blocknum += iterblocks;
|
||||
|
||||
|
||||
@@ -707,6 +707,7 @@ impl TenantShard {
|
||||
if let Some(node_id) = self.intent.get_attached() {
|
||||
// Populate secondary by demoting the attached node
|
||||
self.intent.demote_attached(scheduler, *node_id);
|
||||
|
||||
modified = true;
|
||||
} else if self.intent.secondary.is_empty() {
|
||||
// Populate secondary by scheduling a fresh node
|
||||
@@ -979,24 +980,51 @@ impl TenantShard {
|
||||
),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
if secondary_scores.iter().any(|score| score.1.is_none()) {
|
||||
// Don't have full list of scores, so can't make a good decision about which to drop unless
|
||||
// there is an obvious one in the wrong AZ
|
||||
for secondary in self.intent.get_secondary() {
|
||||
if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
|
||||
// Trivial case: if we only have one secondary, drop that one
|
||||
if self.intent.get_secondary().len() == 1 {
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(
|
||||
*self.intent.get_secondary().first().unwrap(),
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
// Try to find a "good" secondary to keep, without relying on scores (one or more nodes are in a state
|
||||
// where its score can't be calculated), and drop the others. This enables us to make progress in
|
||||
// most cases, even if some nodes are offline or have scheduling=pause set.
|
||||
|
||||
debug_assert!(self.intent.attached.is_some()); // We should not make it here unless attached -- this
|
||||
// logic presumes we are in a mode where we want secondaries to be in non-home AZ
|
||||
if let Some(retain_secondary) = self.intent.get_secondary().iter().find(|n| {
|
||||
let in_home_az = scheduler.get_node_az(n) == self.intent.preferred_az_id;
|
||||
let is_available = secondary_scores
|
||||
.get(n)
|
||||
.expect("Built from same list of nodes")
|
||||
.is_some();
|
||||
is_available && !in_home_az
|
||||
}) {
|
||||
// Great, we found one to retain. Pick some other to drop.
|
||||
if let Some(victim) = self
|
||||
.intent
|
||||
.get_secondary()
|
||||
.iter()
|
||||
.find(|n| n != &retain_secondary)
|
||||
{
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(*secondary),
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(*victim),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Fall through: we didn't identify one to remove. This ought to be rare.
|
||||
tracing::warn!("Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
|
||||
self.intent.get_secondary()
|
||||
);
|
||||
self.intent.get_secondary()
|
||||
);
|
||||
} else {
|
||||
let victim = secondary_scores
|
||||
.iter()
|
||||
@@ -1005,7 +1033,7 @@ impl TenantShard {
|
||||
.0;
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(victim),
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(*victim),
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -2379,6 +2407,110 @@ pub(crate) mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test how the optimisation code behaves with an extra secondary
|
||||
#[test]
|
||||
fn optimize_removes_secondary() -> anyhow::Result<()> {
|
||||
let az_a_tag = AvailabilityZone("az-a".to_string());
|
||||
let az_b_tag = AvailabilityZone("az-b".to_string());
|
||||
let mut nodes = make_test_nodes(
|
||||
4,
|
||||
&[
|
||||
az_a_tag.clone(),
|
||||
az_b_tag.clone(),
|
||||
az_a_tag.clone(),
|
||||
az_b_tag.clone(),
|
||||
],
|
||||
);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
|
||||
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
shard_a.intent.preferred_az_id = Some(az_a_tag.clone());
|
||||
shard_a
|
||||
.schedule(&mut scheduler, &mut schedule_context)
|
||||
.unwrap();
|
||||
|
||||
// Attached on node 1, secondary on node 2
|
||||
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
|
||||
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(2)]);
|
||||
|
||||
// Initially optimiser is idle
|
||||
assert_eq!(
|
||||
shard_a.optimize_attachment(&mut scheduler, &schedule_context),
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
shard_a.optimize_secondary(&mut scheduler, &schedule_context),
|
||||
None
|
||||
);
|
||||
|
||||
// A spare secondary in the home AZ: it should be removed -- this is the situation when we're midway through a graceful migration, after cutting over
|
||||
// to our new location
|
||||
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
|
||||
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
|
||||
|
||||
// A spare secondary in the non-home AZ, and one of them is offline
|
||||
shard_a.intent.push_secondary(&mut scheduler, NodeId(4));
|
||||
nodes
|
||||
.get_mut(&NodeId(4))
|
||||
.unwrap()
|
||||
.set_availability(NodeAvailability::Offline);
|
||||
scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
|
||||
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(4))
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
|
||||
|
||||
// A spare secondary when we should have none
|
||||
shard_a.policy = PlacementPolicy::Attached(0);
|
||||
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
|
||||
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
|
||||
assert_eq!(shard_a.intent.get_secondary(), &vec![]);
|
||||
|
||||
// Check that in secondary mode, we preserve the secondary in the preferred AZ
|
||||
let mut schedule_context = ScheduleContext::default(); // Fresh context, we're about to call schedule()
|
||||
shard_a.policy = PlacementPolicy::Secondary;
|
||||
shard_a
|
||||
.schedule(&mut scheduler, &mut schedule_context)
|
||||
.unwrap();
|
||||
assert_eq!(shard_a.intent.get_attached(), &None);
|
||||
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
|
||||
assert_eq!(
|
||||
shard_a.optimize_attachment(&mut scheduler, &schedule_context),
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
shard_a.optimize_secondary(&mut scheduler, &schedule_context),
|
||||
None
|
||||
);
|
||||
|
||||
shard_a.intent.clear(&mut scheduler);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Optimize til quiescent: this emulates what Service::optimize_all does, when
|
||||
// called repeatedly in the background.
|
||||
// Returns the applied optimizations
|
||||
|
||||
@@ -34,16 +34,20 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
|
||||
cur.execute("set log_statement = 'all'")
|
||||
cur.execute("create table t(x integer)")
|
||||
for _ in range(n_iters):
|
||||
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
|
||||
with zenbenchmark.record_duration(f"insert into t values (generate_series(1,{n_records}))"):
|
||||
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
|
||||
time.sleep(1)
|
||||
|
||||
cur.execute("vacuum t")
|
||||
with zenbenchmark.record_duration("vacuum t"):
|
||||
cur.execute("vacuum t")
|
||||
|
||||
with zenbenchmark.record_duration("test_query"):
|
||||
with zenbenchmark.record_duration("SELECT count(*) from t"):
|
||||
cur.execute("SELECT count(*) from t")
|
||||
assert cur.fetchone() == (n_iters * n_records,)
|
||||
|
||||
flush_ep_to_pageserver(env, endpoint, tenant, timeline)
|
||||
env.pageserver.http_client().timeline_checkpoint(
|
||||
tenant, timeline, compact=False, wait_until_uploaded=True
|
||||
)
|
||||
with zenbenchmark.record_duration("flush_ep_to_pageserver"):
|
||||
flush_ep_to_pageserver(env, endpoint, tenant, timeline)
|
||||
with zenbenchmark.record_duration("timeline_checkpoint"):
|
||||
env.pageserver.http_client().timeline_checkpoint(
|
||||
tenant, timeline, compact=False, wait_until_uploaded=True
|
||||
)
|
||||
|
||||
@@ -29,6 +29,21 @@ AGGRESSIVE_COMPACTION_TENANT_CONF = {
|
||||
# "lsn_lease_length": "0s", -- TODO: would cause branch creation errors, should fix later
|
||||
}
|
||||
|
||||
PREEMPT_COMPACTION_TENANT_CONF = {
|
||||
"gc_period": "5s",
|
||||
"compaction_period": "5s",
|
||||
# Small checkpoint distance to create many layers
|
||||
"checkpoint_distance": 1024**2,
|
||||
# Compact small layers
|
||||
"compaction_target_size": 1024**2,
|
||||
"image_creation_threshold": 1,
|
||||
"image_creation_preempt_threshold": 1,
|
||||
# compact more frequently
|
||||
"compaction_threshold": 3,
|
||||
"compaction_upper_limit": 6,
|
||||
"lsn_lease_length": "0s",
|
||||
}
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
@pytest.mark.parametrize(
|
||||
@@ -36,7 +51,8 @@ AGGRESSIVE_COMPACTION_TENANT_CONF = {
|
||||
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
|
||||
)
|
||||
def test_pageserver_compaction_smoke(
|
||||
neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
wal_receiver_protocol: PageserverWalReceiverProtocol,
|
||||
):
|
||||
"""
|
||||
This is a smoke test that compaction kicks in. The workload repeatedly churns
|
||||
@@ -54,7 +70,8 @@ def test_pageserver_compaction_smoke(
|
||||
page_cache_size=10
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESSIVE_COMPACTION_TENANT_CONF)
|
||||
conf = AGGRESSIVE_COMPACTION_TENANT_CONF.copy()
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=conf)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
@@ -113,6 +130,41 @@ page_cache_size=10
|
||||
assert vectored_average < 8
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
def test_pageserver_compaction_preempt(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
# Ideally we should be able to do unit tests for this, but we need real Postgres
|
||||
# WALs in order to do unit testing...
|
||||
|
||||
conf = PREEMPT_COMPACTION_TENANT_CONF.copy()
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=conf)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
row_count = 200000
|
||||
churn_rounds = 10
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init(env.pageserver.id)
|
||||
|
||||
log.info("Writing initial data ...")
|
||||
workload.write_rows(row_count, env.pageserver.id)
|
||||
|
||||
for i in range(1, churn_rounds + 1):
|
||||
log.info(f"Running churn round {i}/{churn_rounds} ...")
|
||||
workload.churn_rows(row_count, env.pageserver.id, upload=False)
|
||||
workload.validate(env.pageserver.id)
|
||||
ps_http.timeline_compact(tenant_id, timeline_id, wait_until_uploaded=True)
|
||||
log.info("Validating at workload end ...")
|
||||
workload.validate(env.pageserver.id)
|
||||
# ensure image layer creation gets preempted and then resumed
|
||||
env.pageserver.assert_log_contains("resuming image layer creation")
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
@pytest.mark.parametrize(
|
||||
"with_branches",
|
||||
|
||||
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 3cf7ce1afa...86d9ea96eb
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: f0ffc8279d...8dfd5a7030
4
vendor/revisions.json
vendored
4
vendor/revisions.json
vendored
@@ -1,11 +1,11 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.2",
|
||||
"f0ffc8279dbcbbc439981a4fd001a9687e5d665d"
|
||||
"8dfd5a7030d3e8a98b60265ebe045788892ac7f3"
|
||||
],
|
||||
"v16": [
|
||||
"16.6",
|
||||
"3cf7ce1afab75027716d14223f95ddb300754162"
|
||||
"86d9ea96ebb9088eac62f57f1f5ace68e70e0d1c"
|
||||
],
|
||||
"v15": [
|
||||
"15.10",
|
||||
|
||||
Reference in New Issue
Block a user