mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 21:40:39 +00:00
pageserver: time based rolling based on the first write timestamp (#7346)
Problem: Currently, we base our time-based layer rolling decision on the last time we froze a layer. This means that if we roll a layer and then go idle for longer than the checkpoint timeout, the next layer will be rolled after the first write. This is of course not desirable. Summary of changes: Record the time of the first write to an open layer and use that for time-based layer rolling decisions. Note that I had to keep `Timeline::last_freeze_ts` for the sharded tenant disk-consistent LSN skip hack. Fixes #7241
This commit is contained in:
@@ -19,6 +19,7 @@ use pageserver_api::models::InMemoryLayerInfo;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::time::Instant;
|
||||
use tracing::*;
|
||||
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
|
||||
// avoid binding to Write (conflicts with std::io::Write)
|
||||
@@ -53,6 +54,8 @@ pub struct InMemoryLayer {
|
||||
/// Writes are only allowed when this is `None`.
|
||||
end_lsn: OnceLock<Lsn>,
|
||||
|
||||
opened_at: Instant,
|
||||
|
||||
/// The above fields never change, except for `end_lsn`, which is only set once.
|
||||
/// All other changing parts are in `inner`, and protected by a mutex.
|
||||
inner: RwLock<InMemoryLayerInner>,
|
||||
@@ -460,6 +463,7 @@ impl InMemoryLayer {
|
||||
tenant_shard_id,
|
||||
start_lsn,
|
||||
end_lsn: OnceLock::new(),
|
||||
opened_at: Instant::now(),
|
||||
inner: RwLock::new(InMemoryLayerInner {
|
||||
index: HashMap::new(),
|
||||
file,
|
||||
@@ -520,6 +524,10 @@ impl InMemoryLayer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn get_opened_at(&self) -> Instant {
|
||||
self.opened_at
|
||||
}
|
||||
|
||||
pub(crate) async fn tick(&self) -> Option<u64> {
|
||||
let mut inner = self.inner.write().await;
|
||||
let size = inner.file.len();
|
||||
|
||||
@@ -1257,7 +1257,7 @@ impl Timeline {
|
||||
checkpoint_distance,
|
||||
self.get_last_record_lsn(),
|
||||
self.last_freeze_at.load(),
|
||||
*self.last_freeze_ts.read().unwrap(),
|
||||
open_layer.get_opened_at(),
|
||||
) {
|
||||
match open_layer.info() {
|
||||
InMemoryLayerInfo::Frozen { lsn_start, lsn_end } => {
|
||||
@@ -1622,7 +1622,7 @@ impl Timeline {
|
||||
checkpoint_distance: u64,
|
||||
projected_lsn: Lsn,
|
||||
last_freeze_at: Lsn,
|
||||
last_freeze_ts: Instant,
|
||||
opened_at: Instant,
|
||||
) -> bool {
|
||||
let distance = projected_lsn.widening_sub(last_freeze_at);
|
||||
|
||||
@@ -1648,13 +1648,13 @@ impl Timeline {
|
||||
);
|
||||
|
||||
true
|
||||
} else if distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout() {
|
||||
} else if distance > 0 && opened_at.elapsed() >= self.get_checkpoint_timeout() {
|
||||
info!(
|
||||
"Will roll layer at {} with layer size {} due to time since last flush ({:?})",
|
||||
projected_lsn,
|
||||
layer_size,
|
||||
last_freeze_ts.elapsed()
|
||||
);
|
||||
"Will roll layer at {} with layer size {} due to time since first write to the layer ({:?})",
|
||||
projected_lsn,
|
||||
layer_size,
|
||||
opened_at.elapsed()
|
||||
);
|
||||
|
||||
true
|
||||
} else {
|
||||
@@ -4703,23 +4703,16 @@ struct TimelineWriterState {
|
||||
max_lsn: Option<Lsn>,
|
||||
// Cached details of the last freeze. Avoids going through the atomic/lock on every put.
|
||||
cached_last_freeze_at: Lsn,
|
||||
cached_last_freeze_ts: Instant,
|
||||
}
|
||||
|
||||
impl TimelineWriterState {
|
||||
fn new(
|
||||
open_layer: Arc<InMemoryLayer>,
|
||||
current_size: u64,
|
||||
last_freeze_at: Lsn,
|
||||
last_freeze_ts: Instant,
|
||||
) -> Self {
|
||||
fn new(open_layer: Arc<InMemoryLayer>, current_size: u64, last_freeze_at: Lsn) -> Self {
|
||||
Self {
|
||||
open_layer,
|
||||
current_size,
|
||||
prev_lsn: None,
|
||||
max_lsn: None,
|
||||
cached_last_freeze_at: last_freeze_at,
|
||||
cached_last_freeze_ts: last_freeze_ts,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4818,12 +4811,10 @@ impl<'a> TimelineWriter<'a> {
|
||||
let initial_size = layer.size().await?;
|
||||
|
||||
let last_freeze_at = self.last_freeze_at.load();
|
||||
let last_freeze_ts = *self.last_freeze_ts.read().unwrap();
|
||||
self.write_guard.replace(TimelineWriterState::new(
|
||||
layer,
|
||||
initial_size,
|
||||
last_freeze_at,
|
||||
last_freeze_ts,
|
||||
));
|
||||
|
||||
Ok(())
|
||||
@@ -4870,7 +4861,7 @@ impl<'a> TimelineWriter<'a> {
|
||||
self.get_checkpoint_distance(),
|
||||
lsn,
|
||||
state.cached_last_freeze_at,
|
||||
state.cached_last_freeze_ts,
|
||||
state.open_layer.get_opened_at(),
|
||||
) {
|
||||
OpenLayerAction::Roll
|
||||
} else {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Tuple
|
||||
import time
|
||||
from typing import Optional, Tuple
|
||||
|
||||
import psutil
|
||||
import pytest
|
||||
@@ -20,20 +21,30 @@ ENTRIES_PER_TIMELINE = 10_000
|
||||
CHECKPOINT_TIMEOUT_SECONDS = 60
|
||||
|
||||
|
||||
async def run_worker(env: NeonEnv, tenant_conf, entries: int) -> Tuple[TenantId, TimelineId, Lsn]:
|
||||
tenant, timeline = env.neon_cli.create_tenant(conf=tenant_conf)
|
||||
async def run_worker_for_tenant(
|
||||
env: NeonEnv, entries: int, tenant: TenantId, offset: Optional[int] = None
|
||||
) -> Lsn:
|
||||
if offset is None:
|
||||
offset = 0
|
||||
|
||||
with env.endpoints.create_start("main", tenant_id=tenant) as ep:
|
||||
conn = await ep.connect_async()
|
||||
try:
|
||||
await conn.execute("CREATE TABLE IF NOT EXISTS t(key serial primary key, value text)")
|
||||
await conn.execute(
|
||||
f"INSERT INTO t SELECT i, CONCAT('payload_', i) FROM generate_series(0,{entries}) as i"
|
||||
f"INSERT INTO t SELECT i, CONCAT('payload_', i) FROM generate_series({offset},{entries}) as i"
|
||||
)
|
||||
finally:
|
||||
await conn.close(timeout=10)
|
||||
|
||||
last_flush_lsn = Lsn(ep.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
return tenant, timeline, last_flush_lsn
|
||||
return last_flush_lsn
|
||||
|
||||
|
||||
async def run_worker(env: NeonEnv, tenant_conf, entries: int) -> Tuple[TenantId, TimelineId, Lsn]:
|
||||
tenant, timeline = env.neon_cli.create_tenant(conf=tenant_conf)
|
||||
last_flush_lsn = await run_worker_for_tenant(env, entries, tenant)
|
||||
return tenant, timeline, last_flush_lsn
|
||||
|
||||
|
||||
async def workload(
|
||||
@@ -89,7 +100,9 @@ def assert_dirty_bytes(env, v):
|
||||
|
||||
|
||||
def assert_dirty_bytes_nonzero(env):
|
||||
assert get_dirty_bytes(env) > 0
|
||||
dirty_bytes = get_dirty_bytes(env)
|
||||
assert dirty_bytes > 0
|
||||
return dirty_bytes
|
||||
|
||||
|
||||
@pytest.mark.parametrize("immediate_shutdown", [True, False])
|
||||
@@ -182,6 +195,31 @@ def test_idle_checkpoints(neon_env_builder: NeonEnvBuilder):
|
||||
log.info("Waiting for background checkpoints...")
|
||||
wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(env, 0)) # type: ignore
|
||||
|
||||
# The code below verifies that we do not flush on the first write
|
||||
# after an idle period longer than the checkpoint timeout.
|
||||
|
||||
# Sit quietly for longer than the checkpoint timeout
|
||||
time.sleep(CHECKPOINT_TIMEOUT_SECONDS + CHECKPOINT_TIMEOUT_SECONDS / 2)
|
||||
|
||||
# Restart the safekeepers and write a bit of extra data into one tenant
|
||||
for sk in env.safekeepers:
|
||||
sk.start()
|
||||
|
||||
tenant_with_extra_writes = last_flush_lsns[0][0]
|
||||
asyncio.run(
|
||||
run_worker_for_tenant(env, 5, tenant_with_extra_writes, offset=ENTRIES_PER_TIMELINE)
|
||||
)
|
||||
|
||||
dirty_after_write = wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) # type: ignore
|
||||
|
||||
# We shouldn't flush since we've just opened a new layer
|
||||
waited_for = 0
|
||||
while waited_for < CHECKPOINT_TIMEOUT_SECONDS // 4:
|
||||
time.sleep(5)
|
||||
waited_for += 5
|
||||
|
||||
assert get_dirty_bytes(env) >= dirty_after_write
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
# We have to use at least ~100MB of data to hit the lowest limit we can configure, which is
|
||||
|
||||
Reference in New Issue
Block a user