mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
pageserver: revert open layer rolling revert (#6962)
## Problem
We reverted https://github.com/neondatabase/neon/pull/6661 a few days
ago. The change led to OOMs in
benchmarks followed by large WAL reingests.
The issue was that we removed [this
code](d04af08567/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs (L409-L417)).
That call may trigger a roll of the open layer due to
the keepalive messages received from the safekeeper. Removing it meant
that enforcing
of checkpoint timeout became even more lax and led to using up large
amounts of memory
for the in memory layer indices.
## Summary of changes
Piggyback on keep alive messages to enforce checkpoint timeout. This is
a hack, but it's exactly what
the current code is doing.
## Alternatives
Christhian, Joonas and myself sketched out a timer based approach
[here](https://github.com/neondatabase/neon/pull/6940). While discussing
it further, it became obvious that's also a bit of a hack and not the
desired end state. I chose not
to take that further since it's not what we ultimately want and it'll be
harder to rip out.
Right now it's unclear what the ideal system behaviour is:
* early flushing on memory pressure, or ...
* detaching tenants on memory pressure
This commit is contained in:
@@ -15,6 +15,7 @@ use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::{
|
||||
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
|
||||
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
|
||||
@@ -1498,7 +1499,7 @@ impl<'a> DatadirModification<'a> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let writer = self.tline.writer().await;
|
||||
let mut writer = self.tline.writer().await;
|
||||
|
||||
// Flush relation and SLRU data blocks, keep metadata.
|
||||
let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
|
||||
@@ -1537,14 +1538,22 @@ impl<'a> DatadirModification<'a> {
|
||||
/// All the modifications in this atomic update are stamped by the specified LSN.
|
||||
///
|
||||
pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
let writer = self.tline.writer().await;
|
||||
let mut writer = self.tline.writer().await;
|
||||
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
self.pending_nblocks = 0;
|
||||
|
||||
if !self.pending_updates.is_empty() {
|
||||
writer.put_batch(&self.pending_updates, ctx).await?;
|
||||
self.pending_updates.clear();
|
||||
// The put_batch call below expects expects the inputs to be sorted by Lsn,
|
||||
// so we do that first.
|
||||
let lsn_ordered_batch: Vec<(Key, Lsn, Value)> = self
|
||||
.pending_updates
|
||||
.drain()
|
||||
.map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (key, lsn, val)))
|
||||
.kmerge_by(|lhs, rhs| lhs.1 .0 < rhs.1 .0)
|
||||
.collect();
|
||||
|
||||
writer.put_batch(lsn_ordered_batch, ctx).await?;
|
||||
}
|
||||
|
||||
if !self.pending_deletions.is_empty() {
|
||||
|
||||
@@ -3857,7 +3857,7 @@ mod tests {
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
@@ -3869,7 +3869,7 @@ mod tests {
|
||||
writer.finish_write(Lsn(0x10));
|
||||
drop(writer);
|
||||
|
||||
let writer = tline.writer().await;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
@@ -3935,7 +3935,7 @@ mod tests {
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
let writer = tline.writer().await;
|
||||
let mut writer = tline.writer().await;
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
let TEST_KEY_A: Key = Key::from_hex("110000000033333333444444445500000001").unwrap();
|
||||
@@ -3969,7 +3969,7 @@ mod tests {
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
.expect("Should have a local timeline");
|
||||
let new_writer = newtline.writer().await;
|
||||
let mut new_writer = newtline.writer().await;
|
||||
new_writer
|
||||
.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx)
|
||||
.await?;
|
||||
@@ -4001,7 +4001,7 @@ mod tests {
|
||||
) -> anyhow::Result<()> {
|
||||
let mut lsn = start_lsn;
|
||||
{
|
||||
let writer = tline.writer().await;
|
||||
let mut writer = tline.writer().await;
|
||||
// Create a relation on the timeline
|
||||
writer
|
||||
.put(
|
||||
@@ -4026,7 +4026,7 @@ mod tests {
|
||||
}
|
||||
tline.freeze_and_flush().await?;
|
||||
{
|
||||
let writer = tline.writer().await;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
@@ -4389,7 +4389,7 @@ mod tests {
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
@@ -4406,7 +4406,7 @@ mod tests {
|
||||
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
|
||||
.await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
@@ -4423,7 +4423,7 @@ mod tests {
|
||||
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
|
||||
.await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
@@ -4440,7 +4440,7 @@ mod tests {
|
||||
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
|
||||
.await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
@@ -4497,7 +4497,7 @@ mod tests {
|
||||
for _ in 0..repeat {
|
||||
for _ in 0..key_count {
|
||||
test_key.field6 = blknum;
|
||||
let writer = timeline.writer().await;
|
||||
let mut writer = timeline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
@@ -4690,7 +4690,7 @@ mod tests {
|
||||
|
||||
current_lsn += 0x100;
|
||||
|
||||
let writer = current_timeline.writer().await;
|
||||
let mut writer = current_timeline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
gap_at_key,
|
||||
@@ -4729,7 +4729,7 @@ mod tests {
|
||||
|
||||
current_lsn += 0x10;
|
||||
|
||||
let writer = child_timeline.writer().await;
|
||||
let mut writer = child_timeline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
current_key,
|
||||
@@ -4807,7 +4807,7 @@ mod tests {
|
||||
for blknum in 0..NUM_KEYS {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
test_key.field6 = blknum as u32;
|
||||
let writer = tline.writer().await;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
@@ -4828,7 +4828,7 @@ mod tests {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
let blknum = thread_rng().gen_range(0..NUM_KEYS);
|
||||
test_key.field6 = blknum as u32;
|
||||
let writer = tline.writer().await;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
@@ -4896,7 +4896,7 @@ mod tests {
|
||||
for blknum in 0..NUM_KEYS {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
test_key.field6 = blknum as u32;
|
||||
let writer = tline.writer().await;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
@@ -4925,7 +4925,7 @@ mod tests {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
let blknum = thread_rng().gen_range(0..NUM_KEYS);
|
||||
test_key.field6 = blknum as u32;
|
||||
let writer = tline.writer().await;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
@@ -5002,7 +5002,7 @@ mod tests {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
let blknum = thread_rng().gen_range(0..NUM_KEYS);
|
||||
test_key.field6 = blknum as u32;
|
||||
let writer = tline.writer().await;
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
|
||||
@@ -336,32 +336,17 @@ impl InMemoryLayer {
|
||||
|
||||
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
|
||||
/// Adds the page version to the in-memory tree
|
||||
|
||||
pub(crate) async fn put_value(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
val: &Value,
|
||||
buf: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
let mut inner = self.inner.write().await;
|
||||
self.assert_writable();
|
||||
self.put_value_locked(&mut inner, key, lsn, val, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn put_values(
|
||||
&self,
|
||||
values: &HashMap<Key, Vec<(Lsn, Value)>>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
let mut inner = self.inner.write().await;
|
||||
self.assert_writable();
|
||||
for (key, vals) in values {
|
||||
for (lsn, val) in vals {
|
||||
self.put_value_locked(&mut inner, *key, *lsn, val, ctx)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
|
||||
}
|
||||
|
||||
async fn put_value_locked(
|
||||
@@ -369,22 +354,16 @@ impl InMemoryLayer {
|
||||
locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
val: &Value,
|
||||
buf: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
|
||||
|
||||
let off = {
|
||||
// Avoid doing allocations for "small" values.
|
||||
// In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
|
||||
// https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
|
||||
let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
|
||||
buf.clear();
|
||||
val.ser_into(&mut buf)?;
|
||||
locked_inner
|
||||
.file
|
||||
.write_blob(
|
||||
&buf,
|
||||
buf,
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.build(),
|
||||
@@ -412,7 +391,12 @@ impl InMemoryLayer {
|
||||
pub async fn freeze(&self, end_lsn: Lsn) {
|
||||
let inner = self.inner.write().await;
|
||||
|
||||
assert!(self.start_lsn < end_lsn);
|
||||
assert!(
|
||||
self.start_lsn < end_lsn,
|
||||
"{} >= {}",
|
||||
self.start_lsn,
|
||||
end_lsn
|
||||
);
|
||||
self.end_lsn.set(end_lsn).expect("end_lsn set only once");
|
||||
|
||||
for vec_map in inner.index.values() {
|
||||
|
||||
@@ -27,6 +27,18 @@ use pageserver_api::{
|
||||
};
|
||||
use rand::Rng;
|
||||
use serde_with::serde_as;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::{
|
||||
runtime::Handle,
|
||||
sync::{oneshot, watch},
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::{
|
||||
bin_ser::BeSer,
|
||||
sync::gate::{Gate, GateGuard},
|
||||
};
|
||||
|
||||
use std::ops::{Deref, Range};
|
||||
use std::pin::pin;
|
||||
use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
@@ -41,14 +53,6 @@ use std::{
|
||||
cmp::{max, min, Ordering},
|
||||
ops::ControlFlow,
|
||||
};
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::{
|
||||
runtime::Handle,
|
||||
sync::{oneshot, watch},
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::sync::gate::{Gate, GateGuard};
|
||||
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::tenant::{
|
||||
@@ -271,7 +275,7 @@ pub struct Timeline {
|
||||
/// Locked automatically by [`TimelineWriter`] and checkpointer.
|
||||
/// Must always be acquired before the layer map/individual layer lock
|
||||
/// to avoid deadlock.
|
||||
write_lock: tokio::sync::Mutex<()>,
|
||||
write_lock: tokio::sync::Mutex<Option<TimelineWriterState>>,
|
||||
|
||||
/// Used to avoid multiple `flush_loop` tasks running
|
||||
pub(super) flush_loop_state: Mutex<FlushLoopState>,
|
||||
@@ -917,8 +921,6 @@ impl Timeline {
|
||||
seq: &Bytes,
|
||||
vec: &Bytes,
|
||||
) {
|
||||
use utils::bin_ser::BeSer;
|
||||
|
||||
if *key == AUX_FILES_KEY {
|
||||
// The value reconstruct of AUX_FILES_KEY from records is not deterministic
|
||||
// since it uses a hash map under the hood. Hence, deserialise both results
|
||||
@@ -1149,58 +1151,10 @@ impl Timeline {
|
||||
pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
|
||||
TimelineWriter {
|
||||
tl: self,
|
||||
_write_guard: self.write_lock.lock().await,
|
||||
write_guard: self.write_lock.lock().await,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
|
||||
/// the in-memory layer, and initiate flushing it if so.
|
||||
///
|
||||
/// Also flush after a period of time without new data -- it helps
|
||||
/// safekeepers to regard pageserver as caught up and suspend activity.
|
||||
pub(crate) async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let open_layer_size = {
|
||||
let guard = self.layers.read().await;
|
||||
let layers = guard.layer_map();
|
||||
let Some(open_layer) = layers.open_layer.as_ref() else {
|
||||
return Ok(());
|
||||
};
|
||||
open_layer.size().await?
|
||||
};
|
||||
let last_freeze_at = self.last_freeze_at.load();
|
||||
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
|
||||
let distance = last_lsn.widening_sub(last_freeze_at);
|
||||
// Rolling the open layer can be triggered by:
|
||||
// 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
|
||||
// the safekeepers need to store. For sharded tenants, we multiply by shard count to
|
||||
// account for how writes are distributed across shards: we expect each node to consume
|
||||
// 1/count of the LSN on average.
|
||||
// 2. The size of the currently open layer.
|
||||
// 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
|
||||
// up and suspend activity.
|
||||
if (distance
|
||||
>= self.get_checkpoint_distance() as i128 * self.shard_identity.count.count() as i128)
|
||||
|| open_layer_size > self.get_checkpoint_distance()
|
||||
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
|
||||
{
|
||||
info!(
|
||||
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
|
||||
distance,
|
||||
open_layer_size,
|
||||
last_freeze_ts.elapsed()
|
||||
);
|
||||
|
||||
self.freeze_inmem_layer(true).await;
|
||||
self.last_freeze_at.store(last_lsn);
|
||||
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
|
||||
|
||||
// Wake up the layer flusher
|
||||
self.flush_frozen_layers();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn activate(
|
||||
self: &Arc<Self>,
|
||||
broker_client: BrokerClientChannel,
|
||||
@@ -1635,7 +1589,7 @@ impl Timeline {
|
||||
layer_flush_start_tx,
|
||||
layer_flush_done_tx,
|
||||
|
||||
write_lock: tokio::sync::Mutex::new(()),
|
||||
write_lock: tokio::sync::Mutex::new(None),
|
||||
|
||||
gc_info: std::sync::RwLock::new(GcInfo {
|
||||
retain_lsns: Vec::new(),
|
||||
@@ -2961,43 +2915,6 @@ impl Timeline {
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
async fn put_value(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
val: &Value,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
//info!("PUT: key {} at {}", key, lsn);
|
||||
let layer = self.get_layer_for_write(lsn).await?;
|
||||
layer.put_value(key, lsn, val, ctx).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn put_values(
|
||||
&self,
|
||||
values: &HashMap<Key, Vec<(Lsn, Value)>>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// Pick the first LSN in the batch to get the layer to write to.
|
||||
for lsns in values.values() {
|
||||
if let Some((lsn, _)) = lsns.first() {
|
||||
let layer = self.get_layer_for_write(*lsn).await?;
|
||||
layer.put_values(values, ctx).await?;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn put_tombstones(&self, tombstones: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
|
||||
if let Some((_, lsn)) = tombstones.first() {
|
||||
let layer = self.get_layer_for_write(*lsn).await?;
|
||||
layer.put_tombstones(tombstones).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn finish_write(&self, new_lsn: Lsn) {
|
||||
assert!(new_lsn.is_aligned());
|
||||
|
||||
@@ -3008,14 +2925,20 @@ impl Timeline {
|
||||
async fn freeze_inmem_layer(&self, write_lock_held: bool) {
|
||||
// Freeze the current open in-memory layer. It will be written to disk on next
|
||||
// iteration.
|
||||
|
||||
let _write_guard = if write_lock_held {
|
||||
None
|
||||
} else {
|
||||
Some(self.write_lock.lock().await)
|
||||
};
|
||||
|
||||
self.freeze_inmem_layer_at(self.get_last_record_lsn()).await;
|
||||
}
|
||||
|
||||
async fn freeze_inmem_layer_at(&self, at: Lsn) {
|
||||
let mut guard = self.layers.write().await;
|
||||
guard
|
||||
.try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
|
||||
.try_freeze_in_memory_layer(at, &self.last_freeze_at)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -4392,13 +4315,43 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
|
||||
PageReconstructError::from(msg)
|
||||
}
|
||||
|
||||
struct TimelineWriterState {
|
||||
open_layer: Arc<InMemoryLayer>,
|
||||
current_size: u64,
|
||||
// Previous Lsn which passed through
|
||||
prev_lsn: Option<Lsn>,
|
||||
// Largest Lsn which passed through the current writer
|
||||
max_lsn: Option<Lsn>,
|
||||
// Cached details of the last freeze. Avoids going trough the atomic/lock on every put.
|
||||
cached_last_freeze_at: Lsn,
|
||||
cached_last_freeze_ts: Instant,
|
||||
}
|
||||
|
||||
impl TimelineWriterState {
|
||||
fn new(
|
||||
open_layer: Arc<InMemoryLayer>,
|
||||
current_size: u64,
|
||||
last_freeze_at: Lsn,
|
||||
last_freeze_ts: Instant,
|
||||
) -> Self {
|
||||
Self {
|
||||
open_layer,
|
||||
current_size,
|
||||
prev_lsn: None,
|
||||
max_lsn: None,
|
||||
cached_last_freeze_at: last_freeze_at,
|
||||
cached_last_freeze_ts: last_freeze_ts,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Various functions to mutate the timeline.
|
||||
// TODO Currently, Deref is used to allow easy access to read methods from this trait.
|
||||
// This is probably considered a bad practice in Rust and should be fixed eventually,
|
||||
// but will cause large code changes.
|
||||
pub(crate) struct TimelineWriter<'a> {
|
||||
tl: &'a Timeline,
|
||||
_write_guard: tokio::sync::MutexGuard<'a, ()>,
|
||||
write_guard: tokio::sync::MutexGuard<'a, Option<TimelineWriterState>>,
|
||||
}
|
||||
|
||||
impl Deref for TimelineWriter<'_> {
|
||||
@@ -4409,31 +4362,239 @@ impl Deref for TimelineWriter<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TimelineWriter<'_> {
|
||||
fn drop(&mut self) {
|
||||
self.write_guard.take();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq)]
|
||||
enum OpenLayerAction {
|
||||
Roll,
|
||||
Open,
|
||||
None,
|
||||
}
|
||||
|
||||
impl<'a> TimelineWriter<'a> {
|
||||
/// Put a new page version that can be constructed from a WAL record
|
||||
///
|
||||
/// This will implicitly extend the relation, if the page is beyond the
|
||||
/// current end-of-file.
|
||||
pub(crate) async fn put(
|
||||
&self,
|
||||
&mut self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
value: &Value,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
self.tl.put_value(key, lsn, value, ctx).await
|
||||
// Avoid doing allocations for "small" values.
|
||||
// In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
|
||||
// https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
|
||||
let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
|
||||
value.ser_into(&mut buf)?;
|
||||
let buf_size: u64 = buf.len().try_into().expect("oversized value buf");
|
||||
|
||||
let action = self.get_open_layer_action(lsn, buf_size);
|
||||
let layer = self.handle_open_layer_action(lsn, action).await?;
|
||||
let res = layer.put_value(key, lsn, &buf, ctx).await;
|
||||
|
||||
if res.is_ok() {
|
||||
// Update the current size only when the entire write was ok.
|
||||
// In case of failures, we may have had partial writes which
|
||||
// render the size tracking out of sync. That's ok because
|
||||
// the checkpoint distance should be significantly smaller
|
||||
// than the S3 single shot upload limit of 5GiB.
|
||||
let state = self.write_guard.as_mut().unwrap();
|
||||
|
||||
state.current_size += buf_size;
|
||||
state.prev_lsn = Some(lsn);
|
||||
state.max_lsn = std::cmp::max(state.max_lsn, Some(lsn));
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
/// "Tick" the timeline writer: it will roll the open layer if required
|
||||
/// and do nothing else.
|
||||
pub(crate) async fn tick(&mut self) -> anyhow::Result<()> {
|
||||
self.open_layer_if_present().await?;
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let action = self.get_open_layer_action(last_record_lsn, 0);
|
||||
if action == OpenLayerAction::Roll {
|
||||
self.roll_layer(last_record_lsn).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Populate the timeline writer state only if an in-memory layer
|
||||
/// is already open.
|
||||
async fn open_layer_if_present(&mut self) -> anyhow::Result<()> {
|
||||
assert!(self.write_guard.is_none());
|
||||
|
||||
let open_layer = {
|
||||
let guard = self.layers.read().await;
|
||||
let layers = guard.layer_map();
|
||||
match layers.open_layer {
|
||||
Some(ref open_layer) => open_layer.clone(),
|
||||
None => {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let initial_size = open_layer.size().await?;
|
||||
let last_freeze_at = self.last_freeze_at.load();
|
||||
let last_freeze_ts = *self.last_freeze_ts.read().unwrap();
|
||||
self.write_guard.replace(TimelineWriterState::new(
|
||||
open_layer,
|
||||
initial_size,
|
||||
last_freeze_at,
|
||||
last_freeze_ts,
|
||||
));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_open_layer_action(
|
||||
&mut self,
|
||||
at: Lsn,
|
||||
action: OpenLayerAction,
|
||||
) -> anyhow::Result<&Arc<InMemoryLayer>> {
|
||||
match action {
|
||||
OpenLayerAction::Roll => {
|
||||
let freeze_at = self.write_guard.as_ref().unwrap().max_lsn.unwrap();
|
||||
self.roll_layer(freeze_at).await?;
|
||||
self.open_layer(at).await?;
|
||||
}
|
||||
OpenLayerAction::Open => self.open_layer(at).await?,
|
||||
OpenLayerAction::None => {
|
||||
assert!(self.write_guard.is_some());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(&self.write_guard.as_ref().unwrap().open_layer)
|
||||
}
|
||||
|
||||
async fn open_layer(&mut self, at: Lsn) -> anyhow::Result<()> {
|
||||
let layer = self.tl.get_layer_for_write(at).await?;
|
||||
let initial_size = layer.size().await?;
|
||||
|
||||
let last_freeze_at = self.last_freeze_at.load();
|
||||
let last_freeze_ts = *self.last_freeze_ts.read().unwrap();
|
||||
self.write_guard.replace(TimelineWriterState::new(
|
||||
layer,
|
||||
initial_size,
|
||||
last_freeze_at,
|
||||
last_freeze_ts,
|
||||
));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn roll_layer(&mut self, freeze_at: Lsn) -> anyhow::Result<()> {
|
||||
assert!(self.write_guard.is_some());
|
||||
|
||||
self.tl.freeze_inmem_layer_at(freeze_at).await;
|
||||
|
||||
let now = Instant::now();
|
||||
*(self.last_freeze_ts.write().unwrap()) = now;
|
||||
|
||||
self.tl.flush_frozen_layers();
|
||||
|
||||
let current_size = self.write_guard.as_ref().unwrap().current_size;
|
||||
if current_size > self.get_checkpoint_distance() {
|
||||
warn!("Flushed oversized open layer with size {}", current_size)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_open_layer_action(&self, lsn: Lsn, new_value_size: u64) -> OpenLayerAction {
|
||||
let state = &*self.write_guard;
|
||||
let Some(state) = &state else {
|
||||
return OpenLayerAction::Open;
|
||||
};
|
||||
|
||||
if state.prev_lsn == Some(lsn) {
|
||||
// Rolling mid LSN is not supported by downstream code.
|
||||
// Hence, only roll at LSN boundaries.
|
||||
return OpenLayerAction::None;
|
||||
}
|
||||
|
||||
if state.current_size == 0 {
|
||||
// Don't roll empty layers
|
||||
return OpenLayerAction::None;
|
||||
}
|
||||
|
||||
let distance = lsn.widening_sub(state.cached_last_freeze_at);
|
||||
let proposed_open_layer_size = state.current_size + new_value_size;
|
||||
|
||||
// Rolling the open layer can be triggered by:
|
||||
// 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
|
||||
// the safekeepers need to store. For sharded tenants, we multiply by shard count to
|
||||
// account for how writes are distributed across shards: we expect each node to consume
|
||||
// 1/count of the LSN on average.
|
||||
// 2. The size of the currently open layer.
|
||||
// 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
|
||||
// up and suspend activity.
|
||||
if distance
|
||||
>= self.get_checkpoint_distance() as i128 * self.shard_identity.count.count() as i128
|
||||
{
|
||||
info!(
|
||||
"Will roll layer at {} with layer size {} due to LSN distance ({})",
|
||||
lsn, state.current_size, distance
|
||||
);
|
||||
|
||||
OpenLayerAction::Roll
|
||||
} else if proposed_open_layer_size >= self.get_checkpoint_distance() {
|
||||
info!(
|
||||
"Will roll layer at {} with layer size {} due to layer size ({})",
|
||||
lsn, state.current_size, proposed_open_layer_size
|
||||
);
|
||||
|
||||
OpenLayerAction::Roll
|
||||
} else if distance > 0
|
||||
&& state.cached_last_freeze_ts.elapsed() >= self.get_checkpoint_timeout()
|
||||
{
|
||||
info!(
|
||||
"Will roll layer at {} with layer size {} due to time since last flush ({:?})",
|
||||
lsn,
|
||||
state.current_size,
|
||||
state.cached_last_freeze_ts.elapsed()
|
||||
);
|
||||
|
||||
OpenLayerAction::Roll
|
||||
} else {
|
||||
OpenLayerAction::None
|
||||
}
|
||||
}
|
||||
|
||||
/// Put a batch keys at the specified Lsns.
|
||||
///
|
||||
/// The batch should be sorted by Lsn such that it's safe
|
||||
/// to roll the open layer mid batch.
|
||||
pub(crate) async fn put_batch(
|
||||
&self,
|
||||
batch: &HashMap<Key, Vec<(Lsn, Value)>>,
|
||||
&mut self,
|
||||
batch: Vec<(Key, Lsn, Value)>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
self.tl.put_values(batch, ctx).await
|
||||
for (key, lsn, val) in batch {
|
||||
self.put(key, lsn, &val, ctx).await?
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_batch(&self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
|
||||
self.tl.put_tombstones(batch).await
|
||||
pub(crate) async fn delete_batch(&mut self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
|
||||
if let Some((_, lsn)) = batch.first() {
|
||||
let action = self.get_open_layer_action(*lsn, 0);
|
||||
let layer = self.handle_open_layer_action(*lsn, action).await?;
|
||||
layer.put_tombstones(batch).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Track the end of the latest digested WAL record.
|
||||
|
||||
@@ -343,23 +343,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
modification.commit(&ctx).await?;
|
||||
uncommitted_records = 0;
|
||||
filtered_records = 0;
|
||||
|
||||
//
|
||||
// We should check checkpoint distance after appending each ingest_batch_size bytes because otherwise
|
||||
// layer size can become much larger than `checkpoint_distance`.
|
||||
// It can append because wal-sender is sending WAL using 125kb chucks and some WAL records can cause writing large
|
||||
// amount of data to key-value storage. So performing this check only after processing
|
||||
// all WAL records in the chunk, can cause huge L0 layer files.
|
||||
//
|
||||
timeline
|
||||
.check_checkpoint_distance()
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to check checkpoint distance for timeline {}",
|
||||
timeline.timeline_id
|
||||
)
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -406,15 +389,16 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
}
|
||||
}
|
||||
|
||||
timeline
|
||||
.check_checkpoint_distance()
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to check checkpoint distance for timeline {}",
|
||||
timeline.timeline_id
|
||||
)
|
||||
})?;
|
||||
{
|
||||
// This is a hack. It piggybacks on the keepalive messages sent by the
|
||||
// safekeeper in order to enforce `checkpoint_timeout` on the currently
|
||||
// open layer. This hack doesn't provide a bound on the total size of
|
||||
// in-memory layers on a pageserver. See https://github.com/neondatabase/neon/issues/6916.
|
||||
let mut writer = timeline.writer().await;
|
||||
if let Err(err) = writer.tick().await {
|
||||
warn!("Timeline writer tick failed: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(last_lsn) = status_update {
|
||||
let timeline_remote_consistent_lsn = timeline
|
||||
|
||||
@@ -17,10 +17,10 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
|
||||
tenant, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
"gc_period": "0s",
|
||||
"checkpoint_distance": "8192",
|
||||
"checkpoint_distance": "16384",
|
||||
"compaction_period": "1 s",
|
||||
"compaction_threshold": "1",
|
||||
"compaction_target_size": "8192",
|
||||
"compaction_target_size": "16384",
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user