Compare commits

...

18 Commits

Author SHA1 Message Date
Konstantin Knizhnik
8852e8a766 Fix prewarming terminatoin condition 2024-10-27 14:14:06 +02:00
Konstantin Knizhnik
c04ae556c5 Fix comments and other minor refactoring 2024-10-27 09:08:40 +02:00
Konstantin Knizhnik
44b283e107 Increase timeout in test_lfc_prewarm 2024-10-26 08:22:53 +03:00
Konstantin Knizhnik
c7a3359edd Add prewarm_local_cache and get_local_cache_state functions 2024-10-25 23:05:07 +03:00
Konstantin Knizhnik
012a8a360f Fix get_prewarm_info() 2024-10-25 10:11:58 +03:00
Konstantin Knizhnik
0b42695983 Report prewarm progress 2024-10-25 08:26:02 +03:00
Konstantin Knizhnik
284d7b4da6 Report prewarm progress 2024-10-25 08:26:00 +03:00
Konstantin Knizhnik
361fc04cd6 Fix warnings 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
12f635aa04 Fix warnings 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
ac6c53b94b Support prewarming of replica 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
df289738b8 Explain why we do not want to perform prewarm of LFC at replica and how it is avoided now 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
0d3503a187 Check for number of used pages rather than chunks in test_lfc_prewarm.py 2024-10-25 08:24:44 +03:00
Konstantin Knizhnik
f328d497e1 Wait LFC prewarm completion in the loop in test_lfc_prewarm.py 2024-10-25 08:24:44 +03:00
Konstantin Knizhnik
f971c3a786 Implement LFC prewarm 2024-10-25 08:24:44 +03:00
Vlad Lazar
5069123b6d pageserver: refactor ingest inplace to decouple decoding and handling (#9472)
## Problem

WAL ingest couples decoding of special records with their handling
(updates to the storage engine mostly).
This is a roadblock for our plan to move WAL filtering (and implicitly
decoding) to safekeepers since they cannot
do writes to the storage engine. 

## Summary of changes

This PR decouples the decoding of the special WAL records from their
application. The changes are done in place
and I've done my best to refrain from refactorings and attempted to
preserve the original code as much as possible.

Related: https://github.com/neondatabase/neon/issues/9335
Epic: https://github.com/neondatabase/neon/issues/9329
2024-10-24 17:12:47 +01:00
Alex Chi Z.
fb0406e9d2 refactor(pageserver): refactor split writers using batch layer writer (#9493)
part of https://github.com/neondatabase/neon/issues/9114,
https://github.com/neondatabase/neon/issues/8836,
https://github.com/neondatabase/neon/issues/8362

The split layer writer code can be used in a more general way: the
caller puts unfinished writers into the batch layer writer and let batch
layer writer to ensure the atomicity of the layer produces.

## Summary of changes

* Add batch layer writer, which atomically finishes the layers.
`BatchLayerWriter::finish` is simply a copy-paste from previous split
layer writers.
* Refactor split writers to use the batch layer writer.
* The current split writer tests cover all code path of batch layer
writer.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-10-24 10:49:54 -04:00
Alexander Bayandin
b8a311131e CI: remove git config --add safe.directory hack (#9391)
## Problem

We have `git config --global --add safe.directory ...` leftovers from the
past, but `actions/checkout` does it by default (since v3.0.2, we use v4)

## Summary of changes
- Remove `git config --global --add safe.directory ...` hack
2024-10-24 15:49:26 +01:00
John Spray
d589498c6f storcon: respect Reconciler::cancel during await_lsn (#9486)
## Problem

When a pageserver is misbehaving (e.g. we hit an ingest bug or something
is pathologically slow), the storage controller could get stuck in the
part of live migration that waits for LSNs to catch up. This is a
problem, because it can prevent us migrating the troublesome tenant to
another pageserver.

Closes: https://github.com/neondatabase/cloud/issues/19169

## Summary of changes

- Respect Reconciler::cancel during await_lsn.
2024-10-24 15:23:09 +01:00
22 changed files with 1798 additions and 733 deletions

View File

@@ -53,20 +53,6 @@ jobs:
BUILD_TAG: ${{ inputs.build-tag }}
steps:
- name: Fix git ownership
run: |
# Workaround for `fatal: detected dubious ownership in repository at ...`
#
# Use both ${{ github.workspace }} and ${GITHUB_WORKSPACE} because they're different on host and in containers
# Ref https://github.com/actions/checkout/issues/785
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
for r in 14 15 16 17; do
git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r"
git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r"
done
- uses: actions/checkout@v4
with:
submodules: true

View File

@@ -1078,20 +1078,6 @@ jobs:
runs-on: [ self-hosted, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
steps:
- name: Fix git ownership
run: |
# Workaround for `fatal: detected dubious ownership in repository at ...`
#
# Use both ${{ github.workspace }} and ${GITHUB_WORKSPACE} because they're different on host and in containers
# Ref https://github.com/actions/checkout/issues/785
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
for r in 14 15 16 17; do
git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r"
git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r"
done
- uses: actions/checkout@v4
- name: Trigger deploy workflow

View File

@@ -345,6 +345,7 @@ impl AuxFileV2 {
AuxFileV2::Recognized("pg_logical/replorigin_checkpoint", hash)
}
(2, 1) => AuxFileV2::Recognized("pg_replslot/", hash),
(4, 1) => AuxFileV2::Recognized("lfc.state", hash),
(1, 0xff) => AuxFileV2::OtherWithPrefix("pg_logical/", hash),
(0xff, 0xff) => AuxFileV2::Other(hash),
_ => return None,

View File

@@ -39,6 +39,7 @@ fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key
const AUX_DIR_PG_LOGICAL: u8 = 0x01;
const AUX_DIR_PG_REPLSLOT: u8 = 0x02;
const AUX_DIR_LFC_STATE: u8 = 0x04;
const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
/// Encode the aux file into a fixed-size key.
@@ -75,6 +76,8 @@ pub fn encode_aux_file_key(path: &str) -> Key {
aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0xFF, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("pg_replslot/") {
aux_hash_to_metadata_key(AUX_DIR_PG_REPLSLOT, 0x01, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("lfc.state") {
aux_hash_to_metadata_key(AUX_DIR_LFC_STATE, 0x01, fname.as_bytes())
} else {
if cfg!(debug_assertions) {
warn!(

View File

@@ -1,5 +1,6 @@
//! Common traits and structs for layers
pub mod batch_split_writer;
pub mod delta_layer;
pub mod filter_iterator;
pub mod image_layer;
@@ -8,7 +9,6 @@ pub(crate) mod layer;
mod layer_desc;
mod layer_name;
pub mod merge_iterator;
pub mod split_writer;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value;

View File

@@ -12,41 +12,154 @@ use super::{
DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
};
pub(crate) enum SplitWriterResult {
pub(crate) enum BatchWriterResult {
Produced(ResidentLayer),
Discarded(PersistentLayerKey),
}
#[cfg(test)]
impl SplitWriterResult {
impl BatchWriterResult {
fn into_resident_layer(self) -> ResidentLayer {
match self {
SplitWriterResult::Produced(layer) => layer,
SplitWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
BatchWriterResult::Produced(layer) => layer,
BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
}
}
fn into_discarded_layer(self) -> PersistentLayerKey {
match self {
SplitWriterResult::Produced(_) => panic!("unexpected produced layer"),
SplitWriterResult::Discarded(layer) => layer,
BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
BatchWriterResult::Discarded(layer) => layer,
}
}
}
enum LayerWriterWrapper {
Image(ImageLayerWriter),
Delta(DeltaLayerWriter),
}
/// An layer writer that takes unfinished layers and finish them atomically.
#[must_use]
pub struct BatchLayerWriter {
generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
conf: &'static PageServerConf,
}
impl BatchLayerWriter {
pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
Ok(Self {
generated_layer_writers: Vec::new(),
conf,
})
}
pub fn add_unfinished_image_writer(
&mut self,
writer: ImageLayerWriter,
key_range: Range<Key>,
lsn: Lsn,
) {
self.generated_layer_writers.push((
LayerWriterWrapper::Image(writer),
PersistentLayerKey {
key_range,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
is_delta: false,
},
));
}
pub fn add_unfinished_delta_writer(
&mut self,
writer: DeltaLayerWriter,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
) {
self.generated_layer_writers.push((
LayerWriterWrapper::Delta(writer),
PersistentLayerKey {
key_range,
lsn_range,
is_delta: true,
},
));
}
pub(crate) async fn finish_with_discard_fn<D, F>(
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard_fn: D,
) -> anyhow::Result<Vec<BatchWriterResult>>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
{
let Self {
generated_layer_writers,
..
} = self;
let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
for produced_layer in generated_layers {
if let BatchWriterResult::Produced(resident_layer) = produced_layer {
let layer: Layer = resident_layer.into();
layer.delete_on_drop();
}
}
};
// BEGIN: catch every error and do the recovery in the below section
let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
for (inner, layer_key) in generated_layer_writers {
if discard_fn(&layer_key).await {
generated_layers.push(BatchWriterResult::Discarded(layer_key));
} else {
let res = match inner {
LayerWriterWrapper::Delta(writer) => {
writer.finish(layer_key.key_range.end, ctx).await
}
LayerWriterWrapper::Image(writer) => {
writer
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
}
};
let layer = match res {
Ok((desc, path)) => {
match Layer::finish_creating(self.conf, tline, desc, &path) {
Ok(layer) => layer,
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
clean_up_layers(generated_layers);
return Err(e);
}
}
}
Err(e) => {
// Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
// so we don't need to remove the layer we just failed to create by ourselves.
clean_up_layers(generated_layers);
return Err(e);
}
};
generated_layers.push(BatchWriterResult::Produced(layer));
}
}
// END: catch every error and do the recovery in the above section
Ok(generated_layers)
}
}
/// An image writer that takes images and produces multiple image layers.
///
/// The interface does not guarantee atomicity (i.e., if the image layer generation
/// fails, there might be leftover files to be cleaned up)
#[must_use]
pub struct SplitImageLayerWriter {
inner: ImageLayerWriter,
target_layer_size: u64,
generated_layer_writers: Vec<(ImageLayerWriter, PersistentLayerKey)>,
lsn: Lsn,
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn: Lsn,
batches: BatchLayerWriter,
start_key: Key,
}
@@ -71,10 +184,10 @@ impl SplitImageLayerWriter {
ctx,
)
.await?,
generated_layer_writers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
batches: BatchLayerWriter::new(conf).await?,
lsn,
start_key,
})
@@ -102,16 +215,13 @@ impl SplitImageLayerWriter {
ctx,
)
.await?;
let layer_key = PersistentLayerKey {
key_range: self.start_key..key,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
is_delta: false,
};
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
self.batches.add_unfinished_image_writer(
prev_image_writer,
self.start_key..key,
self.lsn,
);
self.start_key = key;
self.generated_layer_writers
.push((prev_image_writer, layer_key));
}
self.inner.put_image(key, img, ctx).await
}
@@ -122,64 +232,18 @@ impl SplitImageLayerWriter {
ctx: &RequestContext,
end_key: Key,
discard_fn: D,
) -> anyhow::Result<Vec<SplitWriterResult>>
) -> anyhow::Result<Vec<BatchWriterResult>>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
{
let Self {
mut generated_layer_writers,
inner,
..
mut batches, inner, ..
} = self;
if inner.num_keys() != 0 {
let layer_key = PersistentLayerKey {
key_range: self.start_key..end_key,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
is_delta: false,
};
generated_layer_writers.push((inner, layer_key));
batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
}
let clean_up_layers = |generated_layers: Vec<SplitWriterResult>| {
for produced_layer in generated_layers {
if let SplitWriterResult::Produced(image_layer) = produced_layer {
let layer: Layer = image_layer.into();
layer.delete_on_drop();
}
}
};
// BEGIN: catch every error and do the recovery in the below section
let mut generated_layers = Vec::new();
for (inner, layer_key) in generated_layer_writers {
if discard_fn(&layer_key).await {
generated_layers.push(SplitWriterResult::Discarded(layer_key));
} else {
let layer = match inner
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
{
Ok((desc, path)) => {
match Layer::finish_creating(self.conf, tline, desc, &path) {
Ok(layer) => layer,
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
clean_up_layers(generated_layers);
return Err(e);
}
}
}
Err(e) => {
// ImageLayerWriter::finish will clean up the temporary layer if anything goes wrong,
// so we don't need to remove the layer we just failed to create by ourselves.
clean_up_layers(generated_layers);
return Err(e);
}
};
generated_layers.push(SplitWriterResult::Produced(layer));
}
}
// END: catch every error and do the recovery in the above section
Ok(generated_layers)
batches.finish_with_discard_fn(tline, ctx, discard_fn).await
}
#[cfg(test)]
@@ -188,7 +252,7 @@ impl SplitImageLayerWriter {
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> anyhow::Result<Vec<SplitWriterResult>> {
) -> anyhow::Result<Vec<BatchWriterResult>> {
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
.await
}
@@ -196,9 +260,6 @@ impl SplitImageLayerWriter {
/// A delta writer that takes key-lsn-values and produces multiple delta layers.
///
/// The interface does not guarantee atomicity (i.e., if the delta layer generation fails,
/// there might be leftover files to be cleaned up).
///
/// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
/// will split them into multiple files based on size.
@@ -206,12 +267,12 @@ impl SplitImageLayerWriter {
pub struct SplitDeltaLayerWriter {
inner: Option<(Key, DeltaLayerWriter)>,
target_layer_size: u64,
generated_layer_writers: Vec<(DeltaLayerWriter, PersistentLayerKey)>,
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn_range: Range<Lsn>,
last_key_written: Key,
batches: BatchLayerWriter,
}
impl SplitDeltaLayerWriter {
@@ -225,12 +286,12 @@ impl SplitDeltaLayerWriter {
Ok(Self {
target_layer_size,
inner: None,
generated_layer_writers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
lsn_range,
last_key_written: Key::MIN,
batches: BatchLayerWriter::new(conf).await?,
})
}
@@ -279,13 +340,11 @@ impl SplitDeltaLayerWriter {
.await?;
let (start_key, prev_delta_writer) =
std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
let layer_key = PersistentLayerKey {
key_range: start_key..key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
self.generated_layer_writers
.push((prev_delta_writer, layer_key));
self.batches.add_unfinished_delta_writer(
prev_delta_writer,
start_key..key,
self.lsn_range.clone(),
);
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
// We have to produce a very large file b/c a key is updated too often.
anyhow::bail!(
@@ -305,64 +364,25 @@ impl SplitDeltaLayerWriter {
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard_fn: D,
) -> anyhow::Result<Vec<SplitWriterResult>>
) -> anyhow::Result<Vec<BatchWriterResult>>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
{
let Self {
mut generated_layer_writers,
inner,
..
mut batches, inner, ..
} = self;
if let Some((start_key, writer)) = inner {
if writer.num_keys() != 0 {
let end_key = self.last_key_written.next();
let layer_key = PersistentLayerKey {
key_range: start_key..end_key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
generated_layer_writers.push((writer, layer_key));
batches.add_unfinished_delta_writer(
writer,
start_key..end_key,
self.lsn_range.clone(),
);
}
}
let clean_up_layers = |generated_layers: Vec<SplitWriterResult>| {
for produced_layer in generated_layers {
if let SplitWriterResult::Produced(delta_layer) = produced_layer {
let layer: Layer = delta_layer.into();
layer.delete_on_drop();
}
}
};
// BEGIN: catch every error and do the recovery in the below section
let mut generated_layers = Vec::new();
for (inner, layer_key) in generated_layer_writers {
if discard_fn(&layer_key).await {
generated_layers.push(SplitWriterResult::Discarded(layer_key));
} else {
let layer = match inner.finish(layer_key.key_range.end, ctx).await {
Ok((desc, path)) => {
match Layer::finish_creating(self.conf, tline, desc, &path) {
Ok(layer) => layer,
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
clean_up_layers(generated_layers);
return Err(e);
}
}
}
Err(e) => {
// DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
// so we don't need to remove the layer we just failed to create by ourselves.
clean_up_layers(generated_layers);
return Err(e);
}
};
generated_layers.push(SplitWriterResult::Produced(layer));
}
}
// END: catch every error and do the recovery in the above section
Ok(generated_layers)
batches.finish_with_discard_fn(tline, ctx, discard_fn).await
}
#[cfg(test)]
@@ -370,7 +390,7 @@ impl SplitDeltaLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<SplitWriterResult>> {
) -> anyhow::Result<Vec<BatchWriterResult>> {
self.finish_with_discard_fn(tline, ctx, |_| async { false })
.await
}

View File

@@ -1009,7 +1009,7 @@ impl ImageLayerWriter {
self.inner.take().unwrap().finish(ctx, None).await
}
/// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
/// Finish writing the image layer with an end key, used in [`super::batch_split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
pub(super) async fn finish_with_end_key(
mut self,
end_key: Key,

View File

@@ -32,11 +32,11 @@ use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::batch_split_writer::{
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
};
use crate::tenant::storage_layer::filter_iterator::FilterIterator;
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::split_writer::{
SplitDeltaLayerWriter, SplitImageLayerWriter, SplitWriterResult,
};
use crate::tenant::storage_layer::{
AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
};
@@ -2038,11 +2038,11 @@ impl Timeline {
let produced_image_layers_len = produced_image_layers.len();
for action in produced_delta_layers {
match action {
SplitWriterResult::Produced(layer) => {
BatchWriterResult::Produced(layer) => {
stat.produce_delta_layer(layer.layer_desc().file_size());
compact_to.push(layer);
}
SplitWriterResult::Discarded(l) => {
BatchWriterResult::Discarded(l) => {
keep_layers.insert(l);
stat.discard_delta_layer();
}
@@ -2050,11 +2050,11 @@ impl Timeline {
}
for action in produced_image_layers {
match action {
SplitWriterResult::Produced(layer) => {
BatchWriterResult::Produced(layer) => {
stat.produce_image_layer(layer.layer_desc().file_size());
compact_to.push(layer);
}
SplitWriterResult::Discarded(l) => {
BatchWriterResult::Discarded(l) => {
keep_layers.insert(l);
stat.discard_image_layer();
}

File diff suppressed because it is too large Load Diff

View File

@@ -32,6 +32,8 @@ DATA = \
neon--1.2--1.3.sql \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.6.sql \
neon--1.6--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \
neon--1.3--1.2.sql \

View File

@@ -22,6 +22,7 @@
#include "neon_pgversioncompat.h"
#include "access/parallel.h"
#include "access/xlog.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pagestore_client.h"
@@ -30,22 +31,28 @@
#include "port/pg_iovec.h"
#include "postmaster/bgworker.h"
#include RELFILEINFO_HDR
#include "replication/message.h"
#include "storage/buf_internals.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#if PG_VERSION_NUM >= 150000
#include "access/xlogrecovery.h"
#endif
#include "hll.h"
#include "bitmap.h"
#include "neon.h"
#include "neon_perf_counters.h"
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
/*
* Local file cache is used to temporary store relations pages in local file system.
@@ -100,7 +107,9 @@ typedef struct FileCacheEntry
BufferTag key;
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 access_count : 30;
uint32 prewarm_requested : 1; /* entry should be filled by prewarm */
uint32 prewarm_started : 1; /* chunk is written by lfc_prewarm */
uint32 bitmap[CHUNK_BITMAP_SIZE];
dlist_node list_node; /* LRU/holes list node */
} FileCacheEntry;
@@ -118,26 +127,57 @@ typedef struct FileCacheControl
uint64 writes; /* number of writes issued */
uint64 time_read; /* time spent reading (us) */
uint64 time_write; /* time spent writing (us) */
uint32 prewarm_total_chunks;
uint32 prewarm_curr_chunk;
uint32 prewarmed_pages;
uint32 skipped_pages;
dlist_head lru; /* double linked list for LRU replacement
* algorithm */
dlist_head holes; /* double linked list of punched holes */
HyperLogLogState wss_estimation; /* estimation of working set size */
} FileCacheControl;
typedef struct FileCacheStateEntry
{
BufferTag key;
uint32 bitmap[CHUNK_BITMAP_SIZE];
} FileCacheStateEntry;
static HTAB *lfc_hash;
static int lfc_desc = 0;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_prewarm_limit;
static int lfc_prewarm_batch;
static char *lfc_path;
static FileCacheControl *lfc_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
static CustomCheckpointHookType PrevCheckpointHook;
#define LFC_ENABLED() (lfc_ctl->limit != 0)
PGDLLEXPORT void LfcPrewarmMain(Datum main_arg);
static void
LfcCheckpointHook(int flags)
{
if (flags & CHECKPOINT_IS_SHUTDOWN)
{
lfc_save_state();
}
if (PrevCheckpointHook)
{
PrevCheckpointHook(flags);
}
}
/*
* Local file cache is optional and Neon can work without it.
* In case of any any errors with this cache, we should disable it but to not throw error.
@@ -149,7 +189,7 @@ lfc_disable(char const *op)
{
int fd;
elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -184,7 +224,7 @@ lfc_disable(char const *op)
pgstat_report_wait_end();
if (rc < 0)
elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path);
elog(WARNING, "LFC: failed to truncate local file cache %s: %m", lfc_path);
}
}
@@ -196,7 +236,7 @@ lfc_disable(char const *op)
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
if (fd < 0)
elog(WARNING, "Failed to recreate local file cache %s: %m", lfc_path);
elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path);
else
close(fd);
@@ -236,6 +276,17 @@ lfc_ensure_opened(void)
return enabled;
}
PGDLLEXPORT void
LfcPrewarmMain(Datum main_arg)
{
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
lfc_load_pages();
}
static void
lfc_shmem_startup(void)
{
@@ -267,14 +318,7 @@ lfc_shmem_startup(void)
n_chunks + 1, n_chunks + 1,
&info,
HASH_ELEM | HASH_BLOBS);
lfc_ctl->generation = 0;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
lfc_ctl->hits = 0;
lfc_ctl->misses = 0;
lfc_ctl->writes = 0;
lfc_ctl->time_read = 0;
lfc_ctl->time_write = 0;
memset(lfc_ctl, 0, sizeof *lfc_ctl);
dlist_init(&lfc_ctl->lru);
dlist_init(&lfc_ctl->holes);
@@ -285,7 +329,7 @@ lfc_shmem_startup(void)
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
if (fd < 0)
{
elog(WARNING, "Failed to create local file cache %s: %m", lfc_path);
elog(WARNING, "LFC: failed to create local file cache %s: %m", lfc_path);
lfc_ctl->limit = 0;
}
else
@@ -295,6 +339,9 @@ lfc_shmem_startup(void)
}
}
LWLockRelease(AddinShmemInitLock);
PrevCheckpointHook = CustomCheckpointHook;
CustomCheckpointHook = LfcCheckpointHook;
}
static void
@@ -327,7 +374,7 @@ lfc_check_limit_hook(int *newval, void **extra, GucSource source)
{
if (*newval > lfc_max_size)
{
elog(ERROR, "neon.file_cache_size_limit can not be larger than neon.max_file_cache_size");
elog(ERROR, "LFC: neon.file_cache_size_limit can not be larger than neon.max_file_cache_size");
return false;
}
return true;
@@ -436,6 +483,32 @@ lfc_init(void)
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_limit",
"Maximal number of prewarmed pages",
NULL,
&lfc_prewarm_limit,
0, /* disabled by default */
0,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_batch",
"Number of pages retrivied by prewarm from page server",
NULL,
&lfc_prewarm_batch,
64,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
if (lfc_max_size == 0)
return;
@@ -447,8 +520,326 @@ lfc_init(void)
#else
lfc_shmem_request();
#endif
if (lfc_prewarm_limit != 0)
{
BackgroundWorker bgw;
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_ConsistentState;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LfcPrewarmMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "LFC prewarm");
snprintf(bgw.bgw_type, BGW_MAXLEN, "LFC prewarm");
RegisterBackgroundWorker(&bgw);
}
}
static FileCacheStateEntry*
lfc_get_state(size_t* n_entries)
{
size_t max_entries = *n_entries;
size_t i = 0;
FileCacheStateEntry* fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries);
LWLockAcquire(lfc_lock, LW_SHARED);
if (LFC_ENABLED())
{
dlist_iter iter;
dlist_reverse_foreach(iter, &lfc_ctl->lru)
{
FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur);
memcpy(&fs[i].key, &entry->key, sizeof entry->key);
memcpy(fs[i].bitmap, entry->bitmap, sizeof entry->bitmap);
if (++i == max_entries)
break;
}
elog(LOG, "LFC: save state of %ld chunks", (long)i);
}
LWLockRelease(lfc_lock);
*n_entries = i;
return fs;
}
/*
* Save state of local file cache as AUX file. Size of saved state is limited by lfc_prewarm_limit.
* This function saves first mostrecently used pages.
* It is expected to be called at shutdown checkpoint by checkpointer.
*/
void
lfc_save_state(void)
{
size_t n_entries = lfc_prewarm_limit;
FileCacheStateEntry* fs;
if (n_entries == 0)
return;
fs = lfc_get_state(&n_entries);
if (n_entries != 0)
{
#if PG_MAJORVERSION_NUM < 17
XLogFlush(LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * n_entries, false));
#else
LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * n_entries, false, true);
#endif
}
pfree(fs);
}
/*
* Prewarm LFC cache to the specified state.
*
* Prewarming can interfere with accesses to the pages by other backends. Usually access to LFC is protected by shared buffers: when Postgres
* is reading page, it pins shared buffer and enforces that only one backend is reading it, while other are waiting for read completion.
*
* But it is not true for prewarming: backend can fetch page itself, modify and then write it to LFC. At the
* same time `lfc_prewarm` tries to write deteriorated image of this page in LFC. To increase concurrency, access to LFC files (both read and write)
* is performed without holding locks. So it can happen that two or more processes write different content to the same location in the LFC file.
* Certainly we can not rely on disk content in this case.
*
* To solve this problem we use two flags in LFC entry: `prewarm_requested` and `prewarm_started`. First is set before prewarm is actually started.
* `lfc_prewarm` writes to LFC file only if this flag is set. This flag is cleared if any other backend performs write to this LFC chunk.
* In this case data loaded by `lfc_prewarm` is considered to be deteriorated and should be just ignored.
*
* But as far as write to LFC is performed without holding lock, there is no guarantee that no such write is in progress.
* This is why second flag is used: `prewarm_started`. It is set by `lfc_prewarm` when is starts writing page and cleared when write is completed.
* Any other backend writing to LFC should abandon it's write to LFC file (just not mark page as loaded in bitmap) if this flag is set.
* So neither `lfc_prewarm`, neither backend are saving page in LFC in this case - it is just skipped.
*/
static void
lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
{
ssize_t rc;
size_t snd_idx = 0, rcv_idx = 0;
size_t n_sent = 0, n_received = 0;
FileCacheEntry *entry;
uint64 generation;
uint32 entry_offset;
uint32 hash;
size_t i;
bool found;
int shard_no;
if (!lfc_ensure_opened())
return;
if (n_entries == 0 || fs == NULL)
{
elog(LOG, "LFC: prewarm is disabled");
return;
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
/* Do not prewarm more entries than LFC limit */
if (lfc_ctl->limit <= lfc_ctl->size)
{
LWLockRelease(lfc_lock);
return;
}
if (n_entries > lfc_ctl->limit - lfc_ctl->size)
{
n_entries = lfc_ctl->limit - lfc_ctl->size;
}
/* Initialize fields used to track prewarming progress */
lfc_ctl->prewarm_total_chunks = n_entries;
lfc_ctl->prewarm_curr_chunk = 0;
/*
* Load LFC state and add entries in hash table.
* It is needed to track modification of prewarmed pages.
* All such entries have `prewarm_requested` flag set. When entry is updated (some backed reads or writes
* some pages from this chunk), then `prewarm_requested` flag is cleared, prohibiting prewarm of this chunk.
* It prevents overwritting page updated or loaded by backend with older one, loaded by prewarm.
*/
for (i = 0; i < n_entries; i++)
{
hash = get_hash_value(lfc_hash, &fs[i].key);
entry = hash_search_with_hash_value(lfc_hash, &fs[i].key, hash, HASH_ENTER, &found);
/* Do not prewarm chunks which are already present in LFC */
if (!found)
{
entry->offset = lfc_ctl->size++;
entry->hash = hash;
entry->access_count = 0;
entry->prewarm_requested = true;
entry->prewarm_started = false;
memset(entry->bitmap, 0, sizeof entry->bitmap);
/* Most recently visted pages are stored first */
dlist_push_head(&lfc_ctl->lru, &entry->list_node);
lfc_ctl->used += 1;
}
}
LWLockRelease(lfc_lock);
elog(LOG, "LFC: start loading %ld chunks", (long)n_entries);
while (true)
{
size_t chunk_no = snd_idx / BLOCKS_PER_CHUNK;
size_t offs_in_chunk = snd_idx % BLOCKS_PER_CHUNK;
if (chunk_no < n_entries)
{
if (fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31)))
{
/*
* In case of prewarming replica we should be careful not to load too new version
* of the page - with LSN larger than current replay LSN.
* At primary we are always loading latest version.
*/
XLogRecPtr req_lsn = RecoveryInProgress() ? GetXLogReplayRecPtr(NULL) : UINT64_MAX;
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
/* lsn and not_modified_since are filled in below */
.rinfo = BufTagGetNRelFileInfo(fs[chunk_no].key),
.forknum = fs[chunk_no].key.forkNum,
.blkno = fs[chunk_no].key.blockNum + offs_in_chunk,
.req.lsn = req_lsn,
.req.not_modified_since = 0
};
shard_no = get_shard_number(&fs[chunk_no].key);
while (!page_server->send(shard_no, (NeonRequest *) &request)
|| !page_server->flush(shard_no))
{
/* do nothing */
}
n_sent += 1;
}
snd_idx += 1;
}
if (n_sent >= n_received + lfc_prewarm_batch || chunk_no == n_entries)
{
NeonResponse * resp;
do
{
chunk_no = rcv_idx / BLOCKS_PER_CHUNK;
offs_in_chunk = rcv_idx % BLOCKS_PER_CHUNK;
rcv_idx += 1;
} while (!(fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31))));
shard_no = get_shard_number(&fs[chunk_no].key);
resp = page_server->receive(shard_no);
lfc_ctl->prewarm_curr_chunk = chunk_no;
if (resp->tag != T_NeonGetPageResponse)
{
elog(LOG, "LFC: unexpected response type: %d", resp->tag);
return;
}
hash = get_hash_value(lfc_hash, &fs[chunk_no].key);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
entry = hash_search_with_hash_value(lfc_hash, &fs[chunk_no].key, hash, HASH_FIND, NULL);
if (entry != NULL && entry->prewarm_requested)
{
/* Unlink entry from LRU list to pin it for the duration of IO operation */
if (entry->access_count++ == 0)
dlist_delete(&entry->list_node);
generation = lfc_ctl->generation;
entry_offset = entry->offset;
Assert(!entry->prewarm_started);
entry->prewarm_started = true;
LWLockRelease(lfc_lock);
rc = pwrite(lfc_desc, ((NeonGetPageResponse*)resp)->page, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + offs_in_chunk) * BLCKSZ);
if (rc != BLCKSZ)
{
lfc_disable("write");
break;
}
else
{
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (lfc_ctl->generation == generation)
{
CriticalAssert(LFC_ENABLED());
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
if (entry->prewarm_requested)
{
lfc_ctl->used_pages += 1 - ((entry->bitmap[offs_in_chunk >> 5] >> (offs_in_chunk & 31)) & 1);
entry->bitmap[offs_in_chunk >> 5] |= 1 << (offs_in_chunk & 31);
lfc_ctl->prewarmed_pages += 1;
}
else
{
lfc_ctl->skipped_pages += 1;
}
Assert(entry->prewarm_started);
entry->prewarm_started = false;
}
LWLockRelease(lfc_lock);
}
}
else
{
Assert(!entry || !entry->prewarm_started);
lfc_ctl->skipped_pages += 1;
LWLockRelease(lfc_lock);
}
if (++n_received == n_sent && snd_idx >= n_entries * BLOCKS_PER_CHUNK)
{
break;
}
}
}
Assert(n_sent == n_received);
lfc_ctl->prewarm_curr_chunk = n_entries;
elog(LOG, "LFC: complete prewarming: loaded %ld pages", (long)n_received);
}
/*
* Load pages from LFC state saved in AUX file.
*/
void
lfc_load_pages(void)
{
int fd;
FileCacheStateEntry *fs;
ssize_t rc;
size_t max_entries = lfc_prewarm_limit;
fd = OpenTransientFile("lfc.state", O_RDONLY | PG_BINARY);
if (fd < 0)
{
elog(LOG, "LFC: state file is missing");
return;
}
fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries);
rc = read(fd, fs, sizeof(FileCacheStateEntry) * max_entries);
if (rc <= 0)
{
elog(LOG, "LFC: Failed to read state file: %m");
CloseTransientFile(fd);
}
else
{
CloseTransientFile(fd);
elog(LOG, "LFC: read state with %lu entries", (long)(rc / sizeof(FileCacheStateEntry)));
lfc_prewarm(fs, rc / sizeof(FileCacheStateEntry));
}
pfree(fs);
}
/*
* Check if page is present in the cache.
* Returns true if page is found in local cache.
@@ -616,6 +1007,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
/* remove the page from the cache */
entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1)));
entry->prewarm_requested = false; /* prohibit prewarm of this LFC entry */
if (entry->access_count == 0)
{
@@ -861,7 +1253,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
/*
/*
* For every chunk that has blocks we're interested in, we
* 1. get the chunk header
* 2. Check if the chunk actually has the blocks we're interested in
@@ -899,6 +1291,17 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (found)
{
if (entry->prewarm_started)
{
/*
* Some page of this chunk is currently written by `lfc_prewarm`.
* We should give-up not to interfere with it.
* But clearing `prewarm_requested` flag also will not allow `lfc_prewarm` to fix it result.
*/
entry->prewarm_requested = false;
LWLockRelease(lfc_lock);
return;
}
/*
* Unlink entry from LRU list to pin it for the duration of IO
* operation
@@ -928,7 +1331,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->lru));
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
lfc_ctl->used_pages -= (victim->bitmap[i >> 5] >> (i & 31)) & 1;
@@ -944,10 +1347,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->holes));
uint32 offset = hole->offset;
bool hole_found;
hash_search_with_hash_value(lfc_hash, &hole->key, hole->hash, HASH_REMOVE, &hole_found);
CriticalAssert(hole_found);
lfc_ctl->used += 1;
entry->offset = offset; /* reuse the hole */
}
@@ -959,9 +1362,11 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
entry->access_count = 1;
entry->hash = hash;
entry->prewarm_started = false;
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
entry->prewarm_requested = false; /* prohibit prewarm if LFC entry is updated by some backend */
generation = lfc_ctl->generation;
entry_offset = entry->offset;
LWLockRelease(lfc_lock);
@@ -1334,3 +1739,74 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
}
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(save_local_cache_state);
Datum
save_local_cache_state(PG_FUNCTION_ARGS)
{
lfc_save_state();
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_local_cache_state);
Datum
get_local_cache_state(PG_FUNCTION_ARGS)
{
size_t n_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
FileCacheStateEntry* fs = lfc_get_state(&n_entries);
size_t size_in_bytes = sizeof(FileCacheStateEntry) * n_entries;
bytea* res = (bytea*)palloc(VARHDRSZ + size_in_bytes);
SET_VARSIZE(res, VARHDRSZ + size_in_bytes);
memcpy(VARDATA(res), fs, size_in_bytes);
pfree(fs);
PG_RETURN_BYTEA_P(res);
}
PG_FUNCTION_INFO_V1(prewarm_local_cache);
Datum
prewarm_local_cache(PG_FUNCTION_ARGS)
{
bytea* state = PG_GETARG_BYTEA_PP(0);
uint32 n_entries = VARSIZE_ANY_EXHDR(state);
FileCacheStateEntry* fs = (FileCacheStateEntry*)VARDATA_ANY(state);
lfc_prewarm(fs, n_entries);
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_prewarm_info);
Datum
get_prewarm_info(PG_FUNCTION_ARGS)
{
Datum values[4];
bool nulls[4];
TupleDesc tupdesc;
if (lfc_size_limit == 0)
PG_RETURN_NULL();
tupdesc = CreateTemplateTupleDesc(4);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_chunks", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "curr_chunk", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prewarmed_pages", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "skipped_pages", INT4OID, -1, 0);
tupdesc = BlessTupleDesc(tupdesc);
MemSet(nulls, 0, sizeof(nulls));
LWLockAcquire(lfc_lock, LW_SHARED);
values[0] = Int32GetDatum(lfc_ctl->prewarm_total_chunks);
values[1] = Int32GetDatum(lfc_ctl->prewarm_curr_chunk);
values[2] = Int32GetDatum(lfc_ctl->prewarmed_pages);
values[3] = Int32GetDatum(lfc_ctl->skipped_pages);
LWLockRelease(lfc_lock);
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

View File

@@ -0,0 +1,28 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit
CREATE FUNCTION save_local_cache_state()
RETURNS void
AS 'MODULE_PATHNAME', 'save_local_cache_state'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer)
RETURNS record
AS 'MODULE_PATHNAME', 'get_prewarm_info'
LANGUAGE C STRICT
PARALLEL SAFE;
CREATE FUNCTION get_local_cache_state(max_chunks integer default null)
RETURNS bytea
AS 'MODULE_PATHNAME', 'get_local_cache_state'
LANGUAGE C
PARALLEL UNSAFE;
CREATE FUNCTION prewarm_local_cache(state bytea)
RETURNS void
AS 'MODULE_PATHNAME', 'prewarm_local_cache'
LANGUAGE C STRICT
PARALLEL UNSAFE;

View File

@@ -0,0 +1,9 @@
DROP FUNCTION IF EXISTS save_local_cache_state();
DROP FUNCTION IF EXISTS get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer);
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea);

View File

@@ -276,6 +276,8 @@ extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);
extern void lfc_save_state(void);
extern void lfc_load_pages(void);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

View File

@@ -450,6 +450,9 @@ impl Reconciler {
}
}
/// This function does _not_ mutate any state, so it is cancellation safe.
///
/// This function does not respect [`Self::cancel`], callers should handle that.
async fn await_lsn(
&self,
tenant_shard_id: TenantShardId,
@@ -570,8 +573,10 @@ impl Reconciler {
if let Some(baseline) = baseline_lsns {
tracing::info!("🕑 Waiting for LSN to catch up...");
self.await_lsn(self.tenant_shard_id, &dest_ps, baseline)
.await?;
tokio::select! {
r = self.await_lsn(self.tenant_shard_id, &dest_ps, baseline) => {r?;}
_ = self.cancel.cancelled() => {return Err(ReconcileError::Cancel)}
};
}
tracing::info!("🔁 Notifying compute to use pageserver {dest_ps}");

View File

@@ -0,0 +1,52 @@
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
def test_lfc_prewarm(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 1000000
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
endpoint.stop()
endpoint.start()
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
for _ in range(60):
time.sleep(1) # give prewarm BGW some time to proceed
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = cur.fetchall()[0][0]
log.info(f"Used LFC size: {lfc_used_pages}")
cur.execute("select * from get_prewarm_info()")
prewarm_info = cur.fetchall()[0]
log.info(f"Prewarm info: {prewarm_info}")
if prewarm_info[0] > 0:
log.info(f"Prewarm progress: {prewarm_info[1]*100//prewarm_info[0]}%")
if prewarm_info[0] == prewarm_info[1]:
break
assert lfc_used_pages > 10000
assert prewarm_info[0] > 0 and prewarm_info[0] == prewarm_info[1]
cur.execute("select sum(pk) from t")
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
assert prewarm_info[1] > 0

View File

@@ -435,7 +435,9 @@ def test_emergency_relocate_with_branches_slow_replay(
# This fail point will pause the WAL ingestion on the main branch, after the
# the first insert
pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")])
pageserver_http.configure_failpoints(
[("pageserver-wal-ingest-logical-message-sleep", "return(5000)")]
)
# Attach and wait a few seconds to give it time to load the tenants, attach to the
# safekeepers, and to stream and ingest the WAL up to the pause-point.
@@ -453,11 +455,13 @@ def test_emergency_relocate_with_branches_slow_replay(
assert cur.fetchall() == [("before pause",), ("after pause",)]
# Sanity check that the failpoint was reached
env.pageserver.assert_log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done')
env.pageserver.assert_log_contains(
'failpoint "pageserver-wal-ingest-logical-message-sleep": sleep done'
)
assert time.time() - before_attach_time > 5
# Clean up
pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off"))
pageserver_http.configure_failpoints(("pageserver-wal-ingest-logical-message-sleep", "off"))
# Simulate hard crash of pageserver and re-attach a tenant with a branch
@@ -581,7 +585,9 @@ def test_emergency_relocate_with_branches_createdb(
# bug reproduced easily even without this, as there is always some delay between
# loading the timeline and establishing the connection to the safekeeper to stream and
# ingest the WAL, but let's make this less dependent on accidental timing.
pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")])
pageserver_http.configure_failpoints(
[("pageserver-wal-ingest-logical-message-sleep", "return(5000)")]
)
before_attach_time = time.time()
env.pageserver.tenant_attach(tenant_id)
@@ -590,8 +596,10 @@ def test_emergency_relocate_with_branches_createdb(
assert query_scalar(cur, "SELECT count(*) FROM test_migrate_one") == 200
# Sanity check that the failpoint was reached
env.pageserver.assert_log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done')
env.pageserver.assert_log_contains(
'failpoint "pageserver-wal-ingest-logical-message-sleep": sleep done'
)
assert time.time() - before_attach_time > 5
# Clean up
pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off"))
pageserver_http.configure_failpoints(("pageserver-wal-ingest-logical-message-sleep", "off"))

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.0",
"68b5038f27e493bde6ae552fe066f10cbdfe6a14"
"37d5ead146b028dd9a5c07e7a37068ec0df9f465"
],
"v16": [
"16.4",
"e131a9c027b202ce92bd7b9cf2569d48a6f9948e"
"cc36e03bd0c927022cf3b3563e291e42d75366a1"
],
"v15": [
"15.8",
"22e580fe9ffcea7e02592110b1c9bf426d83cada"
"a4830163a65811578824ce4022c1cd3daef33d4e"
],
"v14": [
"14.13",
"2199b83fb72680001ce0f43bf6187a21dfb8f45d"
"ecb1020ff71927e9dd59c526254bb8846bb73ee1"
]
}