Revert "pageserver: roll open layer in timeline writer (#6661)"

This reverts commit 587cb705b8.

Conflicts:
	pageserver/src/tenant.rs
	pageserver/src/tenant/timeline.rs

The conflicts were with
* pageserver: adjust checkpoint distance for sharded tenants (#6852)
* pageserver: add vectored get implementation (#6576)
Christian Schwarz
2024-02-27 18:10:39 +01:00
parent aea3861167
commit 6b98dd3fb4
6 changed files with 178 additions and 265 deletions

View File

@@ -15,7 +15,6 @@ use crate::walrecord::NeonWalRecord;
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::{
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
@@ -1493,7 +1492,7 @@ impl<'a> DatadirModification<'a> {
return Ok(());
}
let mut writer = self.tline.writer().await;
let writer = self.tline.writer().await;
// Flush relation and SLRU data blocks, keep metadata.
let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
@@ -1532,23 +1531,13 @@ impl<'a> DatadirModification<'a> {
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
let mut writer = self.tline.writer().await;
let writer = self.tline.writer().await;
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
if !self.pending_updates.is_empty() {
let prev_pending_updates = std::mem::take(&mut self.pending_updates);
// The put_batch call below expects the inputs to be sorted by Lsn,
// so we do that first.
let lsn_ordered_batch: Vec<(Key, Lsn, Value)> = prev_pending_updates
.into_iter()
.map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (key, lsn, val)))
.kmerge_by(|lhs, rhs| lhs.1 .0 < rhs.1 .0)
.collect();
writer.put_batch(lsn_ordered_batch, ctx).await?;
writer.put_batch(&self.pending_updates, ctx).await?;
self.pending_updates.clear();
}
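
For reference, the Lsn-ordering step being removed here flattens the per-key pending-update map into a single batch sorted by Lsn before handing it to put_batch. A minimal standalone sketch of that k-way merge, using simplified stand-in types (u64 for Lsn, String for Value) rather than the pageserver's own, could look like this:

use std::collections::HashMap;
use itertools::Itertools;

// Flatten a map of per-key updates (each vector already sorted by Lsn)
// into one globally Lsn-sorted batch of (key, lsn, value) tuples.
fn lsn_ordered_batch(pending: HashMap<u64, Vec<(u64, String)>>) -> Vec<(u64, u64, String)> {
    pending
        .into_iter()
        .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (key, lsn, val)))
        .kmerge_by(|lhs, rhs| lhs.1 < rhs.1)
        .collect()
}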

View File

@@ -3853,7 +3853,7 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
let mut writer = tline.writer().await;
let writer = tline.writer().await;
writer
.put(
*TEST_KEY,
@@ -3865,7 +3865,7 @@ mod tests {
writer.finish_write(Lsn(0x10));
drop(writer);
let mut writer = tline.writer().await;
let writer = tline.writer().await;
writer
.put(
*TEST_KEY,
@@ -3931,7 +3931,7 @@ mod tests {
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let mut writer = tline.writer().await;
let writer = tline.writer().await;
#[allow(non_snake_case)]
let TEST_KEY_A: Key = Key::from_hex("110000000033333333444444445500000001").unwrap();
@@ -3965,7 +3965,7 @@ mod tests {
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
let mut new_writer = newtline.writer().await;
let new_writer = newtline.writer().await;
new_writer
.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx)
.await?;
@@ -3997,7 +3997,7 @@ mod tests {
) -> anyhow::Result<()> {
let mut lsn = start_lsn;
{
let mut writer = tline.writer().await;
let writer = tline.writer().await;
// Create a relation on the timeline
writer
.put(
@@ -4022,7 +4022,7 @@ mod tests {
}
tline.freeze_and_flush().await?;
{
let mut writer = tline.writer().await;
let writer = tline.writer().await;
writer
.put(
*TEST_KEY,
@@ -4385,7 +4385,7 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
let mut writer = tline.writer().await;
let writer = tline.writer().await;
writer
.put(
*TEST_KEY,
@@ -4402,7 +4402,7 @@ mod tests {
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
let mut writer = tline.writer().await;
let writer = tline.writer().await;
writer
.put(
*TEST_KEY,
@@ -4419,7 +4419,7 @@ mod tests {
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
let mut writer = tline.writer().await;
let writer = tline.writer().await;
writer
.put(
*TEST_KEY,
@@ -4436,7 +4436,7 @@ mod tests {
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
let mut writer = tline.writer().await;
let writer = tline.writer().await;
writer
.put(
*TEST_KEY,
@@ -4493,7 +4493,7 @@ mod tests {
for _ in 0..repeat {
for _ in 0..key_count {
test_key.field6 = blknum;
let mut writer = timeline.writer().await;
let writer = timeline.writer().await;
writer
.put(
test_key,
@@ -4664,7 +4664,7 @@ mod tests {
for blknum in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
test_key.field6 = blknum as u32;
let mut writer = tline.writer().await;
let writer = tline.writer().await;
writer
.put(
test_key,
@@ -4685,7 +4685,7 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let mut writer = tline.writer().await;
let writer = tline.writer().await;
writer
.put(
test_key,
@@ -4753,7 +4753,7 @@ mod tests {
for blknum in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
test_key.field6 = blknum as u32;
let mut writer = tline.writer().await;
let writer = tline.writer().await;
writer
.put(
test_key,
@@ -4782,7 +4782,7 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let mut writer = tline.writer().await;
let writer = tline.writer().await;
writer
.put(
test_key,
@@ -4859,7 +4859,7 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let mut writer = tline.writer().await;
let writer = tline.writer().await;
writer
.put(
test_key,

View File

@@ -336,17 +336,32 @@ impl InMemoryLayer {
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
pub(crate) async fn put_value(
&self,
key: Key,
lsn: Lsn,
buf: &[u8],
val: &Value,
ctx: &RequestContext,
) -> Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
self.put_value_locked(&mut inner, key, lsn, val, ctx).await
}
pub(crate) async fn put_values(
&self,
values: &HashMap<Key, Vec<(Lsn, Value)>>,
ctx: &RequestContext,
) -> Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
for (key, vals) in values {
for (lsn, val) in vals {
self.put_value_locked(&mut inner, *key, *lsn, val, ctx)
.await?;
}
}
Ok(())
}
async fn put_value_locked(
@@ -354,16 +369,22 @@ impl InMemoryLayer {
locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
key: Key,
lsn: Lsn,
buf: &[u8],
val: &Value,
ctx: &RequestContext,
) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
let off = {
// Avoid doing allocations for "small" values.
// In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
// https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
buf.clear();
val.ser_into(&mut buf)?;
locked_inner
.file
.write_blob(
buf,
&buf,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build(),
@@ -391,12 +412,7 @@ impl InMemoryLayer {
pub async fn freeze(&self, end_lsn: Lsn) {
let inner = self.inner.write().await;
assert!(
self.start_lsn < end_lsn,
"{} >= {}",
self.start_lsn,
end_lsn
);
assert!(self.start_lsn < end_lsn);
self.end_lsn.set(end_lsn).expect("end_lsn set only once");
for vec_map in inner.index.values() {
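
The serialization restored into put_value_locked above buffers the encoded value in a smallvec so that values at or below 256 bytes avoid a heap allocation. A small self-contained sketch of that buffering pattern, with a hypothetical encode helper standing in for Value::ser_into, is:

use smallvec::SmallVec;

// Copy a payload into an inline buffer; payloads within the 256-byte
// inline capacity stay on the stack, larger ones spill to the heap.
fn encode(payload: &[u8]) -> SmallVec<[u8; 256]> {
    let mut buf: SmallVec<[u8; 256]> = SmallVec::new();
    buf.extend_from_slice(payload);
    buf
}

fn main() {
    assert!(!encode(&[0u8; 64]).spilled());  // inline, no allocation
    assert!(encode(&[0u8; 1024]).spilled()); // spilled to the heap
}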

View File

@@ -26,15 +26,6 @@ use pageserver_api::{
};
use rand::Rng;
use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::{
runtime::Handle,
sync::{oneshot, watch},
};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{bin_ser::BeSer, sync::gate::Gate};
use std::ops::{Deref, Range};
use std::pin::pin;
use std::sync::atomic::Ordering as AtomicOrdering;
@@ -49,6 +40,14 @@ use std::{
cmp::{max, min, Ordering},
ops::ControlFlow,
};
use storage_broker::BrokerClientChannel;
use tokio::{
runtime::Handle,
sync::{oneshot, watch},
};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::sync::gate::Gate;
use crate::pgdatadir_mapping::DirectoryKind;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
@@ -274,7 +273,7 @@ pub struct Timeline {
/// Locked automatically by [`TimelineWriter`] and checkpointer.
/// Must always be acquired before the layer map/individual layer lock
/// to avoid deadlock.
write_lock: tokio::sync::Mutex<Option<TimelineWriterState>>,
write_lock: tokio::sync::Mutex<()>,
/// Used to avoid multiple `flush_loop` tasks running
pub(super) flush_loop_state: Mutex<FlushLoopState>,
@@ -1204,10 +1203,58 @@ impl Timeline {
pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
TimelineWriter {
tl: self,
write_guard: self.write_lock.lock().await,
_write_guard: self.write_lock.lock().await,
}
}
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
/// the in-memory layer, and initiate flushing it if so.
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
pub(crate) async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let open_layer_size = {
let guard = self.layers.read().await;
let layers = guard.layer_map();
let Some(open_layer) = layers.open_layer.as_ref() else {
return Ok(());
};
open_layer.size().await?
};
let last_freeze_at = self.last_freeze_at.load();
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
let distance = last_lsn.widening_sub(last_freeze_at);
// Rolling the open layer can be triggered by:
// 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
// the safekeepers need to store. For sharded tenants, we multiply by shard count to
// account for how writes are distributed across shards: we expect each node to consume
// 1/count of the LSN on average.
// 2. The size of the currently open layer.
// 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
// up and suspend activity.
if (distance
>= self.get_checkpoint_distance() as i128 * self.shard_identity.count.count() as i128)
|| open_layer_size > self.get_checkpoint_distance()
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
{
info!(
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
distance,
open_layer_size,
last_freeze_ts.elapsed()
);
self.freeze_inmem_layer(true).await;
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
// Wake up the layer flusher
self.flush_frozen_layers();
}
Ok(())
}
pub(crate) fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
@@ -1639,7 +1686,7 @@ impl Timeline {
layer_flush_start_tx,
layer_flush_done_tx,
write_lock: tokio::sync::Mutex::new(None),
write_lock: tokio::sync::Mutex::new(()),
gc_info: std::sync::RwLock::new(GcInfo {
retain_lsns: Vec::new(),
@@ -2980,6 +3027,43 @@ impl Timeline {
Ok(layer)
}
async fn put_value(
&self,
key: Key,
lsn: Lsn,
val: &Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
//info!("PUT: key {} at {}", key, lsn);
let layer = self.get_layer_for_write(lsn).await?;
layer.put_value(key, lsn, val, ctx).await?;
Ok(())
}
async fn put_values(
&self,
values: &HashMap<Key, Vec<(Lsn, Value)>>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Pick the first LSN in the batch to get the layer to write to.
for lsns in values.values() {
if let Some((lsn, _)) = lsns.first() {
let layer = self.get_layer_for_write(*lsn).await?;
layer.put_values(values, ctx).await?;
break;
}
}
Ok(())
}
async fn put_tombstones(&self, tombstones: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
if let Some((_, lsn)) = tombstones.first() {
let layer = self.get_layer_for_write(*lsn).await?;
layer.put_tombstones(tombstones).await?;
}
Ok(())
}
pub(crate) fn finish_write(&self, new_lsn: Lsn) {
assert!(new_lsn.is_aligned());
@@ -2990,20 +3074,14 @@ impl Timeline {
async fn freeze_inmem_layer(&self, write_lock_held: bool) {
// Freeze the current open in-memory layer. It will be written to disk on next
// iteration.
let _write_guard = if write_lock_held {
None
} else {
Some(self.write_lock.lock().await)
};
self.freeze_inmem_layer_at(self.get_last_record_lsn()).await;
}
async fn freeze_inmem_layer_at(&self, at: Lsn) {
let mut guard = self.layers.write().await;
guard
.try_freeze_in_memory_layer(at, &self.last_freeze_at)
.try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
.await;
}
@@ -4982,43 +5060,13 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
PageReconstructError::from(msg)
}
struct TimelineWriterState {
open_layer: Arc<InMemoryLayer>,
current_size: u64,
// Previous Lsn which passed through
prev_lsn: Option<Lsn>,
// Largest Lsn which passed through the current writer
max_lsn: Option<Lsn>,
// Cached details of the last freeze. Avoids going through the atomic/lock on every put.
cached_last_freeze_at: Lsn,
cached_last_freeze_ts: Instant,
}
impl TimelineWriterState {
fn new(
open_layer: Arc<InMemoryLayer>,
current_size: u64,
last_freeze_at: Lsn,
last_freeze_ts: Instant,
) -> Self {
Self {
open_layer,
current_size,
prev_lsn: None,
max_lsn: None,
cached_last_freeze_at: last_freeze_at,
cached_last_freeze_ts: last_freeze_ts,
}
}
}
/// Various functions to mutate the timeline.
// TODO Currently, Deref is used to allow easy access to read methods from this trait.
// This is probably considered a bad practice in Rust and should be fixed eventually,
// but will cause large code changes.
pub(crate) struct TimelineWriter<'a> {
tl: &'a Timeline,
write_guard: tokio::sync::MutexGuard<'a, Option<TimelineWriterState>>,
_write_guard: tokio::sync::MutexGuard<'a, ()>,
}
impl Deref for TimelineWriter<'_> {
@@ -5029,193 +5077,31 @@ impl Deref for TimelineWriter<'_> {
}
}
impl Drop for TimelineWriter<'_> {
fn drop(&mut self) {
self.write_guard.take();
}
}
enum OpenLayerAction {
Roll,
Open,
None,
}
impl<'a> TimelineWriter<'a> {
/// Put a new page version that can be constructed from a WAL record
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
pub(crate) async fn put(
&mut self,
&self,
key: Key,
lsn: Lsn,
value: &Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Avoid doing allocations for "small" values.
// In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
// https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
buf.clear();
value.ser_into(&mut buf)?;
let buf_size: u64 = buf.len().try_into().expect("oversized value buf");
let action = self.get_open_layer_action(lsn, buf_size);
let layer = self.handle_open_layer_action(lsn, action).await?;
let res = layer.put_value(key, lsn, &buf, ctx).await;
if res.is_ok() {
// Update the current size only when the entire write was ok.
// In case of failures, we may have had partial writes which
// render the size tracking out of sync. That's ok because
// the checkpoint distance should be significantly smaller
// than the S3 single shot upload limit of 5GiB.
let state = self.write_guard.as_mut().unwrap();
state.current_size += buf_size;
state.prev_lsn = Some(lsn);
state.max_lsn = std::cmp::max(state.max_lsn, Some(lsn));
}
res
self.tl.put_value(key, lsn, value, ctx).await
}
async fn handle_open_layer_action(
&mut self,
at: Lsn,
action: OpenLayerAction,
) -> anyhow::Result<&Arc<InMemoryLayer>> {
match action {
OpenLayerAction::Roll => {
let max_lsn = self.write_guard.as_ref().unwrap().max_lsn.unwrap();
self.tl.freeze_inmem_layer_at(max_lsn).await;
let now = Instant::now();
*(self.last_freeze_ts.write().unwrap()) = now;
self.tl.flush_frozen_layers();
let current_size = self.write_guard.as_ref().unwrap().current_size;
if current_size > self.get_checkpoint_distance() {
warn!("Flushed oversized open layer with size {}", current_size)
}
assert!(self.write_guard.is_some());
let layer = self.tl.get_layer_for_write(at).await?;
let initial_size = layer.size().await?;
self.write_guard.replace(TimelineWriterState::new(
layer,
initial_size,
Lsn(max_lsn.0 + 1),
now,
));
}
OpenLayerAction::Open => {
assert!(self.write_guard.is_none());
let layer = self.tl.get_layer_for_write(at).await?;
let initial_size = layer.size().await?;
let last_freeze_at = self.last_freeze_at.load();
let last_freeze_ts = *self.last_freeze_ts.read().unwrap();
self.write_guard.replace(TimelineWriterState::new(
layer,
initial_size,
last_freeze_at,
last_freeze_ts,
));
}
OpenLayerAction::None => {
assert!(self.write_guard.is_some());
}
}
Ok(&self.write_guard.as_ref().unwrap().open_layer)
}
fn get_open_layer_action(&self, lsn: Lsn, new_value_size: u64) -> OpenLayerAction {
let state = &*self.write_guard;
let Some(state) = &state else {
return OpenLayerAction::Open;
};
if state.prev_lsn == Some(lsn) {
// Rolling mid LSN is not supported by downstream code.
// Hence, only roll at LSN boundaries.
return OpenLayerAction::None;
}
let distance = lsn.widening_sub(state.cached_last_freeze_at);
let proposed_open_layer_size = state.current_size + new_value_size;
// Rolling the open layer can be triggered by:
// 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
// the safekeepers need to store. For sharded tenants, we multiply by shard count to
// account for how writes are distributed across shards: we expect each node to consume
// 1/count of the LSN on average.
// 2. The size of the currently open layer.
// 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
// up and suspend activity.
if distance
>= self.get_checkpoint_distance() as i128 * self.shard_identity.count.count() as i128
{
info!(
"Will roll layer at {} with layer size {} due to LSN distance ({})",
lsn, state.current_size, distance
);
OpenLayerAction::Roll
} else if state.current_size > 0
&& proposed_open_layer_size >= self.get_checkpoint_distance()
{
info!(
"Will roll layer at {} with layer size {} due to layer size ({})",
lsn, state.current_size, proposed_open_layer_size
);
OpenLayerAction::Roll
} else if distance > 0
&& state.cached_last_freeze_ts.elapsed() >= self.get_checkpoint_timeout()
{
info!(
"Will roll layer at {} with layer size {} due to time since last flush ({:?})",
lsn,
state.current_size,
state.cached_last_freeze_ts.elapsed()
);
OpenLayerAction::Roll
} else {
OpenLayerAction::None
}
}
/// Put a batch of keys at the specified Lsns.
///
/// The batch should be sorted by Lsn such that it's safe
/// to roll the open layer mid batch.
pub(crate) async fn put_batch(
&mut self,
batch: Vec<(Key, Lsn, Value)>,
&self,
batch: &HashMap<Key, Vec<(Lsn, Value)>>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
for (key, lsn, val) in batch {
self.put(key, lsn, &val, ctx).await?
}
Ok(())
self.tl.put_values(batch, ctx).await
}
pub(crate) async fn delete_batch(&mut self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
if let Some((_, lsn)) = batch.first() {
let action = self.get_open_layer_action(*lsn, 0);
let layer = self.handle_open_layer_action(*lsn, action).await?;
layer.put_tombstones(batch).await?;
}
Ok(())
pub(crate) async fn delete_batch(&self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
self.tl.put_tombstones(batch).await
}
/// Track the end of the latest digested WAL record.
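
The reinstated check_checkpoint_distance and the removed get_open_layer_action both encode the same three roll triggers: LSN distance scaled by shard count, open-layer size, and time since the last freeze. A condensed, standalone sketch of that decision, with plain parameters instead of the Timeline accessors, might read:

use std::time::Duration;

// Decide whether the open in-memory layer should be rolled, based on the
// three triggers described in the comments above.
fn should_roll(
    distance: i128,             // WAL bytes since the last freeze LSN
    open_layer_size: u64,       // current size of the open in-memory layer
    elapsed: Duration,          // time since the last freeze
    checkpoint_distance: u64,
    checkpoint_timeout: Duration,
    shard_count: u8,
) -> bool {
    distance >= checkpoint_distance as i128 * shard_count as i128
        || open_layer_size > checkpoint_distance
        || (distance > 0 && elapsed >= checkpoint_timeout)
}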

View File

@@ -343,6 +343,23 @@ pub(super) async fn handle_walreceiver_connection(
modification.commit(&ctx).await?;
uncommitted_records = 0;
filtered_records = 0;
//
// We should check the checkpoint distance after appending each ingest_batch_size bytes, because otherwise
// the layer size can become much larger than `checkpoint_distance`.
// It can happen because the wal-sender sends WAL in 125kb chunks, and some WAL records can cause a large
// amount of data to be written to key-value storage. So performing this check only after processing
// all WAL records in the chunk can produce huge L0 layer files.
//
timeline
.check_checkpoint_distance()
.await
.with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
}
}
@@ -389,6 +406,16 @@ pub(super) async fn handle_walreceiver_connection(
}
}
timeline
.check_checkpoint_distance()
.await
.with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
if let Some(last_lsn) = status_update {
let timeline_remote_consistent_lsn = timeline
.get_remote_consistent_lsn_visible()

View File

@@ -82,11 +82,6 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
# During shutdown, DownloadError::Cancelled may be logged as an error. Cleaning this
# up is tracked in https://github.com/neondatabase/neon/issues/6096
".*Cancelled, shutting down.*",
# Open layers are only rolled at Lsn boundaries to avoid name clashes.
# Hence, we can overshoot the soft limit set by checkpoint distance.
# This is especially pronounced in tests that set small checkpoint
# distances.
".*Flushed oversized open layer with size.*",
)