Compare commits

...

6 Commits

Author SHA1 Message Date
Konstantin Knizhnik
aaf8617fcc Log detection of duplicate layer 2023-05-16 09:53:45 +03:00
Konstantin Knizhnik
b14e70fb5c Make clippy happy 2023-05-15 22:59:15 +03:00
Konstantin Knizhnik
64e01c5b87 Add removed BufferedHistoricLayerCoverage.contains method 2023-05-15 18:24:35 +03:00
Konstantin Knizhnik
843e82357f Fix rust formatting issues 2023-05-15 17:34:30 +03:00
Konstantin Knizhnik
51256890a0 Add comment explainign potential risks about duplicates prevention 2023-05-15 17:25:00 +03:00
Heikki Linnakangas
6837356cc1 Fix the duplicate key problem during compaction.
Before finishing the delta file, and possibly overwriting an old
perfectly valid file, check if an identical file already exists in the
layer map.

This is an alternative for https://github.com/neondatabase/neon/pull/4094.
Test case is copied from that PR.
2023-05-15 16:58:08 +03:00
5 changed files with 129 additions and 1 deletions

View File

@@ -285,6 +285,11 @@ where
}
}
pub fn contains(&self, key_range: &Range<Key>, lsn_range: &Range<Lsn>, is_image: bool) -> bool {
let key = historic_layer_coverage::LayerKey::from_ranges(key_range, lsn_range, is_image);
self.historic.contains(&key)
}
///
/// Remove an on-disk layer from the map.
///

View File

@@ -53,6 +53,20 @@ impl<'a, L: crate::tenant::storage_layer::Layer + ?Sized> From<&'a L> for LayerK
}
}
impl LayerKey {
pub fn from_ranges(
kr: &Range<crate::tenant::layer_map::Key>,
lr: &Range<utils::lsn::Lsn>,
is_image: bool,
) -> Self {
LayerKey {
key: kr.start.to_i128()..kr.end.to_i128(),
lsn: lr.start.0..lr.end.0,
is_image,
}
}
}
/// Efficiently queryable layer coverage for each LSN.
///
/// Allows answering layer map queries very efficiently,
@@ -417,6 +431,14 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
}
}
pub fn contains(&self, layer_key: &LayerKey) -> bool {
match self.buffer.get(layer_key) {
Some(None) => false, // layer remove was buffered
Some(_) => true, // layer insert was buffered
None => self.layers.contains_key(layer_key), // no buffered ops for this layer
}
}
pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
self.buffer.insert(layer_key, Some(value));
}

View File

@@ -867,6 +867,14 @@ pub struct DeltaLayerWriter {
}
impl DeltaLayerWriter {
pub fn key_start(&self) -> Key {
self.inner.as_ref().unwrap().key_start
}
pub fn lsn_range(&self) -> Range<Lsn> {
self.inner.as_ref().unwrap().lsn_range.clone()
}
///
/// Start building a new delta layer.
///

View File

@@ -3375,7 +3375,54 @@ impl Timeline {
|| contains_hole
{
// ... if so, flush previous layer and prepare to write new one
new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?);
let end_key = prev_key.unwrap().next();
let w = writer.take().unwrap();
// If an identical L1 layer already exists, no need to create a new one.
//
// This can happen if compaction is interrupted after it has already
// created some or all of the L1 layers, but has not deleted the L0 layers
// yet, so that on next compaction, we do the same work again.
//
// NOTE: this is racy, if there can be any other task that concurrently
// creates L1 layers. Currently, there can be only one compaction task
// running at any time, so this is fine.
//
// Also we hold `layer_removal_cs` guard which should prevent race condition
// even if there are two or more concurrent compaction tasks.
//
// But there is an opposite issue: we check presence of duplicates under
// `layers` shared lock, but then it is released. So there is a gap between
// this check and adding new layer to layer map. In principle in this gap some
// some other task (i.e. GC) can drop this layer and we already abandon insertion
// of duplicate layer. As a result there will be no such layer at all.
// In other words: we have some state S1 of pageserver where layer L1 can be removed by GC.
// Then we run compaction and it switch pageserver to the state S2 which writes duplicate of
// layer L1 and where it can not be removed. With this patch it is possible that
// we switch pageserver to state S2 but... with L1 lost.
// It is just hypothetical situation and there is no such concrete scenario which
// reproduces this problem. So let's take this risk.
//
if self.layers.read().unwrap().contains(
&(w.key_start()..end_key),
&w.lsn_range(),
false, // not an image layer
) {
info!(
"Skip generation of duplicate layer {}_{}__{}_{}",
w.key_start(),
end_key,
w.lsn_range().start,
w.lsn_range().end
);
drop(w);
} else {
let new_layer = w.finish(end_key)?;
new_layers.push(new_layer);
}
writer = None;
if contains_hole {
@@ -3432,6 +3479,10 @@ impl Timeline {
drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed
fail_point!("compact-level0-phase1-finish", |_| {
Err(anyhow::anyhow!("failpoint compact-level0-phase1-finish").into())
});
Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,

View File

@@ -0,0 +1,42 @@
import time
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
# Test duplicate layer detection
#
# This test sets fail point at the end of first compaction phase:
# after flushing new L1 layers but before deletion of L0 layes
# It should cause generation of duplicate L1 layer by compaction after restart
@pytest.mark.timeout(600)
def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
# These warnings are expected, when the pageserver is restarted abruptly
env.pageserver.allowed_errors.append(".*found future image layer.*")
env.pageserver.allowed_errors.append(".*found future delta layer.*")
env.pageserver.allowed_errors.append(".*duplicate layer.*")
pageserver_http = env.pageserver.http_client()
# Use aggressive compaction and checkpoint settings
tenant_id, _ = env.neon_cli.create_tenant(
conf={
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
"compaction_period": "1 s",
"compaction_threshold": "3",
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
connstr = endpoint.connstr(options="-csynchronous_commit=off")
pg_bin.run_capture(["pgbench", "-i", "-s10", connstr])
pageserver_http.configure_failpoints(("compact-level0-phase1-finish", "exit"))
with pytest.raises(Exception):
pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])
env.pageserver.stop()
env.pageserver.start()
time.sleep(10) # let compaction to be performed