add basic test & fix embarrasing bug in cull (needs comment out todo!())

This commit is contained in:
Christian Schwarz
2025-07-25 14:15:04 +02:00
parent 1e61ac6af2
commit e1eb98c0e9
2 changed files with 179 additions and 1 deletions

View File

@@ -9516,6 +9516,168 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_standby_horizons() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_standby_horizons")
.await
.unwrap()
.load()
.await;
// When we call gc_timline below, we have no way to inject a deterministic SystemTime::now()
// to the place where it culls leases.
// So, pick a lease length longer than this test will run.
let lease_length = Duration::from_secs(24 * 60 * 60);
tenant
.update_tenant_config(|mut conf| {
conf.standby_horizon_lease_length = Some(lease_length);
Ok(conf)
})
.unwrap();
let key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let end_lsn = Lsn(0x100);
let image_layers = (0x10..=0x90)
.step_by(0x10)
.map(|n| (Lsn(n), vec![(key, test_img(&format!("data key at {n:x}")))]))
.collect();
let timeline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
Vec::new(), // in-memory layers
Vec::new(),
image_layers,
end_lsn,
)
.await?;
// Test lease acquisition from multiple holders, identified by their index in the array.
let initial_leases = [0x30, 0x50, 0x70];
let mut expirations = Vec::new();
initial_leases.iter().enumerate().for_each(|(i, n)| {
let lease_id = format!("test_lease_{}", i);
expirations.push(
timeline
.lease_standby_horizon(lease_id, Lsn(*n), &ctx)
.expect("standby horizon lease request should succeed"),
);
});
// Test leasing same standby_horizon by different ID yields a fresh lease.
// NB: leases are tracked by SystemTime, which is not monotonic, but we want to assert monotonicity of the lease below.
// Sleep a second to make flakiness less likely.
tokio::time::sleep(Duration::from_secs(1)).await;
let renewed_lease = timeline
.lease_standby_horizon("renewed_lease_0".to_string(), Lsn(initial_leases[0]), &ctx)
.expect("standby horizon lease renewal should succeed");
assert!(
renewed_lease >= expirations[0],
"New lease should have expiration time at least as good as original"
);
// whitebox test to assert we're now tracking 4 leases total
assert_eq!(timeline.standby_horizons.get_leases().len(), 4);
assert_eq!(timeline.standby_horizons.legacy(), None);
// Also throw in some legacy propagation value at lowest LSN, so we know
// legacy propagation is respected.
let legacy_lsn = Lsn(0x20);
timeline.standby_horizons.register_legacy_update(legacy_lsn);
assert_eq!(timeline.standby_horizons.legacy(), Some(legacy_lsn));
// Force set disk consistent lsn so we can get the cutoff at `end_lsn`.
timeline.force_set_disk_consistent_lsn(end_lsn);
let leases = timeline.standby_horizons.get_leases();
let legacy = timeline.standby_horizons.legacy().unwrap();
//
// Do GC with 0 space/time cutoff (horizon=0).
//
let res = tenant
.gc_iteration(
Some(TIMELINE_ID),
0,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await
.unwrap();
debug!("res: {:?}", res);
//
// Verify standby horizons did hold up GC
//
// 0x20 was the lowest horizon
assert_eq!(*timeline.get_applied_gc_cutoff_lsn(), Lsn(0x20));
// Legacy propagation mechanism gets cleared by gc
assert_eq!(timeline.standby_horizons.legacy(), None);
// Leases do not hold up GC
assert_eq!(timeline.standby_horizons.get_leases().len(), 4);
assert_eq!(
res.layers_needed_by_lsn_leases, 0,
"we're not using LSN leases in this test"
);
// Should be able to read at the tip
timeline
.get(key, end_lsn, &ctx)
.await
.expect("should be able to read data");
// Should be able to read at any LSN between any standby_horizon and tip
let readable_lsns = (legacy.0..=end_lsn.0)
.chain(leases.iter().map(|(lsn, _)| (lsn.0..=end_lsn.0)).flatten())
.dedup()
.map(|lsn| Lsn(lsn))
.collect_vec();
for lsn in readable_lsns {
timeline
.get(key, lsn, &ctx)
.await
.expect("should be able to read data");
}
//
// Verify culling works
// (Again, incorrectly rely on monotonicity of SystemTime here)
//
timeline
.standby_horizons
.cull_leases(SystemTime::now() + lease_length + Duration::from_secs(1));
assert_eq!(timeline.standby_horizons.get_leases().len(), 0);
//
// Run GC again
//
let res = tenant
.gc_iteration(
Some(TIMELINE_ID),
0,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await
.unwrap();
debug!("res: {:?}", res);
// This time, gc should not have been held up.
assert_eq!(*timeline.get_applied_gc_cutoff_lsn(), end_lsn);
Ok(())
}
#[tokio::test]
async fn test_failed_flush_should_not_update_disk_consistent_lsn() -> anyhow::Result<()> {
//

View File

@@ -144,7 +144,7 @@ impl Horizons {
pub fn cull_leases(&self, now: SystemTime) {
let mut inner = self.inner.lock().unwrap();
inner.leases_by_id.retain(|_, l| l.valid_until <= now);
inner.leases_by_id.retain(|_, l| l.valid_until > now);
}
pub fn dump(&self) -> serde_json::Value {
@@ -159,4 +159,20 @@ impl Horizons {
"leases_by_id": format!("{leases_by_id:?}"),
})
}
#[cfg(test )]
pub fn legacy(&self) -> Option<Lsn> {
let inner = self.inner.lock().unwrap();
inner.legacy
}
#[cfg(test)]
pub fn get_leases(&self) -> Vec<(Lsn, SystemTime)> {
let inner = self.inner.lock().unwrap();
inner
.leases_by_id
.iter()
.map(|(_, lease)| (lease.lsn, lease.valid_until))
.collect()
}
}