From 221414de4b0260056e0961528d46c5141825a0a0 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 10 Apr 2024 06:31:28 +0100 Subject: [PATCH] pageserver: time based rolling based on the first write timestamp (#7346) Problem Currently, we base our time based layer rolling decision on the last time we froze a layer. This means that if we roll a layer and then go idle for longer than the checkpoint timeout the next layer will be rolled after the first write. This is of course not desirable. Summary of changes Record the timepoint of the first write to an open layer and use that for time based layer rolling decisions. Note that I had to keep `Timeline::last_freeze_ts` for the sharded tenant disk consistent lsn skip hack. Fixes #7241 --- .../tenant/storage_layer/inmemory_layer.rs | 8 +++ pageserver/src/tenant/timeline.rs | 29 ++++------- .../regress/test_pageserver_layer_rolling.py | 50 ++++++++++++++++--- 3 files changed, 62 insertions(+), 25 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 43942ba2db..29751641b4 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -19,6 +19,7 @@ use pageserver_api::models::InMemoryLayerInfo; use pageserver_api::shard::TenantShardId; use std::collections::{BinaryHeap, HashMap, HashSet}; use std::sync::{Arc, OnceLock}; +use std::time::Instant; use tracing::*; use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap}; // avoid binding to Write (conflicts with std::io::Write) @@ -53,6 +54,8 @@ pub struct InMemoryLayer { /// Writes are only allowed when this is `None`. end_lsn: OnceLock, + opened_at: Instant, + /// The above fields never change, except for `end_lsn`, which is only set once. /// All other changing parts are in `inner`, and protected by a mutex. inner: RwLock, @@ -460,6 +463,7 @@ impl InMemoryLayer { tenant_shard_id, start_lsn, end_lsn: OnceLock::new(), + opened_at: Instant::now(), inner: RwLock::new(InMemoryLayerInner { index: HashMap::new(), file, @@ -520,6 +524,10 @@ impl InMemoryLayer { Ok(()) } + pub(crate) fn get_opened_at(&self) -> Instant { + self.opened_at + } + pub(crate) async fn tick(&self) -> Option { let mut inner = self.inner.write().await; let size = inner.file.len(); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d3c8c5f66c..d046a60af4 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1257,7 +1257,7 @@ impl Timeline { checkpoint_distance, self.get_last_record_lsn(), self.last_freeze_at.load(), - *self.last_freeze_ts.read().unwrap(), + open_layer.get_opened_at(), ) { match open_layer.info() { InMemoryLayerInfo::Frozen { lsn_start, lsn_end } => { @@ -1622,7 +1622,7 @@ impl Timeline { checkpoint_distance: u64, projected_lsn: Lsn, last_freeze_at: Lsn, - last_freeze_ts: Instant, + opened_at: Instant, ) -> bool { let distance = projected_lsn.widening_sub(last_freeze_at); @@ -1648,13 +1648,13 @@ impl Timeline { ); true - } else if distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout() { + } else if distance > 0 && opened_at.elapsed() >= self.get_checkpoint_timeout() { info!( - "Will roll layer at {} with layer size {} due to time since last flush ({:?})", - projected_lsn, - layer_size, - last_freeze_ts.elapsed() - ); + "Will roll layer at {} with layer size {} due to time since first write to the layer ({:?})", + projected_lsn, + layer_size, + opened_at.elapsed() + ); true } else { @@ -4703,23 +4703,16 @@ struct TimelineWriterState { max_lsn: Option, // Cached details of the last freeze. Avoids going trough the atomic/lock on every put. cached_last_freeze_at: Lsn, - cached_last_freeze_ts: Instant, } impl TimelineWriterState { - fn new( - open_layer: Arc, - current_size: u64, - last_freeze_at: Lsn, - last_freeze_ts: Instant, - ) -> Self { + fn new(open_layer: Arc, current_size: u64, last_freeze_at: Lsn) -> Self { Self { open_layer, current_size, prev_lsn: None, max_lsn: None, cached_last_freeze_at: last_freeze_at, - cached_last_freeze_ts: last_freeze_ts, } } } @@ -4818,12 +4811,10 @@ impl<'a> TimelineWriter<'a> { let initial_size = layer.size().await?; let last_freeze_at = self.last_freeze_at.load(); - let last_freeze_ts = *self.last_freeze_ts.read().unwrap(); self.write_guard.replace(TimelineWriterState::new( layer, initial_size, last_freeze_at, - last_freeze_ts, )); Ok(()) @@ -4870,7 +4861,7 @@ impl<'a> TimelineWriter<'a> { self.get_checkpoint_distance(), lsn, state.cached_last_freeze_at, - state.cached_last_freeze_ts, + state.open_layer.get_opened_at(), ) { OpenLayerAction::Roll } else { diff --git a/test_runner/regress/test_pageserver_layer_rolling.py b/test_runner/regress/test_pageserver_layer_rolling.py index c7e1e88468..c5dc0f2919 100644 --- a/test_runner/regress/test_pageserver_layer_rolling.py +++ b/test_runner/regress/test_pageserver_layer_rolling.py @@ -1,6 +1,7 @@ import asyncio import os -from typing import Tuple +import time +from typing import Optional, Tuple import psutil import pytest @@ -20,20 +21,30 @@ ENTRIES_PER_TIMELINE = 10_000 CHECKPOINT_TIMEOUT_SECONDS = 60 -async def run_worker(env: NeonEnv, tenant_conf, entries: int) -> Tuple[TenantId, TimelineId, Lsn]: - tenant, timeline = env.neon_cli.create_tenant(conf=tenant_conf) +async def run_worker_for_tenant( + env: NeonEnv, entries: int, tenant: TenantId, offset: Optional[int] = None +) -> Lsn: + if offset is None: + offset = 0 + with env.endpoints.create_start("main", tenant_id=tenant) as ep: conn = await ep.connect_async() try: await conn.execute("CREATE TABLE IF NOT EXISTS t(key serial primary key, value text)") await conn.execute( - f"INSERT INTO t SELECT i, CONCAT('payload_', i) FROM generate_series(0,{entries}) as i" + f"INSERT INTO t SELECT i, CONCAT('payload_', i) FROM generate_series({offset},{entries}) as i" ) finally: await conn.close(timeout=10) last_flush_lsn = Lsn(ep.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) - return tenant, timeline, last_flush_lsn + return last_flush_lsn + + +async def run_worker(env: NeonEnv, tenant_conf, entries: int) -> Tuple[TenantId, TimelineId, Lsn]: + tenant, timeline = env.neon_cli.create_tenant(conf=tenant_conf) + last_flush_lsn = await run_worker_for_tenant(env, entries, tenant) + return tenant, timeline, last_flush_lsn async def workload( @@ -89,7 +100,9 @@ def assert_dirty_bytes(env, v): def assert_dirty_bytes_nonzero(env): - assert get_dirty_bytes(env) > 0 + dirty_bytes = get_dirty_bytes(env) + assert dirty_bytes > 0 + return dirty_bytes @pytest.mark.parametrize("immediate_shutdown", [True, False]) @@ -182,6 +195,31 @@ def test_idle_checkpoints(neon_env_builder: NeonEnvBuilder): log.info("Waiting for background checkpoints...") wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(env, 0)) # type: ignore + # The code below verifies that we do not flush on the first write + # after an idle period longer than the checkpoint timeout. + + # Sit quietly for longer than the checkpoint timeout + time.sleep(CHECKPOINT_TIMEOUT_SECONDS + CHECKPOINT_TIMEOUT_SECONDS / 2) + + # Restart the safekeepers and write a bit of extra data into one tenant + for sk in env.safekeepers: + sk.start() + + tenant_with_extra_writes = last_flush_lsns[0][0] + asyncio.run( + run_worker_for_tenant(env, 5, tenant_with_extra_writes, offset=ENTRIES_PER_TIMELINE) + ) + + dirty_after_write = wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) # type: ignore + + # We shouldn't flush since we've just opened a new layer + waited_for = 0 + while waited_for < CHECKPOINT_TIMEOUT_SECONDS // 4: + time.sleep(5) + waited_for += 5 + + assert get_dirty_bytes(env) >= dirty_after_write + @pytest.mark.skipif( # We have to use at least ~100MB of data to hit the lowest limit we can configure, which is