# pageserver: track total ephemeral layer bytes (#7182)
## Problem

Large quantities of ephemeral layer data can lead to excessive memory consumption (https://github.com/neondatabase/neon/issues/6939). We currently don't have a way to know how much ephemeral layer data is present on a pageserver. Before we can add new behaviors that proactively roll layers in response to too much ephemeral data, we must calculate that total.

Related: https://github.com/neondatabase/neon/issues/6916

## Summary of changes

- Create `GlobalResources` and `GlobalResourceUnits` types, where each timeline carries a `GlobalResourceUnits` in its `TimelineWriterState` (the accounting pattern is sketched below).
- Periodically update the size in `GlobalResourceUnits`:
  - during `tick()`,
  - during layer roll,
  - during `put()` if the latest value has drifted more than 10MB since our last update.
- Expose the value of the global ephemeral layer byte counter as a Prometheus metric.
- Extend the lifetime of `TimelineWriterState`:
  - Instead of dropping it in `TimelineWriter::drop`, let it remain.
  - Drop `TimelineWriterState` in `roll_layer`: this drops our guard on the global byte count, reflecting the fact that we are freezing the layer.
- Ensure the validity of the layer in the writer state by clearing the state in the same place we freeze layers, and asserting on the writability of the layer in `writer()`.
- Add a 'context' parameter to `get_open_layer_action` so that it can skip the `prev_lsn == lsn` check when called from `tick()` -- this is needed because tick is now called with a populated state, where `prev_lsn == Some(lsn)` is true for an idle timeline.
- Extend the layer rolling test to use this metric.
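To make the accounting scheme concrete before reading the diff, here is a minimal, self-contained sketch of the pattern the summary describes: each timeline holds an RAII guard that remembers its last-published contribution to a process-wide atomic and removes that contribution when dropped. The names `TOTAL_DIRTY_BYTES` and `DirtyBytesGuard` are illustrative only; the actual implementation below uses `GlobalResources` / `GlobalResourceUnits`, additionally counts contributing layers, and mirrors the total into the `pageserver_timeline_ephemeral_bytes` gauge.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Process-wide total of dirty (ephemeral) bytes. Illustrative only.
static TOTAL_DIRTY_BYTES: AtomicU64 = AtomicU64::new(0);

/// Per-timeline guard: remembers how many bytes it last published so it can
/// adjust the global counter by deltas, and zeroes its contribution on drop.
struct DirtyBytesGuard {
    published: u64,
}

impl DirtyBytesGuard {
    const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024; // 10 MiB, as in the summary

    fn new() -> Self {
        Self { published: 0 }
    }

    /// Unconditionally publish the current size (the tick / layer-roll paths).
    fn publish_size(&mut self, size: u64) {
        if size >= self.published {
            TOTAL_DIRTY_BYTES.fetch_add(size - self.published, Ordering::Relaxed);
        } else {
            TOTAL_DIRTY_BYTES.fetch_sub(self.published - size, Ordering::Relaxed);
        }
        self.published = size;
    }

    /// Publish only if we drifted far from the last published value (the put() path).
    fn maybe_publish_size(&mut self, size: u64) {
        if size.abs_diff(self.published) > Self::MAX_SIZE_DRIFT {
            self.publish_size(size);
        }
    }
}

impl Drop for DirtyBytesGuard {
    fn drop(&mut self) {
        // Freezing a layer drops the guard, removing its contribution from the total.
        self.publish_size(0);
    }
}

fn main() {
    let mut guard = DirtyBytesGuard::new();
    guard.maybe_publish_size(4 * 1024); // below the drift threshold: no global update
    guard.publish_size(64 * 1024 * 1024); // a "tick": force an update
    println!("total = {}", TOTAL_DIRTY_BYTES.load(Ordering::Relaxed));
    drop(guard);
    assert_eq!(TOTAL_DIRTY_BYTES.load(Ordering::Relaxed), 0);
}
```

The actual changes from the commit follow as diff hunks.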
```diff
@@ -699,6 +699,14 @@ pub static STARTUP_IS_LOADING: Lazy<UIntGauge> = Lazy::new(|| {
    .expect("Failed to register pageserver_startup_is_loading")
});

pub(crate) static TIMELINE_EPHEMERAL_BYTES: Lazy<UIntGauge> = Lazy::new(|| {
    register_uint_gauge!(
        "pageserver_timeline_ephemeral_bytes",
        "Total number of bytes in ephemeral layers, summed for all timelines. Approximate, lazily updated."
    )
    .expect("Failed to register metric")
});

/// Metrics related to the lifecycle of a [`crate::tenant::Tenant`] object: things
/// like how long it took to load.
///
```
```diff
@@ -23,8 +23,12 @@ use tracing::*;
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
// avoid binding to Write (conflicts with std::io::Write)
// while being able to use std::fmt::Write's methods
use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
use std::cmp::Ordering;
use std::fmt::Write as _;
use std::ops::Range;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::{RwLock, RwLockWriteGuard};

use super::{
```
```diff
@@ -70,6 +74,8 @@ pub struct InMemoryLayerInner {
    /// Each serialized Value is preceded by a 'u32' length field.
    /// PerSeg::page_versions map stores offsets into this file.
    file: EphemeralFile,

    resource_units: GlobalResourceUnits,
}

impl std::fmt::Debug for InMemoryLayerInner {
```
```diff
@@ -78,6 +84,101 @@ impl std::fmt::Debug for InMemoryLayerInner {
    }
}

/// State shared by all in-memory (ephemeral) layers. Updated infrequently during background ticks in Timeline,
/// to minimize contention.
///
/// This global state is used to implement behaviors that require a global view of the system, e.g.
/// rolling layers proactively to limit the total amount of dirty data.
struct GlobalResources {
    // How many bytes are in all EphemeralFile objects
    dirty_bytes: AtomicU64,
    // How many layers are contributing to dirty_bytes
    dirty_layers: AtomicUsize,
}

// Per-timeline RAII struct for its contribution to [`GlobalResources`]
struct GlobalResourceUnits {
    // How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
    // for decrementing the global counter by this many bytes when dropped.
    dirty_bytes: u64,
}

impl GlobalResourceUnits {
    // Hint for the layer append path to update us when the layer size differs from the last
    // call to update_size by this much. If we don't reach this threshold, we'll still get
    // updated when the Timeline "ticks" in the background.
    const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;

    fn new() -> Self {
        GLOBAL_RESOURCES
            .dirty_layers
            .fetch_add(1, AtomicOrdering::Relaxed);
        Self { dirty_bytes: 0 }
    }

    /// Do not call this frequently: all timelines will write to these same global atomics,
    /// so this is a relatively expensive operation. Wait at least a few seconds between calls.
    fn publish_size(&mut self, size: u64) {
        let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
            Ordering::Equal => {
                return;
            }
            Ordering::Greater => {
                let delta = size - self.dirty_bytes;
                let old = GLOBAL_RESOURCES
                    .dirty_bytes
                    .fetch_add(delta, AtomicOrdering::Relaxed);
                old + delta
            }
            Ordering::Less => {
                let delta = self.dirty_bytes - size;
                let old = GLOBAL_RESOURCES
                    .dirty_bytes
                    .fetch_sub(delta, AtomicOrdering::Relaxed);
                old - delta
            }
        };

        // This is a sloppy update: concurrent updates to the counter will race, and the exact
        // value of the metric might not be the exact latest value of GLOBAL_RESOURCES::dirty_bytes.
        // That's okay: as long as the metric contains some recent value, it doesn't have to always
        // be literally the last update.
        TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);

        self.dirty_bytes = size;
    }

    // Call publish_size if the input size differs from last published size by more than
    // the drift limit
    fn maybe_publish_size(&mut self, size: u64) {
        let publish = match size.cmp(&self.dirty_bytes) {
            Ordering::Equal => false,
            Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
            Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
        };

        if publish {
            self.publish_size(size);
        }
    }
}

impl Drop for GlobalResourceUnits {
    fn drop(&mut self) {
        GLOBAL_RESOURCES
            .dirty_layers
            .fetch_sub(1, AtomicOrdering::Relaxed);

        // Subtract our contribution to the global total dirty bytes
        self.publish_size(0);
    }
}

static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
    dirty_bytes: AtomicU64::new(0),
    dirty_layers: AtomicUsize::new(0),
};

impl InMemoryLayer {
    pub(crate) fn get_timeline_id(&self) -> TimelineId {
        self.timeline_id
```
```diff
@@ -328,6 +429,7 @@ impl InMemoryLayer {
            inner: RwLock::new(InMemoryLayerInner {
                index: HashMap::new(),
                file,
                resource_units: GlobalResourceUnits::new(),
            }),
        })
    }
```
```diff
@@ -378,9 +480,18 @@ impl InMemoryLayer {
            warn!("Key {} at {} already exists", key, lsn);
        }

        let size = locked_inner.file.len();
        locked_inner.resource_units.maybe_publish_size(size);

        Ok(())
    }

    pub(crate) async fn tick(&self) {
        let mut inner = self.inner.write().await;
        let size = inner.file.len();
        inner.resource_units.publish_size(size);
    }

    pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
        // TODO: Currently, we just leak the storage for any deleted keys
        Ok(())
```
```diff
@@ -4465,6 +4465,9 @@ impl<'a> TimelineWriter<'a> {
        let action = self.get_open_layer_action(last_record_lsn, 0);
        if action == OpenLayerAction::Roll {
            self.roll_layer(last_record_lsn).await?;
        } else if let Some(writer_state) = &mut *self.write_guard {
            // Periodic update of statistics
            writer_state.open_layer.tick().await;
        }

        Ok(())
```
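The body of `get_open_layer_action` is not part of the hunks shown here. To make the last bullet of the summary concrete, below is a purely hypothetical sketch: only `OpenLayerAction::Roll` is taken from the diff, while `LayerAction`, `WriterState`, `decide_open_layer_action`, and the size-based roll criterion are invented for illustration. The point it demonstrates is how a caller-context flag can bypass the `prev_lsn == lsn` short-circuit on the tick path, so an idle timeline (where `prev_lsn == Some(lsn)` always holds) can still have its layer rolled.

```rust
// Hypothetical sketch, not the pageserver's real API or roll criteria.
type Lsn = u64;

#[derive(PartialEq, Debug)]
enum OpenLayerAction {
    None,
    Roll,
}

/// Who is asking: a WAL write (`put`) or the periodic background tick.
#[derive(PartialEq)]
enum LayerAction {
    Put,
    Tick,
}

struct WriterState {
    prev_lsn: Option<Lsn>,
    dirty_bytes: u64,
}

fn decide_open_layer_action(
    state: &WriterState,
    lsn: Lsn,
    checkpoint_distance: u64,
    ctx: LayerAction,
) -> OpenLayerAction {
    // On the put() path, writing at the same LSN again never rolls the layer.
    // The tick path skips this check: an idle timeline has prev_lsn == Some(lsn),
    // yet we still want its layer to be rollable on other criteria.
    if ctx == LayerAction::Put && state.prev_lsn == Some(lsn) {
        return OpenLayerAction::None;
    }

    // Simplified criterion: the real code also considers time-based rolling.
    if state.dirty_bytes >= checkpoint_distance {
        OpenLayerAction::Roll
    } else {
        OpenLayerAction::None
    }
}

fn main() {
    let idle = WriterState { prev_lsn: Some(100), dirty_bytes: 1 << 30 };
    // From put(): same LSN, so no roll. From tick(): the size criterion applies and we roll.
    assert_eq!(decide_open_layer_action(&idle, 100, 256 << 20, LayerAction::Put), OpenLayerAction::None);
    assert_eq!(decide_open_layer_action(&idle, 100, 256 << 20, LayerAction::Tick), OpenLayerAction::Roll);
    println!("ok");
}
```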
```diff
@@ -62,9 +62,7 @@ def wait_for_upload(
         )
         time.sleep(1)
     raise Exception(
-        "timed out while waiting for remote_consistent_lsn to reach {}, was {}".format(
-            lsn, current_lsn
-        )
+        f"timed out while waiting for {tenant}/{timeline} remote_consistent_lsn to reach {lsn}, was {current_lsn}"
     )
```
```diff
@@ -1,5 +1,4 @@
import asyncio
import time
from typing import Tuple

import pytest
```
```diff
@@ -10,7 +9,7 @@ from fixtures.neon_fixtures import (
     tenant_get_shards,
 )
 from fixtures.pageserver.http import PageserverHttpClient
-from fixtures.pageserver.utils import wait_for_last_record_lsn
+from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
 from fixtures.types import Lsn, TenantId, TimelineId
 from fixtures.utils import wait_until
```
```diff
@@ -61,6 +60,15 @@ def wait_until_pageserver_is_caught_up(
    assert waited >= last_flush_lsn


def wait_until_pageserver_has_uploaded(
    env: NeonEnv, last_flush_lsns: list[Tuple[TenantId, TimelineId, Lsn]]
):
    for tenant, timeline, last_flush_lsn in last_flush_lsns:
        shards = tenant_get_shards(env, tenant)
        for tenant_shard_id, pageserver in shards:
            wait_for_upload(pageserver.http_client(), tenant_shard_id, timeline, last_flush_lsn)


def wait_for_wal_ingest_metric(pageserver_http: PageserverHttpClient) -> float:
    def query():
        value = pageserver_http.get_metric_value("pageserver_wal_ingest_records_received_total")
```
```diff
@@ -86,25 +94,50 @@ def test_pageserver_small_inmemory_layers(
    The workload creates a number of timelines and writes some data to each,
    but not enough to trigger flushes via the `checkpoint_distance` config.
    """

    def get_dirty_bytes():
        v = (
            env.pageserver.http_client().get_metric_value("pageserver_timeline_ephemeral_bytes")
            or 0
        )
        log.info(f"dirty_bytes: {v}")
        return v

    def assert_dirty_bytes(v):
        assert get_dirty_bytes() == v

    env = neon_env_builder.init_configs()
    env.start()

    last_flush_lsns = asyncio.run(workload(env, TIMELINE_COUNT, ENTRIES_PER_TIMELINE))
    wait_until_pageserver_is_caught_up(env, last_flush_lsns)

    # We didn't write enough data to trigger a size-based checkpoint
    assert get_dirty_bytes() > 0

    ps_http_client = env.pageserver.http_client()
    total_wal_ingested_before_restart = wait_for_wal_ingest_metric(ps_http_client)

    log.info("Sleeping for checkpoint timeout ...")
    time.sleep(CHECKPOINT_TIMEOUT_SECONDS + 5)
    # Within ~ the checkpoint interval, all the ephemeral layers should be frozen and flushed,
    # such that there are zero bytes of ephemeral layer left on the pageserver
    log.info("Waiting for background checkpoints...")
    wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(0))  # type: ignore

    # Zero ephemeral layer bytes does not imply that all the frozen layers were uploaded: they
    # must be uploaded to remain visible to the pageserver after restart.
    wait_until_pageserver_has_uploaded(env, last_flush_lsns)

    env.pageserver.restart(immediate=immediate_shutdown)
    wait_until_pageserver_is_caught_up(env, last_flush_lsns)

    # Catching up with WAL ingest should have resulted in zero bytes of ephemeral layers, since
    # we froze, flushed and uploaded everything before restarting. There can be no more WAL writes
    # because we shut down compute endpoints before flushing.
    assert get_dirty_bytes() == 0

    total_wal_ingested_after_restart = wait_for_wal_ingest_metric(ps_http_client)

    log.info(f"WAL ingested before restart: {total_wal_ingested_before_restart}")
    log.info(f"WAL ingested after restart: {total_wal_ingested_after_restart}")

    leeway = total_wal_ingested_before_restart * 5 / 100
    assert total_wal_ingested_after_restart <= leeway
    assert total_wal_ingested_after_restart == 0
```