Fix the duplicate key problem during compaction.

Before finishing the delta file, and possibly overwriting an old,
perfectly valid file, check whether an identical layer already exists
in the layer map.

This is an alternative to https://github.com/neondatabase/neon/pull/4094.
The test case is copied from that PR.
Author: Heikki Linnakangas
Date: 2023-04-28 15:44:42 +03:00
Committed by: Konstantin Knizhnik
Parent: a5615bd8ea
Commit: 6837356cc1

5 changed files with 97 additions and 1 deletion
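In outline, the fix makes compaction probe the layer map for an identical layer before finalizing a new delta file. Below is a minimal, self-contained sketch of that control flow with simplified stand-in types; the real LayerMap, Key, and Lsn are pageserver types, and the tuple-set representation here is purely illustrative.

use std::collections::HashSet;
use std::ops::Range;

// Stand-ins for the pageserver types; illustrative only.
type Key = i128;
type Lsn = u64;

// Simplified layer map: a set of (key range, lsn range, is_image) identities.
struct LayerMap {
    historic: HashSet<(Range<Key>, Range<Lsn>, bool)>,
}

impl LayerMap {
    fn contains(&self, key_range: &Range<Key>, lsn_range: &Range<Lsn>, is_image: bool) -> bool {
        self.historic
            .contains(&(key_range.clone(), lsn_range.clone(), is_image))
    }
}

fn main() {
    let mut map = LayerMap { historic: HashSet::new() };
    // Pretend an earlier, interrupted compaction already produced this L1 layer.
    map.historic.insert((0..100, 10..20, false));

    // On the next run, check before finishing the new delta layer:
    if map.contains(&(0..100), &(10..20), false) {
        println!("identical layer already exists; drop the new writer");
    } else {
        println!("finish and insert the new layer");
    }
}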


@@ -285,6 +285,11 @@ where
         }
     }
 
+    pub fn contains(&self, key_range: &Range<Key>, lsn_range: &Range<Lsn>, is_image: bool) -> bool {
+        let key = historic_layer_coverage::LayerKey::from_ranges(key_range, lsn_range, is_image);
+        self.historic.contains(&key)
+    }
+
     ///
     /// Remove an on-disk layer from the map.
     ///
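For context, the call site added below in the Timeline compaction code takes this lookup through a read lock, which is why the NOTE about raciness there applies: the check and the later insertion of the finished layer are not one atomic step. A hedged sketch of that locking pattern with a stub map (the RwLock-wrapped layer map mirrors the `self.layers.read()` call in the diff; the stub itself is an assumption for illustration):

use std::ops::Range;
use std::sync::RwLock;

// Illustrative stand-ins; the real Key/Lsn/LayerMap are pageserver types.
type Key = i128;
type Lsn = u64;
struct LayerMap;
impl LayerMap {
    fn contains(&self, _k: &Range<Key>, _l: &Range<Lsn>, _is_image: bool) -> bool {
        false // stub: the real map consults its historic index
    }
}

fn main() {
    let layers = RwLock::new(LayerMap);
    // A read lock suffices: contains() does not mutate the map.
    let dup = layers
        .read()
        .unwrap()
        .contains(&(0..100), &(10..20), /* is_image */ false);
    println!("duplicate already on disk: {dup}");
}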


@@ -53,6 +53,20 @@ impl<'a, L: crate::tenant::storage_layer::Layer + ?Sized> From<&'a L> for LayerKey
     }
 }
 
+impl LayerKey {
+    pub fn from_ranges(
+        kr: &Range<crate::tenant::layer_map::Key>,
+        lr: &Range<utils::lsn::Lsn>,
+        is_image: bool,
+    ) -> Self {
+        LayerKey {
+            key: kr.start.to_i128()..kr.end.to_i128(),
+            lsn: lr.start.0..lr.end.0,
+            is_image,
+        }
+    }
+}
+
 /// Efficiently queryable layer coverage for each LSN.
 ///
 /// Allows answering layer map queries very efficiently,
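A self-contained sketch of what this constructor does: it flattens the typed ranges into primitive ranges so that two layers with the same bounds and kind compare equal. The Key/Lsn wrappers below are simplified stand-ins (the real Key is a structured page key with a to_i128() method, and Lsn is a newtype over u64, hence the .0 access in the diff):

use std::ops::Range;

// Stand-ins for the real pageserver/utils types.
#[derive(Clone, Copy)]
struct Key(i128);
impl Key {
    fn to_i128(self) -> i128 {
        self.0
    }
}
#[derive(Clone, Copy)]
struct Lsn(u64);

// Identity of a layer in the historic map: primitive ranges plus kind.
#[derive(PartialEq, Eq, Hash, Debug)]
struct LayerKey {
    key: Range<i128>,
    lsn: Range<u64>,
    is_image: bool,
}

impl LayerKey {
    fn from_ranges(kr: &Range<Key>, lr: &Range<Lsn>, is_image: bool) -> Self {
        LayerKey {
            key: kr.start.to_i128()..kr.end.to_i128(),
            lsn: lr.start.0..lr.end.0,
            is_image,
        }
    }
}

fn main() {
    let a = LayerKey::from_ranges(&(Key(0)..Key(100)), &(Lsn(10)..Lsn(20)), false);
    let b = LayerKey::from_ranges(&(Key(0)..Key(100)), &(Lsn(10)..Lsn(20)), false);
    // Two layers with identical ranges and kind compare equal, which is
    // exactly the notion of "duplicate" the compaction check relies on.
    assert_eq!(a, b);
}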


@@ -867,6 +867,13 @@ pub struct DeltaLayerWriter {
 }
 
 impl DeltaLayerWriter {
+    pub fn key_start(&self) -> Key {
+        self.inner.as_ref().unwrap().key_start.clone()
+    }
+
+    pub fn lsn_range(&self) -> Range<Lsn> {
+        self.inner.as_ref().unwrap().lsn_range.clone()
+    }
     ///
     /// Start building a new delta layer.
     ///
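These accessors let the caller reconstruct the identity of the layer being built (key_start()..end_key plus lsn_range()) before deciding whether to call finish(). A sketch of the pattern with a simplified writer; the Option-wrapped inner state mirrors the shape suggested by the unwrap() calls above, but the details are assumptions:

use std::ops::Range;

type Key = i128;
type Lsn = u64;

struct Inner {
    key_start: Key,
    lsn_range: Range<Lsn>,
}

// Simplified writer: `inner` is Some while the layer is being built.
struct DeltaLayerWriter {
    inner: Option<Inner>,
}

impl DeltaLayerWriter {
    fn key_start(&self) -> Key {
        self.inner.as_ref().unwrap().key_start
    }
    fn lsn_range(&self) -> Range<Lsn> {
        self.inner.as_ref().unwrap().lsn_range.clone()
    }
    // In the real code, finish() writes out the file; here it just reports.
    fn finish(mut self, end_key: Key) -> String {
        let inner = self.inner.take().unwrap();
        format!("layer {}..{} @ {:?}", inner.key_start, end_key, inner.lsn_range)
    }
}

fn main() {
    let w = DeltaLayerWriter {
        inner: Some(Inner { key_start: 0, lsn_range: 10..20 }),
    };
    let end_key = 100;
    // The identity can be formed without consuming the writer...
    println!("candidate: {}..{} @ {:?}", w.key_start(), end_key, w.lsn_range());
    // ...so the caller can either drop(w) on a duplicate, or finish it:
    println!("{}", w.finish(end_key));
}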


@@ -3375,7 +3375,31 @@ impl Timeline {
                 || contains_hole
             {
                 // ... if so, flush previous layer and prepare to write new one
-                new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?);
+                let end_key = prev_key.unwrap().next();
+                let w = writer.take().unwrap();
+
+                // If an identical L1 layer already exists, no need to create a new one.
+                //
+                // This can happen if compaction is interrupted after it has already
+                // created some or all of the L1 layers, but has not deleted the L0 layers
+                // yet, so that on next compaction, we do the same work again.
+                //
+                // NOTE: this is racy, if there can be any other task that concurrently
+                // creates L1 layers. Currently, there can be only one compaction task
+                // running at any time, so this is fine.
+                if self.layers.read().unwrap().contains(
+                    &(w.key_start()..end_key),
+                    &w.lsn_range(),
+                    false, // not an image layer
+                ) {
+                    drop(w);
+                } else {
+                    let new_layer = w.finish(end_key)?;
+                    new_layers.push(new_layer);
+                }
+
                 writer = None;
 
                 if contains_hole {
@@ -3432,6 +3456,10 @@ impl Timeline {
 
         drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed
 
+        fail_point!("compact-level0-phase1-finish", |_| {
+            Err(anyhow::anyhow!("failpoint compact-level0-phase1-finish").into())
+        });
+
         Ok(CompactLevel0Phase1Result {
             new_layers,
             deltas_to_compact,
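The second hunk above adds a failpoint just before phase 1 returns; the new regression test arms it to kill the pageserver after the L1 layers are written but before the L0 inputs are deleted. Below is a self-contained sketch of the mechanism, assuming the fail crate provides the fail_point! macro seen above; the function body and the "return" action are illustrative (the test uses the "exit" action, configured over the pageserver HTTP API, to terminate the process):

// Cargo.toml: fail = { version = "0.5", features = ["failpoints"] }
use fail::fail_point;

fn compact_level0_phase1() -> Result<(), String> {
    // ... imagine the new L1 layers have just been flushed to disk ...

    // When armed, bail out here: the L1s exist, but the caller never gets
    // to delete the L0 inputs, so the next compaction redoes the same work.
    fail_point!("compact-level0-phase1-finish", |_| {
        Err("failpoint compact-level0-phase1-finish".to_string())
    });
    Ok(())
}

fn main() {
    // Arm the failpoint with the "return" action so this example can
    // observe the early error instead of exiting the process.
    fail::cfg("compact-level0-phase1-finish", "return").unwrap();
    assert!(compact_level0_phase1().is_err());
    println!("failpoint fired as configured");
}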


@@ -0,0 +1,42 @@
+import time
+
+import pytest
+from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
+
+
+# Test duplicate layer detection
+#
+# This test sets a fail point at the end of the first compaction phase:
+# after flushing new L1 layers, but before deleting the L0 layers.
+# It should cause compaction to generate a duplicate L1 layer after restart.
+@pytest.mark.timeout(600)
+def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
+    env = neon_env_builder.init_start()
+
+    # These warnings are expected when the pageserver is restarted abruptly
+    env.pageserver.allowed_errors.append(".*found future image layer.*")
+    env.pageserver.allowed_errors.append(".*found future delta layer.*")
+    env.pageserver.allowed_errors.append(".*duplicate layer.*")
+
+    pageserver_http = env.pageserver.http_client()
+
+    # Use aggressive compaction and checkpoint settings
+    tenant_id, _ = env.neon_cli.create_tenant(
+        conf={
+            "checkpoint_distance": f"{1024 ** 2}",
+            "compaction_target_size": f"{1024 ** 2}",
+            "compaction_period": "1 s",
+            "compaction_threshold": "3",
+        }
+    )
+    endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
+    connstr = endpoint.connstr(options="-csynchronous_commit=off")
+    pg_bin.run_capture(["pgbench", "-i", "-s10", connstr])
+
+    pageserver_http.configure_failpoints(("compact-level0-phase1-finish", "exit"))
+    with pytest.raises(Exception):
+        pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])
+
+    env.pageserver.stop()
+    env.pageserver.start()
+    time.sleep(10)  # give compaction time to run
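With the aggressive compaction_period of 1 s, the restarted pageserver re-runs compaction within the sleep window and encounters the L1 layers left behind by the killed run. With the fix, the layer-map check recognizes them and drops the duplicate writer instead of overwriting a valid file; the ".*duplicate layer.*" entry in allowed_errors covers the warning logged when that condition is detected.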