mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 22:12:56 +00:00
Persists latest_gc_cutoff_lsn before performing GC (#2558)
* Persists latest_gc_cutoff_lsn before performing GC * Peform some refactoring and code deduplication refer #2539 * Add test for persisting GC cutoff * Fix python test style warnings * Bump postgres version * Reduce number of iterations in test_gc_cutoff test * Bump postgres version * Undo bumping postgres version
This commit is contained in:
committed by
GitHub
parent
c67cf34040
commit
91411c415a
@@ -1220,78 +1220,76 @@ impl Timeline {
|
||||
// TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
|
||||
// *all* the layers, to avoid fsyncing the file multiple times.
|
||||
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
|
||||
self.update_disk_consistent_lsn(disk_consistent_lsn, layer_paths_to_upload)?;
|
||||
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||
|
||||
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
|
||||
// After crash, we will restart WAL streaming and processing from that point.
|
||||
if disk_consistent_lsn != old_disk_consistent_lsn {
|
||||
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
|
||||
self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)?;
|
||||
// Also update the in-memory copy
|
||||
self.disk_consistent_lsn.store(disk_consistent_lsn);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update metadata file
|
||||
fn update_disk_consistent_lsn(
|
||||
fn update_metadata_file(
|
||||
&self,
|
||||
disk_consistent_lsn: Lsn,
|
||||
layer_paths_to_upload: HashMap<PathBuf, LayerFileMetadata>,
|
||||
) -> Result<()> {
|
||||
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
|
||||
// After crash, we will restart WAL streaming and processing from that point.
|
||||
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||
if disk_consistent_lsn != old_disk_consistent_lsn {
|
||||
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
|
||||
// We can only save a valid 'prev_record_lsn' value on disk if we
|
||||
// flushed *all* in-memory changes to disk. We only track
|
||||
// 'prev_record_lsn' in memory for the latest processed record, so we
|
||||
// don't remember what the correct value that corresponds to some old
|
||||
// LSN is. But if we flush everything, then the value corresponding
|
||||
// current 'last_record_lsn' is correct and we can store it on disk.
|
||||
let RecordLsn {
|
||||
last: last_record_lsn,
|
||||
prev: prev_record_lsn,
|
||||
} = self.last_record_lsn.load();
|
||||
let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
|
||||
Some(prev_record_lsn)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// We can only save a valid 'prev_record_lsn' value on disk if we
|
||||
// flushed *all* in-memory changes to disk. We only track
|
||||
// 'prev_record_lsn' in memory for the latest processed record, so we
|
||||
// don't remember what the correct value that corresponds to some old
|
||||
// LSN is. But if we flush everything, then the value corresponding
|
||||
// current 'last_record_lsn' is correct and we can store it on disk.
|
||||
let RecordLsn {
|
||||
last: last_record_lsn,
|
||||
prev: prev_record_lsn,
|
||||
} = self.last_record_lsn.load();
|
||||
let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
|
||||
Some(prev_record_lsn)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let ancestor_timeline_id = self
|
||||
.ancestor_timeline
|
||||
.as_ref()
|
||||
.map(|ancestor| ancestor.timeline_id);
|
||||
|
||||
let ancestor_timeline_id = self
|
||||
.ancestor_timeline
|
||||
.as_ref()
|
||||
.map(|ancestor| ancestor.timeline_id);
|
||||
let metadata = TimelineMetadata::new(
|
||||
disk_consistent_lsn,
|
||||
ondisk_prev_record_lsn,
|
||||
ancestor_timeline_id,
|
||||
self.ancestor_lsn,
|
||||
*self.latest_gc_cutoff_lsn.read(),
|
||||
self.initdb_lsn,
|
||||
self.pg_version,
|
||||
);
|
||||
|
||||
let metadata = TimelineMetadata::new(
|
||||
disk_consistent_lsn,
|
||||
ondisk_prev_record_lsn,
|
||||
ancestor_timeline_id,
|
||||
self.ancestor_lsn,
|
||||
*self.latest_gc_cutoff_lsn.read(),
|
||||
self.initdb_lsn,
|
||||
self.pg_version,
|
||||
);
|
||||
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
|
||||
"{}",
|
||||
x.unwrap()
|
||||
));
|
||||
|
||||
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
|
||||
"{}",
|
||||
x.unwrap()
|
||||
));
|
||||
save_metadata(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_id,
|
||||
&metadata,
|
||||
false,
|
||||
)?;
|
||||
|
||||
save_metadata(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
if self.upload_layers.load(atomic::Ordering::Relaxed) {
|
||||
storage_sync::schedule_layer_upload(
|
||||
self.tenant_id,
|
||||
&metadata,
|
||||
false,
|
||||
)?;
|
||||
|
||||
if self.upload_layers.load(atomic::Ordering::Relaxed) {
|
||||
storage_sync::schedule_layer_upload(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
layer_paths_to_upload,
|
||||
Some(metadata),
|
||||
);
|
||||
}
|
||||
|
||||
// Also update the in-memory copy
|
||||
self.disk_consistent_lsn.store(disk_consistent_lsn);
|
||||
self.timeline_id,
|
||||
layer_paths_to_upload,
|
||||
Some(metadata),
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -1961,6 +1959,9 @@ impl Timeline {
|
||||
new_gc_cutoff
|
||||
);
|
||||
write_guard.store_and_unlock(new_gc_cutoff).wait();
|
||||
|
||||
// Persist metadata file
|
||||
self.update_metadata_file(self.disk_consistent_lsn.load(), HashSet::new())?;
|
||||
}
|
||||
|
||||
info!("GC starting");
|
||||
@@ -2087,6 +2088,18 @@ impl Timeline {
|
||||
result.layers_removed += 1;
|
||||
}
|
||||
|
||||
info!(
|
||||
"GC completed removing {} layers, cuttof {}",
|
||||
result.layers_removed, new_gc_cutoff
|
||||
);
|
||||
if result.layers_removed != 0 {
|
||||
fail_point!("gc-before-save-metadata", |_| {
|
||||
info!("Abnormaly terinate pageserver at gc-before-save-metadata fail point");
|
||||
std::process::abort();
|
||||
});
|
||||
return Ok(result);
|
||||
}
|
||||
|
||||
if self.upload_layers.load(atomic::Ordering::Relaxed) {
|
||||
storage_sync::schedule_layer_delete(
|
||||
self.tenant_id,
|
||||
|
||||
@@ -70,8 +70,10 @@ async fn compaction_loop(tenant_id: TenantId) {
|
||||
// Run compaction
|
||||
let mut sleep_duration = tenant.get_compaction_period();
|
||||
if let Err(e) = tenant.compaction_iteration() {
|
||||
error!("Compaction failed, retrying: {e:#}");
|
||||
sleep_duration = wait_duration;
|
||||
error!("Compaction failed, retrying in {:?}: {e:#}", sleep_duration);
|
||||
#[cfg(feature = "testing")]
|
||||
std::process::abort();
|
||||
}
|
||||
|
||||
// Sleep
|
||||
@@ -119,8 +121,10 @@ async fn gc_loop(tenant_id: TenantId) {
|
||||
if gc_horizon > 0 {
|
||||
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false)
|
||||
{
|
||||
error!("Gc failed, retrying: {e:#}");
|
||||
sleep_duration = wait_duration;
|
||||
error!("Gc failed, retrying in {:?}: {e:#}", sleep_duration);
|
||||
#[cfg(feature = "testing")]
|
||||
std::process::abort();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
38
test_runner/regress/test_gc_cutoff.py
Normal file
38
test_runner/regress/test_gc_cutoff.py
Normal file
@@ -0,0 +1,38 @@
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
|
||||
from performance.test_perf_pgbench import get_scales_matrix
|
||||
|
||||
|
||||
# Test gc_cuttoff
|
||||
#
|
||||
# This test set fail point after at the end of GC and checks
|
||||
# that pageserver normally restarts after it
|
||||
@pytest.mark.parametrize("scale", get_scales_matrix(10))
|
||||
def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, scale: int):
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# Use aggressive GC and checkpoint settings, so that we also exercise GC during the test
|
||||
tenant_id, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
"gc_period": "10 s",
|
||||
"gc_horizon": f"{1024 ** 2}",
|
||||
"checkpoint_distance": f"{1024 ** 2}",
|
||||
"compaction_target_size": f"{1024 ** 2}",
|
||||
# set PITR interval to be small, so we can do GC
|
||||
"pitr_interval": "1 s",
|
||||
}
|
||||
)
|
||||
pg = env.postgres.create_start("main", tenant_id=tenant_id)
|
||||
connstr = pg.connstr()
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
||||
|
||||
pageserver_http.configure_failpoints(("gc-before-save-metadata", "return"))
|
||||
|
||||
for i in range(5):
|
||||
try:
|
||||
pg_bin.run_capture(["pgbench", "-T100", connstr])
|
||||
except Exception:
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
pageserver_http.configure_failpoints(("gc-before-save-metadata", "return"))
|
||||
Reference in New Issue
Block a user