mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 17:40:37 +00:00
Persists latest_gc_cutoff_lsn before performing GC (#2558)
* Persists latest_gc_cutoff_lsn before performing GC * Perform some refactoring and code deduplication refer #2539 * Add test for persisting GC cutoff * Fix python test style warnings * Bump postgres version * Reduce number of iterations in test_gc_cutoff test * Bump postgres version * Undo bumping postgres version
This commit is contained in:
committed by
GitHub
parent
c67cf34040
commit
91411c415a
@@ -1220,78 +1220,76 @@ impl Timeline {
|
|||||||
// TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
|
// TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
|
||||||
// *all* the layers, to avoid fsyncing the file multiple times.
|
// *all* the layers, to avoid fsyncing the file multiple times.
|
||||||
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
|
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
|
||||||
self.update_disk_consistent_lsn(disk_consistent_lsn, layer_paths_to_upload)?;
|
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||||
|
|
||||||
|
// If we were able to advance 'disk_consistent_lsn', save it to the metadata file.
|
||||||
|
// After crash, we will restart WAL streaming and processing from that point.
|
||||||
|
if disk_consistent_lsn != old_disk_consistent_lsn {
|
||||||
|
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
|
||||||
|
self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)?;
|
||||||
|
// Also update the in-memory copy
|
||||||
|
self.disk_consistent_lsn.store(disk_consistent_lsn);
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Update metadata file
|
/// Update metadata file
|
||||||
fn update_disk_consistent_lsn(
|
fn update_metadata_file(
|
||||||
&self,
|
&self,
|
||||||
disk_consistent_lsn: Lsn,
|
disk_consistent_lsn: Lsn,
|
||||||
layer_paths_to_upload: HashMap<PathBuf, LayerFileMetadata>,
|
layer_paths_to_upload: HashMap<PathBuf, LayerFileMetadata>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
// If we were able to advance 'disk_consistent_lsn', save it to the metadata file.
|
// We can only save a valid 'prev_record_lsn' value on disk if we
|
||||||
// After crash, we will restart WAL streaming and processing from that point.
|
// flushed *all* in-memory changes to disk. We only track
|
||||||
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
|
// 'prev_record_lsn' in memory for the latest processed record, so we
|
||||||
if disk_consistent_lsn != old_disk_consistent_lsn {
|
// don't remember what the correct value that corresponds to some old
|
||||||
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
|
// LSN is. But if we flush everything, then the value corresponding
|
||||||
|
// to the current 'last_record_lsn' is correct and we can store it on disk.
|
||||||
|
let RecordLsn {
|
||||||
|
last: last_record_lsn,
|
||||||
|
prev: prev_record_lsn,
|
||||||
|
} = self.last_record_lsn.load();
|
||||||
|
let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
|
||||||
|
Some(prev_record_lsn)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
// We can only save a valid 'prev_record_lsn' value on disk if we
|
let ancestor_timeline_id = self
|
||||||
// flushed *all* in-memory changes to disk. We only track
|
.ancestor_timeline
|
||||||
// 'prev_record_lsn' in memory for the latest processed record, so we
|
.as_ref()
|
||||||
// don't remember what the correct value that corresponds to some old
|
.map(|ancestor| ancestor.timeline_id);
|
||||||
// LSN is. But if we flush everything, then the value corresponding
|
|
||||||
// to the current 'last_record_lsn' is correct and we can store it on disk.
|
|
||||||
let RecordLsn {
|
|
||||||
last: last_record_lsn,
|
|
||||||
prev: prev_record_lsn,
|
|
||||||
} = self.last_record_lsn.load();
|
|
||||||
let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
|
|
||||||
Some(prev_record_lsn)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
let ancestor_timeline_id = self
|
let metadata = TimelineMetadata::new(
|
||||||
.ancestor_timeline
|
disk_consistent_lsn,
|
||||||
.as_ref()
|
ondisk_prev_record_lsn,
|
||||||
.map(|ancestor| ancestor.timeline_id);
|
ancestor_timeline_id,
|
||||||
|
self.ancestor_lsn,
|
||||||
|
*self.latest_gc_cutoff_lsn.read(),
|
||||||
|
self.initdb_lsn,
|
||||||
|
self.pg_version,
|
||||||
|
);
|
||||||
|
|
||||||
let metadata = TimelineMetadata::new(
|
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
|
||||||
disk_consistent_lsn,
|
"{}",
|
||||||
ondisk_prev_record_lsn,
|
x.unwrap()
|
||||||
ancestor_timeline_id,
|
));
|
||||||
self.ancestor_lsn,
|
|
||||||
*self.latest_gc_cutoff_lsn.read(),
|
|
||||||
self.initdb_lsn,
|
|
||||||
self.pg_version,
|
|
||||||
);
|
|
||||||
|
|
||||||
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
|
save_metadata(
|
||||||
"{}",
|
self.conf,
|
||||||
x.unwrap()
|
self.timeline_id,
|
||||||
));
|
self.tenant_id,
|
||||||
|
&metadata,
|
||||||
|
false,
|
||||||
|
)?;
|
||||||
|
|
||||||
save_metadata(
|
if self.upload_layers.load(atomic::Ordering::Relaxed) {
|
||||||
self.conf,
|
storage_sync::schedule_layer_upload(
|
||||||
self.timeline_id,
|
|
||||||
self.tenant_id,
|
self.tenant_id,
|
||||||
&metadata,
|
self.timeline_id,
|
||||||
false,
|
layer_paths_to_upload,
|
||||||
)?;
|
Some(metadata),
|
||||||
|
);
|
||||||
if self.upload_layers.load(atomic::Ordering::Relaxed) {
|
|
||||||
storage_sync::schedule_layer_upload(
|
|
||||||
self.tenant_id,
|
|
||||||
self.timeline_id,
|
|
||||||
layer_paths_to_upload,
|
|
||||||
Some(metadata),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Also update the in-memory copy
|
|
||||||
self.disk_consistent_lsn.store(disk_consistent_lsn);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -1961,6 +1959,9 @@ impl Timeline {
|
|||||||
new_gc_cutoff
|
new_gc_cutoff
|
||||||
);
|
);
|
||||||
write_guard.store_and_unlock(new_gc_cutoff).wait();
|
write_guard.store_and_unlock(new_gc_cutoff).wait();
|
||||||
|
|
||||||
|
// Persist metadata file
|
||||||
|
self.update_metadata_file(self.disk_consistent_lsn.load(), HashSet::new())?;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("GC starting");
|
info!("GC starting");
|
||||||
@@ -2087,6 +2088,18 @@ impl Timeline {
|
|||||||
result.layers_removed += 1;
|
result.layers_removed += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"GC completed removing {} layers, cuttof {}",
|
||||||
|
result.layers_removed, new_gc_cutoff
|
||||||
|
);
|
||||||
|
if result.layers_removed != 0 {
|
||||||
|
fail_point!("gc-before-save-metadata", |_| {
|
||||||
|
info!("Abnormaly terinate pageserver at gc-before-save-metadata fail point");
|
||||||
|
std::process::abort();
|
||||||
|
});
|
||||||
|
return Ok(result);
|
||||||
|
}
|
||||||
|
|
||||||
if self.upload_layers.load(atomic::Ordering::Relaxed) {
|
if self.upload_layers.load(atomic::Ordering::Relaxed) {
|
||||||
storage_sync::schedule_layer_delete(
|
storage_sync::schedule_layer_delete(
|
||||||
self.tenant_id,
|
self.tenant_id,
|
||||||
|
|||||||
@@ -70,8 +70,10 @@ async fn compaction_loop(tenant_id: TenantId) {
|
|||||||
// Run compaction
|
// Run compaction
|
||||||
let mut sleep_duration = tenant.get_compaction_period();
|
let mut sleep_duration = tenant.get_compaction_period();
|
||||||
if let Err(e) = tenant.compaction_iteration() {
|
if let Err(e) = tenant.compaction_iteration() {
|
||||||
error!("Compaction failed, retrying: {e:#}");
|
|
||||||
sleep_duration = wait_duration;
|
sleep_duration = wait_duration;
|
||||||
|
error!("Compaction failed, retrying in {:?}: {e:#}", sleep_duration);
|
||||||
|
#[cfg(feature = "testing")]
|
||||||
|
std::process::abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sleep
|
// Sleep
|
||||||
@@ -119,8 +121,10 @@ async fn gc_loop(tenant_id: TenantId) {
|
|||||||
if gc_horizon > 0 {
|
if gc_horizon > 0 {
|
||||||
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false)
|
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false)
|
||||||
{
|
{
|
||||||
error!("Gc failed, retrying: {e:#}");
|
|
||||||
sleep_duration = wait_duration;
|
sleep_duration = wait_duration;
|
||||||
|
error!("Gc failed, retrying in {:?}: {e:#}", sleep_duration);
|
||||||
|
#[cfg(feature = "testing")]
|
||||||
|
std::process::abort();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
38
test_runner/regress/test_gc_cutoff.py
Normal file
38
test_runner/regress/test_gc_cutoff.py
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
import pytest
|
||||||
|
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
|
||||||
|
from performance.test_perf_pgbench import get_scales_matrix
|
||||||
|
|
||||||
|
|
||||||
|
# Test gc_cutoff
|
||||||
|
#
|
||||||
|
# This test sets a fail point at the end of GC and checks
|
||||||
|
# that the pageserver restarts normally after it
|
||||||
|
@pytest.mark.parametrize("scale", get_scales_matrix(10))
|
||||||
|
def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, scale: int):
|
||||||
|
env = neon_env_builder.init_start()
|
||||||
|
pageserver_http = env.pageserver.http_client()
|
||||||
|
|
||||||
|
# Use aggressive GC and checkpoint settings, so that we also exercise GC during the test
|
||||||
|
tenant_id, _ = env.neon_cli.create_tenant(
|
||||||
|
conf={
|
||||||
|
"gc_period": "10 s",
|
||||||
|
"gc_horizon": f"{1024 ** 2}",
|
||||||
|
"checkpoint_distance": f"{1024 ** 2}",
|
||||||
|
"compaction_target_size": f"{1024 ** 2}",
|
||||||
|
# set PITR interval to be small, so we can do GC
|
||||||
|
"pitr_interval": "1 s",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
pg = env.postgres.create_start("main", tenant_id=tenant_id)
|
||||||
|
connstr = pg.connstr()
|
||||||
|
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
||||||
|
|
||||||
|
pageserver_http.configure_failpoints(("gc-before-save-metadata", "return"))
|
||||||
|
|
||||||
|
for i in range(5):
|
||||||
|
try:
|
||||||
|
pg_bin.run_capture(["pgbench", "-T100", connstr])
|
||||||
|
except Exception:
|
||||||
|
env.pageserver.stop()
|
||||||
|
env.pageserver.start()
|
||||||
|
pageserver_http.configure_failpoints(("gc-before-save-metadata", "return"))
|
||||||
Reference in New Issue
Block a user