Compare commits

..

4 Commits

Author SHA1 Message Date
Alex Chi Z
a40eee31a7 refactor(pageserver): better k-merge implementation for tiered compaction
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-05-20 17:13:40 -04:00
Alex Chi Z
6810d2aa53 feat(pageserver): do not read past image layers for vectored get (#7773)
## Problem

Part of https://github.com/neondatabase/neon/issues/7462

On metadata keyspace, vectored get will not stop if a key is not found,
and will read past the image layer. However, the semantics is different
from single get, because if a key does not exist in the image layer, it
means that the key does not exist in the past, or have been deleted.
This pull request fixed it by recording image layer coverage during the
vectored get process and stop when the full keyspace is covered by an
image layer. A corresponding test case is added to ensure generating
image layer reduces the number of delta layers.

This optimization (or bug fix) also applies to rel block keyspaces. If a
key is missing, we can know it's missing once the first image layer is
reached. Page server will not attempt to read lower layers, which
potentially incurs layer downloads + evictions.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-05-20 14:24:18 -04:00
Andy Hattemer
2d7091871f Update banner image in Readme (#7801)
Update the readme banner with updated branding.
2024-05-20 12:15:43 -04:00
Alex Chi Z
7701ca45dd feat(pageserver): generate image layers for sparse keyspace (#7567)
Part of https://github.com/neondatabase/neon/issues/7462

Sparse keyspace does not generate image layers for now. This pull
request adds support for generating image layers for sparse keyspace.


## Summary of changes

* Use the scan interface to generate compaction data for sparse
keyspace.
* Track num of delta layers reads during scan.
* Read-trigger compaction: when a scan on the keyspace touches too many
delta files, generate an image layer. There are one hard-coded threshold
for now: max delta layers we want to touch for a scan.
* L0 compaction does not need to compute holes for metadata keyspace.

Know issue: the scan interface currently reads past the image layer,
which causes `delta_layer_accessed` keeps increasing even if image
layers are generated. The pull request to fix that will be separate, and
orthogonal to this one.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-05-20 16:08:45 +00:00
24 changed files with 1447 additions and 875 deletions

View File

@@ -1,4 +1,6 @@
[![Neon](https://user-images.githubusercontent.com/13738772/236813940-dcfdcb5b-69d3-449b-a686-013febe834d4.png)](https://neon.tech)
[![Neon](https://github.com/neondatabase/neon/assets/11527560/f15a17f0-836e-40c5-b35d-030606a6b660)](https://neon.tech)
# Neon

View File

@@ -307,7 +307,7 @@ impl KeySpace {
}
/// Merge another keyspace into the current one.
/// Note: the keyspaces must not ovelap (enforced via assertions)
/// Note: the keyspaces must not overlap (enforced via assertions). To merge overlapping key ranges, use `KeySpaceRandomAccum`.
pub fn merge(&mut self, other: &KeySpace) {
let all_ranges = self
.ranges

View File

@@ -6,14 +6,10 @@ use futures::future::BoxFuture;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use pageserver_api::shard::ShardIdentity;
use pin_project_lite::pin_project;
use std::collections::BinaryHeap;
use std::collections::VecDeque;
use std::collections::{binary_heap, BinaryHeap};
use std::fmt::Display;
use std::future::Future;
use std::ops::{DerefMut, Range};
use std::pin::Pin;
use std::task::{ready, Poll};
use std::ops::Range;
use utils::lsn::Lsn;
pub const PAGE_SZ: u64 = 8192;
@@ -85,33 +81,6 @@ pub fn intersect_keyspace<K: Ord + Clone + Copy>(
ranges
}
/// Create a stream that iterates through all DeltaEntrys among all input
/// layers, in key-lsn order.
///
/// This is public because the create_delta() implementation likely wants to use this too
/// TODO: move to a more shared place
pub fn merge_delta_keys<'a, E: CompactionJobExecutor>(
layers: &'a [E::DeltaLayer],
ctx: &'a E::RequestContext,
) -> MergeDeltaKeys<'a, E> {
// Use a binary heap to merge the layers. Each input layer is initially
// represented by a LazyLoadLayer::Unloaded element, which uses the start of
// the layer's key range as the key. The first time a layer reaches the top
// of the heap, all the keys of the layer are loaded into a sorted vector.
//
// This helps to keep the memory usage reasonable: we only need to hold in
// memory the DeltaEntrys of the layers that overlap with the "current" key.
let mut heap: BinaryHeap<LazyLoadLayer<'a, E>> = BinaryHeap::new();
for l in layers {
heap.push(LazyLoadLayer::Unloaded(l));
}
MergeDeltaKeys {
heap,
ctx,
load_future: None,
}
}
pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
layers: &'a [E::DeltaLayer],
ctx: &'a E::RequestContext,
@@ -129,104 +98,139 @@ pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
Ok(stream)
}
enum LazyLoadLayer<'a, E: CompactionJobExecutor> {
Loaded(VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>),
Unloaded(&'a E::DeltaLayer),
/// Wrapper type to make `dl.load_keys`` compile.
type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;
pub enum LayerIterator<'a, E: CompactionJobExecutor> {
Loaded(
VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>,
&'a E::RequestContext,
),
Unloaded(&'a E::DeltaLayer, &'a E::RequestContext),
}
impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> {
fn min_key(&self) -> E::Key {
impl<'a, E: CompactionJobExecutor + 'a> LayerIterator<'a, E> {
pub fn new(delta_layer: &'a E::DeltaLayer, ctx: &'a E::RequestContext) -> Self {
Self::Unloaded(delta_layer, ctx)
}
pub fn key_lsn(&self) -> (E::Key, Lsn) {
match self {
Self::Loaded(entries) => entries.front().unwrap().key(),
Self::Unloaded(dl) => dl.key_range().start,
Self::Unloaded(dl, _) => (dl.key_range().start, dl.lsn_range().start),
Self::Loaded(entries, _) => entries.front().map(|x| (x.key(), x.lsn())).unwrap(),
}
}
fn min_lsn(&self) -> Lsn {
async fn load(&mut self) -> anyhow::Result<()> {
match self {
Self::Loaded(entries) => entries.front().unwrap().lsn(),
Self::Unloaded(dl) => dl.lsn_range().start,
Self::Unloaded(dl, ctx) => {
let unloaded_key_lsn = (dl.key_range().start, dl.lsn_range().start);
let fut: LoadFuture<
'a,
<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>,
> = Box::pin(dl.load_keys(ctx));
let keys = VecDeque::from(fut.await?);
assert_eq!(
keys.front().as_ref().map(|x| (x.key(), x.lsn())).unwrap(),
unloaded_key_lsn,
"unmatched start key_lsn"
);
*self = Self::Loaded(keys, ctx);
Ok(())
}
Self::Loaded(_, _) => Ok(()),
}
}
pub async fn entry(
&mut self,
) -> anyhow::Result<&<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>> {
self.load().await?;
let Self::Loaded(x, _) = self else {
unreachable!()
};
Ok(x.front().unwrap())
}
pub async fn next(
&mut self,
) -> anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>> {
self.load().await?; // requires Box::pin to make it compile
let Self::Loaded(x, _) = self else {
unreachable!()
};
Ok(x.pop_front().expect("already reached the end"))
}
pub fn is_end(&self) -> bool {
match self {
Self::Unloaded(_, _) => false,
Self::Loaded(x, _) => x.is_empty(),
}
}
}
impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {
impl<'a, E: CompactionJobExecutor + 'a> PartialOrd for LayerIterator<'a, E> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> {
impl<'a, E: CompactionJobExecutor + 'a> Ord for LayerIterator<'a, E> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// reverse order so that we get a min-heap
(other.min_key(), other.min_lsn()).cmp(&(self.min_key(), self.min_lsn()))
// reverse comparison to get a min-heap
other.key_lsn().cmp(&self.key_lsn())
}
}
impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> {
impl<'a, E: CompactionJobExecutor + 'a> PartialEq for LayerIterator<'a, E> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == std::cmp::Ordering::Equal
}
}
impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {}
type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;
impl<'a, E: CompactionJobExecutor + 'a> Eq for LayerIterator<'a, E> {}
// Stream returned by `merge_delta_keys`
pin_project! {
#[allow(clippy::type_complexity)]
pub struct MergeDeltaKeys<'a, E: CompactionJobExecutor> {
heap: BinaryHeap<LazyLoadLayer<'a, E>>,
#[pin]
load_future: Option<LoadFuture<'a, <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>,
ctx: &'a E::RequestContext,
}
pub struct DeltaMergeIterator<'a, E: CompactionJobExecutor> {
heap: BinaryHeap<LayerIterator<'a, E>>,
}
impl<'a, E> Stream for MergeDeltaKeys<'a, E>
where
E: CompactionJobExecutor + 'a,
{
type Item = anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>;
impl<'a, E: CompactionJobExecutor + 'a> DeltaMergeIterator<'a, E> {
pub fn new(delta_layers: &'a [E::DeltaLayer], ctx: &'a E::RequestContext) -> Self {
let mut heap = BinaryHeap::new();
for dl in delta_layers {
heap.push(LayerIterator::new(dl, ctx));
}
Self { heap }
}
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<std::option::Option<<Self as futures::Stream>::Item>> {
let mut this = self.project();
loop {
if let Some(mut load_future) = this.load_future.as_mut().as_pin_mut() {
// We are waiting for loading the keys to finish
match ready!(load_future.as_mut().poll(cx)) {
Ok(entries) => {
this.load_future.set(None);
*this.heap.peek_mut().unwrap() =
LazyLoadLayer::Loaded(VecDeque::from(entries));
}
Err(e) => {
return Poll::Ready(Some(Err(e)));
}
pub fn is_end(&self) -> bool {
self.heap.is_empty()
}
/// The next key-lsn entry that will be returned by `next`.
pub fn key_lsn(&self) -> (E::Key, Lsn) {
self.heap.peek().expect("already reached the end").key_lsn()
}
/// Move to the next entry and return the current entry.
pub async fn next(
&mut self,
) -> anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>> {
let Some(mut top) = self.heap.peek_mut() else {
panic!("already reached the end")
};
match top.next().await {
Ok(entry) => {
if top.is_end() {
binary_heap::PeekMut::pop(top);
}
Ok(entry)
}
// If the topmost layer in the heap hasn't been loaded yet, start
// loading it. Otherwise return the next entry from it and update
// the layer's position in the heap (this decreaseKey operation is
// performed implicitly when `top` is dropped).
if let Some(mut top) = this.heap.peek_mut() {
match top.deref_mut() {
LazyLoadLayer::Unloaded(ref mut l) => {
let fut = l.load_keys(this.ctx);
this.load_future.set(Some(Box::pin(fut)));
continue;
}
LazyLoadLayer::Loaded(ref mut entries) => {
let result = entries.pop_front().unwrap();
if entries.is_empty() {
std::collections::binary_heap::PeekMut::pop(top);
}
return Poll::Ready(Some(Ok(result)));
}
}
} else {
return Poll::Ready(None);
Err(e) => {
// pop the item if there is an error, otherwise it might cause further panic when binary heap compares it after `PeekMut` gets dropped.
binary_heap::PeekMut::pop(top);
Err(e)
}
}
}

View File

@@ -92,7 +92,9 @@ pub trait CompactionJobExecutor {
) -> impl Future<Output = anyhow::Result<()>> + Send;
}
pub trait CompactionKey: std::cmp::Ord + Clone + Copy + std::fmt::Display {
pub trait CompactionKey:
std::cmp::Ord + Clone + Copy + std::fmt::Display + std::fmt::Debug
{
const MIN: Self;
const MAX: Self;

View File

@@ -2,7 +2,6 @@ mod draw;
use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp};
use futures::StreamExt;
use pageserver_api::shard::ShardIdentity;
use rand::Rng;
use tracing::info;
@@ -15,7 +14,8 @@ use std::sync::Arc;
use std::sync::Mutex;
use crate::helpers::PAGE_SZ;
use crate::helpers::{merge_delta_keys, overlaps_with};
use crate::helpers::overlaps_with;
use crate::helpers::DeltaMergeIterator;
use crate::interface;
use crate::interface::CompactionLayer;
@@ -545,12 +545,11 @@ impl interface::CompactionJobExecutor for MockTimeline {
input_layers: &[Arc<MockDeltaLayer>],
ctx: &MockRequestContext,
) -> anyhow::Result<()> {
let mut key_value_stream =
std::pin::pin!(merge_delta_keys::<MockTimeline>(input_layers, ctx));
let mut key_value_stream = DeltaMergeIterator::<MockTimeline>::new(input_layers, ctx);
let mut records: Vec<MockRecord> = Vec::new();
let mut total_len = 2;
while let Some(delta_entry) = key_value_stream.next().await {
let delta_entry: MockRecord = delta_entry?;
while !key_value_stream.is_end() {
let delta_entry: MockRecord = key_value_stream.next().await?;
if key_range.contains(&delta_entry.key) && lsn_range.contains(&delta_entry.lsn) {
total_len += delta_entry.len;
records.push(delta_entry);

View File

@@ -40,7 +40,11 @@ use utils::bin_ser::DeserializeError;
use utils::vec_map::{VecMap, VecMapOrdering};
use utils::{bin_ser::BeSer, lsn::Lsn};
const MAX_AUX_FILE_DELTAS: usize = 1024;
/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
pub const MAX_AUX_FILE_DELTAS: usize = 1024;
/// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
pub const MAX_AUX_FILE_V2_DELTAS: usize = 64;
#[derive(Debug)]
pub enum LsnForTimestamp {

View File

@@ -3968,7 +3968,7 @@ mod tests {
use crate::tenant::harness::*;
use crate::tenant::timeline::CompactFlags;
use crate::DEFAULT_PG_VERSION;
use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use hex_literal::hex;
use pageserver_api::key::{AUX_KEY_PREFIX, NON_INHERITED_RANGE};
use pageserver_api::keyspace::KeySpace;
@@ -4777,7 +4777,12 @@ mod tests {
info!("Doing vectored read on {:?}", read);
let vectored_res = tline
.get_vectored_impl(read.clone(), reads_lsn, ValuesReconstructState::new(), &ctx)
.get_vectored_impl(
read.clone(),
reads_lsn,
&mut ValuesReconstructState::new(),
&ctx,
)
.await;
tline
.validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx)
@@ -4826,7 +4831,7 @@ mod tests {
.get_vectored_impl(
aux_keyspace.clone(),
read_lsn,
ValuesReconstructState::new(),
&mut ValuesReconstructState::new(),
&ctx,
)
.await;
@@ -4971,7 +4976,7 @@ mod tests {
.get_vectored_impl(
read.clone(),
current_lsn,
ValuesReconstructState::new(),
&mut ValuesReconstructState::new(),
&ctx,
)
.await?;
@@ -5106,7 +5111,7 @@ mod tests {
ranges: vec![child_gap_at_key..child_gap_at_key.next()],
},
query_lsn,
ValuesReconstructState::new(),
&mut ValuesReconstructState::new(),
&ctx,
)
.await;
@@ -5547,7 +5552,7 @@ mod tests {
.await?;
const NUM_KEYS: usize = 1000;
const STEP: usize = 100; // random update + scan base_key + idx * STEP
const STEP: usize = 10000; // random update + scan base_key + idx * STEP
let cancel = CancellationToken::new();
@@ -5580,7 +5585,7 @@ mod tests {
let keyspace = KeySpace::single(base_key..base_key.add((NUM_KEYS * STEP) as u32));
for _ in 0..10 {
for iter in 0..=10 {
// Read all the blocks
for (blknum, last_lsn) in updated.iter().enumerate() {
test_key.field6 = (blknum * STEP) as u32;
@@ -5595,7 +5600,7 @@ mod tests {
.get_vectored_impl(
keyspace.clone(),
lsn,
ValuesReconstructState::default(),
&mut ValuesReconstructState::default(),
&ctx,
)
.await?
@@ -5631,17 +5636,91 @@ mod tests {
updated[blknum] = lsn;
}
// Perform a cycle of flush, compact, and GC
tline.freeze_and_flush().await?;
tline.compact(&cancel, EnumSet::empty(), &ctx).await?;
tenant
.gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
.await?;
// Perform two cycles of flush, compact, and GC
for round in 0..2 {
tline.freeze_and_flush().await?;
tline
.compact(
&cancel,
if iter % 5 == 0 && round == 0 {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags
} else {
EnumSet::empty()
},
&ctx,
)
.await?;
tenant
.gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
.await?;
}
}
Ok(())
}
#[tokio::test]
async fn test_metadata_compaction_trigger() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_compaction_trigger")?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let cancel = CancellationToken::new();
let mut base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
base_key.field1 = AUX_KEY_PREFIX;
let test_key = base_key;
let mut lsn = Lsn(0x10);
for _ in 0..20 {
lsn = Lsn(lsn.0 + 0x10);
let mut writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", 0, lsn))),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
tline.freeze_and_flush().await?; // force create a delta layer
}
let before_num_l0_delta_files = tline
.layers
.read()
.await
.layer_map()
.get_level0_deltas()?
.len();
tline.compact(&cancel, EnumSet::empty(), &ctx).await?;
let after_num_l0_delta_files = tline
.layers
.read()
.await
.layer_map()
.get_level0_deltas()?
.len();
assert!(after_num_l0_delta_files < before_num_l0_delta_files, "after_num_l0_delta_files={after_num_l0_delta_files}, before_num_l0_delta_files={before_num_l0_delta_files}");
assert_eq!(
tline.get(test_key, lsn, &ctx).await?,
test_img(&format!("{} at {}", 0, lsn))
);
Ok(())
}
#[tokio::test]
async fn test_branch_copies_dirty_aux_file_flag() {
let harness = TenantHarness::create("test_branch_copies_dirty_aux_file_flag").unwrap();
@@ -5917,4 +5996,374 @@ mod tests {
Some(&bytes::Bytes::from_static(b"last"))
);
}
#[tokio::test]
async fn test_metadata_image_creation() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_image_creation")?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
const NUM_KEYS: usize = 1000;
const STEP: usize = 10000; // random update + scan base_key + idx * STEP
let cancel = CancellationToken::new();
let mut base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
base_key.field1 = AUX_KEY_PREFIX;
let mut test_key = base_key;
let mut lsn = Lsn(0x10);
async fn scan_with_statistics(
tline: &Timeline,
keyspace: &KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<(BTreeMap<Key, Result<Bytes, PageReconstructError>>, usize)> {
let mut reconstruct_state = ValuesReconstructState::default();
let res = tline
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await?;
Ok((res, reconstruct_state.get_delta_layers_visited() as usize))
}
#[allow(clippy::needless_range_loop)]
for blknum in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
test_key.field6 = (blknum * STEP) as u32;
let mut writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
}
let keyspace = KeySpace::single(base_key..base_key.add((NUM_KEYS * STEP) as u32));
for iter in 1..=10 {
for _ in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = (blknum * STEP) as u32;
let mut writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
}
tline.freeze_and_flush().await?;
if iter % 5 == 0 {
let (_, before_delta_file_accessed) =
scan_with_statistics(&tline, &keyspace, lsn, &ctx).await?;
tline
.compact(
&cancel,
{
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags
},
&ctx,
)
.await?;
let (_, after_delta_file_accessed) =
scan_with_statistics(&tline, &keyspace, lsn, &ctx).await?;
assert!(after_delta_file_accessed < before_delta_file_accessed, "after_delta_file_accessed={after_delta_file_accessed}, before_delta_file_accessed={before_delta_file_accessed}");
// Given that we already produced an image layer, there should be no delta layer needed for the scan, but still setting a low threshold there for unforeseen circumstances.
assert!(
after_delta_file_accessed <= 2,
"after_delta_file_accessed={after_delta_file_accessed}"
);
}
}
Ok(())
}
#[tokio::test]
async fn test_vectored_missing_data_key_reads() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_missing_data_key_reads")?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let cancel = CancellationToken::new();
let base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
let base_key_child = Key::from_hex("000000000033333333444444445500000001").unwrap();
let base_key_nonexist = Key::from_hex("000000000033333333444444445500000002").unwrap();
let mut lsn = Lsn(0x20);
{
let mut writer = tline.writer().await;
writer
.put(base_key, lsn, &Value::Image(test_img("data key 1")), &ctx)
.await?;
writer.finish_write(lsn);
drop(writer);
tline.freeze_and_flush().await?; // this will create a image layer
}
let child = tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(lsn), &ctx)
.await
.unwrap();
lsn.0 += 0x10;
{
let mut writer = child.writer().await;
writer
.put(
base_key_child,
lsn,
&Value::Image(test_img("data key 2")),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
child.freeze_and_flush().await?; // this will create a delta
{
// update the partitioning to include the test key space, otherwise they
// will be dropped by image layer creation
let mut guard = child.partitioning.lock().await;
let ((partitioning, _), partition_lsn) = &mut *guard;
partitioning
.parts
.push(KeySpace::single(base_key..base_key_nonexist)); // exclude the nonexist key
*partition_lsn = lsn;
}
child
.compact(
&cancel,
{
let mut set = EnumSet::empty();
set.insert(CompactFlags::ForceImageLayerCreation);
set
},
&ctx,
)
.await?; // force create an image layer for the keys, TODO: check if the image layer is created
}
async fn get_vectored_impl_wrapper(
tline: &Arc<Timeline>,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
let mut res = tline
.get_vectored_impl(
KeySpace::single(key..key.next()),
lsn,
&mut reconstruct_state,
ctx,
)
.await?;
Ok(res.pop_last().map(|(k, v)| {
assert_eq!(k, key);
v.unwrap()
}))
}
// test vectored get on parent timeline
assert_eq!(
get_vectored_impl_wrapper(&tline, base_key, lsn, &ctx).await?,
Some(test_img("data key 1"))
);
assert!(get_vectored_impl_wrapper(&tline, base_key_child, lsn, &ctx)
.await
.unwrap_err()
.is_missing_key_error());
assert!(
get_vectored_impl_wrapper(&tline, base_key_nonexist, lsn, &ctx)
.await
.unwrap_err()
.is_missing_key_error()
);
// test vectored get on child timeline
assert_eq!(
get_vectored_impl_wrapper(&child, base_key, lsn, &ctx).await?,
Some(test_img("data key 1"))
);
assert_eq!(
get_vectored_impl_wrapper(&child, base_key_child, lsn, &ctx).await?,
Some(test_img("data key 2"))
);
assert!(
get_vectored_impl_wrapper(&child, base_key_nonexist, lsn, &ctx)
.await
.unwrap_err()
.is_missing_key_error()
);
Ok(())
}
#[tokio::test]
async fn test_vectored_missing_metadata_key_reads() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_missing_metadata_key_reads")?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let cancel = CancellationToken::new();
let mut base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
let mut base_key_child = Key::from_hex("000000000033333333444444445500000001").unwrap();
let mut base_key_nonexist = Key::from_hex("000000000033333333444444445500000002").unwrap();
base_key.field1 = AUX_KEY_PREFIX;
base_key_child.field1 = AUX_KEY_PREFIX;
base_key_nonexist.field1 = AUX_KEY_PREFIX;
let mut lsn = Lsn(0x20);
{
let mut writer = tline.writer().await;
writer
.put(
base_key,
lsn,
&Value::Image(test_img("metadata key 1")),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
tline.freeze_and_flush().await?; // this will create an image layer
tline
.compact(
&cancel,
{
let mut set = EnumSet::empty();
set.insert(CompactFlags::ForceImageLayerCreation);
set.insert(CompactFlags::ForceRepartition);
set
},
&ctx,
)
.await?; // force create an image layer for metadata keys
tenant
.gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
.await?;
}
let child = tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(lsn), &ctx)
.await
.unwrap();
lsn.0 += 0x10;
{
let mut writer = child.writer().await;
writer
.put(
base_key_child,
lsn,
&Value::Image(test_img("metadata key 2")),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
child.freeze_and_flush().await?;
child
.compact(
&cancel,
{
let mut set = EnumSet::empty();
set.insert(CompactFlags::ForceImageLayerCreation);
set.insert(CompactFlags::ForceRepartition);
set
},
&ctx,
)
.await?; // force create an image layer for metadata keys
tenant
.gc_iteration(Some(child.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
.await?;
}
async fn get_vectored_impl_wrapper(
tline: &Arc<Timeline>,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
let mut res = tline
.get_vectored_impl(
KeySpace::single(key..key.next()),
lsn,
&mut reconstruct_state,
ctx,
)
.await?;
Ok(res.pop_last().map(|(k, v)| {
assert_eq!(k, key);
v.unwrap()
}))
}
// test vectored get on parent timeline
assert_eq!(
get_vectored_impl_wrapper(&tline, base_key, lsn, &ctx).await?,
Some(test_img("metadata key 1"))
);
assert_eq!(
get_vectored_impl_wrapper(&tline, base_key_child, lsn, &ctx).await?,
None
);
assert_eq!(
get_vectored_impl_wrapper(&tline, base_key_nonexist, lsn, &ctx).await?,
None
);
// test vectored get on child timeline
assert_eq!(
get_vectored_impl_wrapper(&child, base_key, lsn, &ctx).await?,
None
);
assert_eq!(
get_vectored_impl_wrapper(&child, base_key_child, lsn, &ctx).await?,
Some(test_img("metadata key 2"))
);
assert_eq!(
get_vectored_impl_wrapper(&child, base_key_nonexist, lsn, &ctx).await?,
None
);
Ok(())
}
}

View File

@@ -113,12 +113,20 @@ impl From<VectoredValueReconstructState> for ValueReconstructState {
}
}
/// Bag of data accumulated during a vectored get
/// Bag of data accumulated during a vectored get..
pub(crate) struct ValuesReconstructState {
/// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
/// should not expect to get anything from this hashmap.
pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,
/// The keys which are already retrieved
keys_done: KeySpaceRandomAccum,
/// The keys covered by the image layers
keys_with_image_coverage: Option<Range<Key>>,
// Statistics that are still accessible as a caller of `get_vectored_impl`.
layers_visited: u32,
delta_layers_visited: u32,
}
impl ValuesReconstructState {
@@ -126,7 +134,9 @@ impl ValuesReconstructState {
Self {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),
keys_with_image_coverage: None,
layers_visited: 0,
delta_layers_visited: 0,
}
}
@@ -140,8 +150,17 @@ impl ValuesReconstructState {
}
}
pub(crate) fn on_layer_visited(&mut self) {
pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
self.layers_visited += 1;
if let ReadableLayer::PersistentLayer(layer) = layer {
if layer.layer_desc().is_delta() {
self.delta_layers_visited += 1;
}
}
}
pub(crate) fn get_delta_layers_visited(&self) -> u32 {
self.delta_layers_visited
}
pub(crate) fn get_layers_visited(&self) -> u32 {
@@ -171,6 +190,16 @@ impl ValuesReconstructState {
}
}
/// On hitting image layer, we can mark all keys in this range as done, because
/// if the image layer does not contain a key, it is deleted/never added.
pub(crate) fn on_image_layer_visited(&mut self, key_range: &Range<Key>) {
let prev_val = self.keys_with_image_coverage.replace(key_range.clone());
assert_eq!(
prev_val, None,
"should consume the keyspace before the next iteration"
);
}
/// Update the state collected for a given key.
/// Returns true if this was the last value needed for the key and false otherwise.
///
@@ -233,8 +262,12 @@ impl ValuesReconstructState {
/// Returns the key space describing the keys that have
/// been marked as completed since the last call to this function.
pub(crate) fn consume_done_keys(&mut self) -> KeySpace {
self.keys_done.consume_keyspace()
/// Returns individual keys done, and the image layer coverage.
pub(crate) fn consume_done_keys(&mut self) -> (KeySpace, Option<Range<Key>>) {
(
self.keys_done.consume_keyspace(),
self.keys_with_image_coverage.take(),
)
}
}

View File

@@ -158,6 +158,7 @@ pub struct ImageLayerInner {
index_start_blk: u32,
index_root_blk: u32,
key_range: Range<Key>,
lsn: Lsn,
file: VirtualFile,
@@ -419,6 +420,7 @@ impl ImageLayerInner {
file,
file_id,
max_vectored_read_bytes,
key_range: actual_summary.key_range,
}))
}
@@ -478,6 +480,8 @@ impl ImageLayerInner {
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
.await;
reconstruct_state.on_image_layer_visited(&self.key_range);
Ok(())
}

View File

@@ -18,10 +18,10 @@ use fail::fail_point;
use once_cell::sync::Lazy;
use pageserver_api::{
key::{
AUX_FILES_KEY, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE,
NON_INHERITED_SPARSE_RANGE,
AUX_FILES_KEY, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX,
NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE,
},
keyspace::{KeySpaceAccum, SparseKeyPartitioning},
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
models::{
AtomicAuxFilePolicy, AuxFilePolicy, CompactionAlgorithm, DownloadRemoteLayersTaskInfo,
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
@@ -60,7 +60,6 @@ use std::{
ops::ControlFlow,
};
use crate::tenant::timeline::init::LocalLayerFileMetadata;
use crate::{
aux_file::AuxFileSizeEstimator,
tenant::{
@@ -89,6 +88,9 @@ use crate::{
metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize,
};
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
use crate::{
pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::timeline::init::LocalLayerFileMetadata,
};
use crate::{
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
virtual_file::{MaybeFatalIo, VirtualFile},
@@ -346,8 +348,8 @@ pub struct Timeline {
// though let's keep them both for better error visibility.
pub initdb_lsn: Lsn,
/// When did we last calculate the partitioning?
partitioning: tokio::sync::Mutex<((KeyPartitioning, SparseKeyPartitioning), Lsn)>,
/// When did we last calculate the partitioning? Make it pub to test cases.
pub(super) partitioning: tokio::sync::Mutex<((KeyPartitioning, SparseKeyPartitioning), Lsn)>,
/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
@@ -481,6 +483,11 @@ impl GcCutoffs {
}
}
pub(crate) struct TimelineVisitOutcome {
completed_keyspace: KeySpace,
image_covered_keyspace: KeySpace,
}
/// An error happened in a get() operation.
#[derive(thiserror::Error, Debug)]
pub(crate) enum PageReconstructError {
@@ -505,6 +512,13 @@ pub(crate) enum PageReconstructError {
MissingKey(MissingKeyError),
}
impl GetVectoredError {
#[cfg(test)]
pub(crate) fn is_missing_key_error(&self) -> bool {
matches!(self, Self::MissingKey(_))
}
}
#[derive(Debug)]
pub struct MissingKeyError {
key: Key,
@@ -782,6 +796,11 @@ pub(crate) enum ShutdownMode {
Hard,
}
struct ImageLayerCreationOutcome {
image: Option<ResidentLayer>,
next_start_key: Key,
}
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -883,7 +902,7 @@ impl Timeline {
}
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, reconstruct_state, ctx)
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await;
if self.conf.validate_vectored_get {
@@ -1028,7 +1047,12 @@ impl Timeline {
}
GetVectoredImpl::Vectored => {
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, ValuesReconstructState::new(), ctx)
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::new(),
ctx,
)
.await;
if self.conf.validate_vectored_get {
@@ -1116,7 +1140,7 @@ impl Timeline {
.get_vectored_impl(
keyspace.clone(),
lsn,
ValuesReconstructState::default(),
&mut ValuesReconstructState::default(),
ctx,
)
.await;
@@ -1193,7 +1217,7 @@ impl Timeline {
&self,
keyspace: KeySpace,
lsn: Lsn,
mut reconstruct_state: ValuesReconstructState,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let get_kind = if keyspace.total_raw_size() == 1 {
@@ -1205,7 +1229,7 @@ impl Timeline {
let get_data_timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
.for_get_kind(get_kind)
.start_timer();
self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx)
self.get_vectored_reconstruct_data(keyspace, lsn, reconstruct_state, ctx)
.await?;
get_data_timer.stop_and_record();
@@ -1214,7 +1238,8 @@ impl Timeline {
.start_timer();
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let layers_visited = reconstruct_state.get_layers_visited();
for (key, res) in reconstruct_state.keys {
for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
match res {
Err(err) => {
results.insert(key, Err(err));
@@ -3287,12 +3312,15 @@ impl Timeline {
let mut cont_lsn = Lsn(request_lsn.0 + 1);
loop {
let missing_keyspace = loop {
if self.cancel.is_cancelled() {
return Err(GetVectoredError::Cancelled);
}
let completed = Self::get_vectored_reconstruct_data_timeline(
let TimelineVisitOutcome {
completed_keyspace: completed,
image_covered_keyspace,
} = Self::get_vectored_reconstruct_data_timeline(
timeline,
keyspace.clone(),
cont_lsn,
@@ -3311,12 +3339,31 @@ impl Timeline {
ranges: vec![NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE],
});
// Keyspace is fully retrieved, no ancestor timeline, or metadata scan (where we do not look
// into ancestor timelines). TODO: is there any other metadata which we want to inherit?
if keyspace.total_raw_size() == 0 || timeline.ancestor_timeline.is_none() {
break;
// Keyspace is fully retrieved
if keyspace.is_empty() {
break None;
}
// Not fully retrieved but no ancestor timeline.
if timeline.ancestor_timeline.is_none() {
break Some(keyspace);
}
// Now we see if there are keys covered by the image layer but does not exist in the
// image layer, which means that the key does not exist.
// The block below will stop the vectored search if any of the keys encountered an image layer
// which did not contain a snapshot for said key. Since we have already removed all completed
// keys from `keyspace`, we expect there to be no overlap between it and the image covered key
// space. If that's not the case, we had at least one key encounter a gap in the image layer
// and stop the search as a result of that.
let removed = keyspace.remove_overlapping_with(&image_covered_keyspace);
if !removed.is_empty() {
break Some(removed);
}
// If we reached this point, `remove_overlapping_with` should not have made any change to the
// keyspace.
// Take the min to avoid reconstructing a page with data newer than request Lsn.
cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
timeline_owned = timeline
@@ -3324,14 +3371,14 @@ impl Timeline {
.await
.map_err(GetVectoredError::GetReadyAncestorError)?;
timeline = &*timeline_owned;
}
};
if keyspace.total_raw_size() != 0 {
if let Some(missing_keyspace) = missing_keyspace {
return Err(GetVectoredError::MissingKey(MissingKeyError {
key: keyspace.start().unwrap(), /* better if we can store the full keyspace */
key: missing_keyspace.start().unwrap(), /* better if we can store the full keyspace */
shard: self
.shard_identity
.get_shard_number(&keyspace.start().unwrap()),
.get_shard_number(&missing_keyspace.start().unwrap()),
cont_lsn,
request_lsn,
ancestor_lsn: Some(timeline.ancestor_lsn),
@@ -3356,6 +3403,9 @@ impl Timeline {
///
/// At each iteration pop the top of the fringe (the layer with the highest Lsn)
/// and get all the required reconstruct data from the layer in one go.
///
/// Returns the completed keyspace and the keyspaces with image coverage. The caller
/// decides how to deal with these two keyspaces.
async fn get_vectored_reconstruct_data_timeline(
timeline: &Timeline,
keyspace: KeySpace,
@@ -3363,20 +3413,27 @@ impl Timeline {
reconstruct_state: &mut ValuesReconstructState,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<KeySpace, GetVectoredError> {
) -> Result<TimelineVisitOutcome, GetVectoredError> {
let mut unmapped_keyspace = keyspace.clone();
let mut fringe = LayerFringe::new();
let mut completed_keyspace = KeySpace::default();
let mut image_covered_keyspace = KeySpaceRandomAccum::new();
loop {
if cancel.is_cancelled() {
return Err(GetVectoredError::Cancelled);
}
let keys_done_last_step = reconstruct_state.consume_done_keys();
let (keys_done_last_step, keys_with_image_coverage) =
reconstruct_state.consume_done_keys();
unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
completed_keyspace.merge(&keys_done_last_step);
if let Some(keys_with_image_coverage) = keys_with_image_coverage {
unmapped_keyspace
.remove_overlapping_with(&KeySpace::single(keys_with_image_coverage.clone()));
image_covered_keyspace.add_range(keys_with_image_coverage);
}
// Do not descent any further if the last layer we visited
// completed all keys in the keyspace it inspected. This is not
@@ -3448,13 +3505,16 @@ impl Timeline {
unmapped_keyspace = keyspace_to_read;
cont_lsn = next_cont_lsn;
reconstruct_state.on_layer_visited();
reconstruct_state.on_layer_visited(&layer_to_read);
} else {
break;
}
}
Ok(completed_keyspace)
Ok(TimelineVisitOutcome {
completed_keyspace,
image_covered_keyspace: image_covered_keyspace.consume_keyspace(),
})
}
/// # Cancel-safety
@@ -4134,6 +4194,176 @@ impl Timeline {
false
}
/// Create image layers for Postgres data. Assumes the caller passes a partition that is not too large,
/// so that at most one image layer will be produced from this function.
async fn create_image_layer_for_rel_blocks(
self: &Arc<Self>,
partition: &KeySpace,
mut image_layer_writer: ImageLayerWriter,
lsn: Lsn,
ctx: &RequestContext,
img_range: Range<Key>,
start: Key,
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
let mut wrote_keys = false;
let mut key_request_accum = KeySpaceAccum::new();
for range in &partition.ranges {
let mut key = range.start;
while key < range.end {
// Decide whether to retain this key: usually we do, but sharded tenants may
// need to drop keys that don't belong to them. If we retain the key, add it
// to `key_request_accum` for later issuing a vectored get
if self.shard_identity.is_key_disposable(&key) {
debug!(
"Dropping key {} during compaction (it belongs on shard {:?})",
key,
self.shard_identity.get_shard_number(&key)
);
} else {
key_request_accum.add_key(key);
}
let last_key_in_range = key.next() == range.end;
key = key.next();
// Maybe flush `key_rest_accum`
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|| (last_key_in_range && key_request_accum.raw_size() > 0)
{
let results = self
.get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
.await?;
for (img_key, img) in results {
let img = match img {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
// page without losing any actual user data. That seems better
// than failing repeatedly and getting stuck.
//
// We had a bug at one point, where we truncated the FSM and VM
// in the pageserver, but the Postgres didn't know about that
// and continued to generate incremental WAL records for pages
// that didn't exist in the pageserver. Trying to replay those
// WAL records failed to find the previous image of the page.
// This special case allows us to recover from that situation.
// See https://github.com/neondatabase/neon/issues/2601.
//
// Unfortunately we cannot do this for the main fork, or for
// any metadata keys, keys, as that would lead to actual data
// loss.
if is_rel_fsm_block_key(img_key) || is_rel_vm_block_key(img_key) {
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(CreateImageLayersError::PageReconstructError(err));
}
}
};
// Write all the keys we just read into our new image layer.
image_layer_writer.put_image(img_key, img, ctx).await?;
wrote_keys = true;
}
}
}
}
if wrote_keys {
// Normal path: we have written some data into the new image layer for this
// partition, so flush it to disk.
let image_layer = image_layer_writer.finish(self, ctx).await?;
Ok(ImageLayerCreationOutcome {
image: Some(image_layer),
next_start_key: img_range.end,
})
} else {
// Special case: the image layer may be empty if this is a sharded tenant and the
// partition does not cover any keys owned by this shard. In this case, to ensure
// we don't leave gaps between image layers, leave `start` where it is, so that the next
// layer we write will cover the key range that we just scanned.
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
Ok(ImageLayerCreationOutcome {
image: None,
next_start_key: start,
})
}
}
/// Create an image layer for metadata keys. This function produces one image layer for all metadata
/// keys for now. Because metadata keys cannot exceed basebackup size limit, the image layer for it
/// would not be too large to fit in a single image layer.
#[allow(clippy::too_many_arguments)]
async fn create_image_layer_for_metadata_keys(
self: &Arc<Self>,
partition: &KeySpace,
mut image_layer_writer: ImageLayerWriter,
lsn: Lsn,
ctx: &RequestContext,
img_range: Range<Key>,
mode: ImageLayerCreationMode,
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
assert!(!matches!(mode, ImageLayerCreationMode::Initial));
// Metadata keys image layer creation.
let mut reconstruct_state = ValuesReconstructState::default();
let data = self
.get_vectored_impl(partition.clone(), lsn, &mut reconstruct_state, ctx)
.await?;
let (data, total_kb_retrieved, total_key_retrieved) = {
let mut new_data = BTreeMap::new();
let mut total_kb_retrieved = 0;
let mut total_key_retrieved = 0;
for (k, v) in data {
let v = v.map_err(CreateImageLayersError::PageReconstructError)?;
total_kb_retrieved += KEY_SIZE + v.len();
total_key_retrieved += 1;
new_data.insert(k, v);
}
(new_data, total_kb_retrieved / 1024, total_key_retrieved)
};
let delta_file_accessed = reconstruct_state.get_delta_layers_visited();
let trigger_generation = delta_file_accessed as usize >= MAX_AUX_FILE_V2_DELTAS;
info!(
"generate image layers for metadata keys: trigger_generation={trigger_generation}, \
delta_file_accessed={delta_file_accessed}, total_kb_retrieved={total_kb_retrieved}, \
total_key_retrieved={total_key_retrieved}"
);
if !trigger_generation && mode == ImageLayerCreationMode::Try {
return Ok(ImageLayerCreationOutcome {
image: None,
next_start_key: img_range.end,
});
}
let has_keys = !data.is_empty();
for (k, v) in data {
// Even if the value is empty (deleted), we do not delete it for now until we can ensure vectored get
// considers this situation properly.
// if v.is_empty() {
// continue;
// }
// No need to handle sharding b/c metadata keys are always on the 0-th shard.
// TODO: split image layers to avoid too large layer files. Too large image files are not handled
// on the normal data path either.
image_layer_writer.put_image(k, v, ctx).await?;
}
Ok(ImageLayerCreationOutcome {
image: if has_keys {
let image_layer = image_layer_writer.finish(self, ctx).await?;
Some(image_layer)
} else {
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
None
},
next_start_key: img_range.end,
})
}
#[tracing::instrument(skip_all, fields(%lsn, %mode))]
async fn create_image_layers(
self: &Arc<Timeline>,
@@ -4175,19 +4405,17 @@ impl Timeline {
for partition in partitioning.parts.iter() {
let img_range = start..partition.ranges.last().unwrap().end;
if partition.overlaps(&Key::metadata_key_range()) {
// TODO(chi): The next patch will correctly create image layers for metadata keys, and it would be a
// rather big change. Keep this patch small for now.
match mode {
ImageLayerCreationMode::Force | ImageLayerCreationMode::Try => {
// skip image layer creation anyways for metadata keys.
start = img_range.end;
continue;
}
ImageLayerCreationMode::Initial => {
return Err(CreateImageLayersError::Other(anyhow::anyhow!("no image layer should be created for metadata keys when flushing frozen layers")));
}
let compact_metadata = partition.overlaps(&Key::metadata_key_range());
if compact_metadata {
for range in &partition.ranges {
assert!(
range.start.field1 >= METADATA_KEY_BEGIN_PREFIX
&& range.end.field1 <= METADATA_KEY_END_PREFIX,
"metadata keys must be partitioned separately"
);
}
if mode == ImageLayerCreationMode::Initial {
return Err(CreateImageLayersError::Other(anyhow::anyhow!("no image layer should be created for metadata keys when flushing frozen layers")));
}
} else if let ImageLayerCreationMode::Try = mode {
// check_for_image_layers = false -> skip
@@ -4198,7 +4426,7 @@ impl Timeline {
}
}
let mut image_layer_writer = ImageLayerWriter::new(
let image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
@@ -4214,87 +4442,39 @@ impl Timeline {
)))
});
let mut wrote_keys = false;
if !compact_metadata {
let ImageLayerCreationOutcome {
image,
next_start_key,
} = self
.create_image_layer_for_rel_blocks(
partition,
image_layer_writer,
lsn,
ctx,
img_range,
start,
)
.await?;
let mut key_request_accum = KeySpaceAccum::new();
for range in &partition.ranges {
let mut key = range.start;
while key < range.end {
// Decide whether to retain this key: usually we do, but sharded tenants may
// need to drop keys that don't belong to them. If we retain the key, add it
// to `key_request_accum` for later issuing a vectored get
if self.shard_identity.is_key_disposable(&key) {
debug!(
"Dropping key {} during compaction (it belongs on shard {:?})",
key,
self.shard_identity.get_shard_number(&key)
);
} else {
key_request_accum.add_key(key);
}
let last_key_in_range = key.next() == range.end;
key = key.next();
// Maybe flush `key_rest_accum`
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|| (last_key_in_range && key_request_accum.raw_size() > 0)
{
let results = self
.get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
.await?;
for (img_key, img) in results {
let img = match img {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
// page without losing any actual user data. That seems better
// than failing repeatedly and getting stuck.
//
// We had a bug at one point, where we truncated the FSM and VM
// in the pageserver, but the Postgres didn't know about that
// and continued to generate incremental WAL records for pages
// that didn't exist in the pageserver. Trying to replay those
// WAL records failed to find the previous image of the page.
// This special case allows us to recover from that situation.
// See https://github.com/neondatabase/neon/issues/2601.
//
// Unfortunately we cannot do this for the main fork, or for
// any metadata keys, keys, as that would lead to actual data
// loss.
if is_rel_fsm_block_key(img_key) || is_rel_vm_block_key(img_key)
{
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(CreateImageLayersError::PageReconstructError(
err,
));
}
}
};
// Write all the keys we just read into our new image layer.
image_layer_writer.put_image(img_key, img, ctx).await?;
wrote_keys = true;
}
}
}
}
if wrote_keys {
// Normal path: we have written some data into the new image layer for this
// partition, so flush it to disk.
start = img_range.end;
let image_layer = image_layer_writer.finish(self, ctx).await?;
image_layers.push(image_layer);
start = next_start_key;
image_layers.extend(image);
} else {
// Special case: the image layer may be empty if this is a sharded tenant and the
// partition does not cover any keys owned by this shard. In this case, to ensure
// we don't leave gaps between image layers, leave `start` where it is, so that the next
// layer we write will cover the key range that we just scanned.
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
let ImageLayerCreationOutcome {
image,
next_start_key,
} = self
.create_image_layer_for_metadata_keys(
partition,
image_layer_writer,
lsn,
ctx,
img_range,
mode,
)
.await?;
start = next_start_key;
image_layers.extend(image);
}
}

View File

@@ -116,9 +116,13 @@ impl Timeline {
// 3. Create new image layers for partitions that have been modified
// "enough".
let dense_layers = self
let mut partitioning = dense_partitioning;
partitioning
.parts
.extend(sparse_partitioning.into_dense().parts);
let image_layers = self
.create_image_layers(
&dense_partitioning,
&partitioning,
lsn,
if flags.contains(CompactFlags::ForceImageLayerCreation) {
ImageLayerCreationMode::Force
@@ -130,24 +134,8 @@ impl Timeline {
.await
.map_err(anyhow::Error::from)?;
// For now, nothing will be produced...
let sparse_layers = self
.create_image_layers(
&sparse_partitioning.clone().into_dense(),
lsn,
if flags.contains(CompactFlags::ForceImageLayerCreation) {
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
},
&image_ctx,
)
.await
.map_err(anyhow::Error::from)?;
assert!(sparse_layers.is_empty());
self.upload_new_image_layers(dense_layers)?;
dense_partitioning.parts.len()
self.upload_new_image_layers(image_layers)?;
partitioning.parts.len()
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
@@ -499,8 +487,11 @@ impl Timeline {
for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
if let Some(prev_key) = prev {
// just first fast filter
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
// just first fast filter, do not create hole entries for metadata keys. The last hole in the
// compaction is the gap between data key and metadata keys.
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
&& !Key::is_metadata_key(&prev_key)
{
let key_range = prev_key..next_key;
// Measuring hole by just subtraction of i128 representation of key range boundaries
// has not so much sense, because largest holes will corresponds field1/field2 changes.

View File

@@ -20,6 +20,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use storage_broker::Uri;
use tokio::sync::mpsc;
use tracing::*;
use utils::pid_file;
@@ -29,13 +30,13 @@ use safekeeper::defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
};
use safekeeper::remove_wal;
use safekeeper::wal_service;
use safekeeper::GlobalTimelines;
use safekeeper::SafeKeeperConf;
use safekeeper::{broker, WAL_SERVICE_RUNTIME};
use safekeeper::{control_file, BROKER_RUNTIME};
use safekeeper::{http, WAL_REMOVER_RUNTIME};
use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
use safekeeper::{wal_backup, HTTP_RUNTIME};
use storage_broker::DEFAULT_ENDPOINT;
use utils::auth::{JwtAuth, Scope, SwappableJwtAuth};
@@ -376,6 +377,8 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
metrics::register_internal(Box::new(timeline_collector))?;
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
wal_backup::init_remote_storage(&conf);
// Keep handles to main tasks to die if any of them disappears.
@@ -388,9 +391,19 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
let current_thread_rt = conf
.current_thread_runtime
.then(|| Handle::try_current().expect("no runtime in main"));
let conf_ = conf.clone();
let wal_backup_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
.spawn(wal_backup::wal_backup_launcher_task_main(
conf_,
wal_backup_launcher_rx,
))
.map(|res| ("WAL backup launcher".to_owned(), res));
tasks_handles.push(Box::pin(wal_backup_handle));
// Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone()).await?;
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx).await?;
let conf_ = conf.clone();
// Run everything in current thread rt, if asked.

View File

@@ -46,8 +46,6 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
return Ok(());
}
let active_timelines_set = GlobalTimelines::get_global_broker_active_set();
let mut client =
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
@@ -59,9 +57,15 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
let now = Instant::now();
let all_tlis = active_timelines_set.get_all();
let all_tlis = GlobalTimelines::get_all();
let mut n_pushed_tlis = 0;
for tli in &all_tlis {
// filtering alternative futures::stream::iter(all_tlis)
// .filter(|tli| {let tli = tli.clone(); async move { tli.is_active().await}}).collect::<Vec<_>>().await;
// doesn't look better, and I'm not sure how to do that without collect.
if !tli.is_active().await {
continue;
}
let sk_info = tli.get_safekeeper_info(&conf).await;
yield sk_info;
BROKER_PUSHED_UPDATES.inc();
@@ -86,7 +90,6 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
}
/// Subscribe and fetch all the interesting data from the broker.
#[instrument(name = "broker pull", skip_all)]
async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;

View File

@@ -31,8 +31,6 @@ pub mod safekeeper;
pub mod send_wal;
pub mod state;
pub mod timeline;
pub mod timeline_manager;
pub mod timelines_set;
pub mod wal_backup;
pub mod wal_backup_partial;
pub mod wal_service;

View File

@@ -11,9 +11,8 @@ use futures::Future;
use metrics::{
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
proto::MetricFamily,
register_int_counter, register_int_counter_pair, register_int_counter_pair_vec,
register_int_counter_vec, Gauge, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec,
IntGaugeVec,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, Gauge,
IntCounter, IntCounterPairVec, IntCounterVec, IntGaugeVec,
};
use once_cell::sync::Lazy;
@@ -163,29 +162,6 @@ pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
});
pub static MANAGER_ITERATIONS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_manager_iterations_total",
"Number of iterations of the timeline manager task"
)
.expect("Failed to register safekeeper_manager_iterations_total counter")
});
pub static MANAGER_ACTIVE_CHANGES: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_manager_active_changes_total",
"Number of timeline active status changes in the timeline manager task"
)
.expect("Failed to register safekeeper_manager_active_changes_total counter")
});
pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
register_int_counter_pair!(
"safekeeper_wal_backup_tasks_started_total",
"Number of active WAL backup tasks",
"safekeeper_wal_backup_tasks_finished_total",
"Number of finished WAL backup tasks",
)
.expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
});
pub const LABEL_UNKNOWN: &str = "unknown";
@@ -638,7 +614,8 @@ impl Collector for TimelineCollector {
self.written_wal_seconds.reset();
self.flushed_wal_seconds.reset();
let timelines_count = GlobalTimelines::get_all().len();
let timelines = GlobalTimelines::get_all();
let timelines_count = timelines.len();
let mut active_timelines_count = 0;
// Prometheus Collector is sync, and data is stored under async lock. To
@@ -769,9 +746,9 @@ impl Collector for TimelineCollector {
async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
let mut res = vec![];
let active_timelines = GlobalTimelines::get_global_broker_active_set().get_all();
let timelines = GlobalTimelines::get_all();
for tli in active_timelines {
for tli in timelines {
if let Some(info) = tli.info_for_metrics().await {
res.push(info);
}

View File

@@ -45,9 +45,6 @@ const DEFAULT_FEEDBACK_CAPACITY: usize = 8;
pub struct WalReceivers {
mutex: Mutex<WalReceiversShared>,
pageserver_feedback_tx: tokio::sync::broadcast::Sender<PageserverFeedback>,
num_computes_tx: tokio::sync::watch::Sender<usize>,
num_computes_rx: tokio::sync::watch::Receiver<usize>,
}
/// Id under which walreceiver is registered in shmem.
@@ -58,21 +55,16 @@ impl WalReceivers {
let (pageserver_feedback_tx, _) =
tokio::sync::broadcast::channel(DEFAULT_FEEDBACK_CAPACITY);
let (num_computes_tx, num_computes_rx) = tokio::sync::watch::channel(0usize);
Arc::new(WalReceivers {
mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
pageserver_feedback_tx,
num_computes_tx,
num_computes_rx,
})
}
/// Register new walreceiver. Returned guard provides access to the slot and
/// automatically deregisters in Drop.
pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
let mut shared = self.mutex.lock();
let slots = &mut shared.slots;
let slots = &mut self.mutex.lock().slots;
let walreceiver = WalReceiverState {
conn_id,
status: WalReceiverStatus::Voting,
@@ -86,9 +78,6 @@ impl WalReceivers {
slots.push(Some(walreceiver));
pos
};
self.update_num(&shared);
WalReceiverGuard {
id: pos,
walreceivers: self.clone(),
@@ -110,18 +99,7 @@ impl WalReceivers {
/// Get number of walreceivers (compute connections).
pub fn get_num(self: &Arc<WalReceivers>) -> usize {
self.mutex.lock().get_num()
}
/// Get channel for number of walreceivers.
pub fn get_num_rx(self: &Arc<WalReceivers>) -> tokio::sync::watch::Receiver<usize> {
self.num_computes_rx.clone()
}
/// Should get called after every update of slots.
fn update_num(self: &Arc<WalReceivers>, shared: &MutexGuard<WalReceiversShared>) {
let num = shared.get_num();
self.num_computes_tx.send_replace(num);
self.mutex.lock().slots.iter().flatten().count()
}
/// Get state of all walreceivers.
@@ -145,7 +123,6 @@ impl WalReceivers {
fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
let mut shared = self.mutex.lock();
shared.slots[id] = None;
self.update_num(&shared);
}
/// Broadcast pageserver feedback to connected walproposers.
@@ -160,13 +137,6 @@ struct WalReceiversShared {
slots: Vec<Option<WalReceiverState>>,
}
impl WalReceiversShared {
/// Get number of walreceivers (compute connections).
fn get_num(&self) -> usize {
self.slots.iter().flatten().count()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalReceiverState {
/// None means it is recovery initiated by us (this safekeeper).
@@ -486,7 +456,14 @@ impl WalAcceptor {
/// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
/// it must mean that network thread terminated.
async fn run(&mut self) -> anyhow::Result<()> {
// Register the connection and defer unregister.
// Order of the next two lines is important: we want first to remove our entry and then
// update status which depends on registered connections.
let _compute_conn_guard = ComputeConnectionGuard {
timeline: Arc::clone(&self.tli),
};
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
self.tli.update_status_notify().await?;
// After this timestamp we will stop processing AppendRequests and send a response
// to the walproposer. walproposer sends at least one AppendRequest per second,
@@ -552,3 +529,19 @@ impl WalAcceptor {
}
}
}
/// Calls update_status_notify in drop to update timeline status.
struct ComputeConnectionGuard {
timeline: Arc<Timeline>,
}
impl Drop for ComputeConnectionGuard {
fn drop(&mut self) {
let tli = self.timeline.clone();
tokio::spawn(async move {
if let Err(e) = tli.update_status_notify().await {
error!("failed to update timeline status: {}", e);
}
});
}
}

View File

@@ -37,11 +37,17 @@ use crate::{
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
pub async fn recovery_main(tli: Arc<Timeline>, conf: SafeKeeperConf) {
info!("started");
let mut cancellation_rx = match tli.get_cancellation_rx() {
Ok(rx) => rx,
Err(_) => {
info!("timeline canceled during task start");
return;
}
};
let cancel = tli.cancel.clone();
select! {
_ = recovery_main_loop(tli, conf) => { unreachable!() }
_ = cancel.cancelled() => {
_ = cancellation_rx.changed() => {
info!("stopped");
}
}

View File

@@ -7,18 +7,29 @@ use tracing::*;
use crate::{GlobalTimelines, SafeKeeperConf};
pub async fn task_main(_conf: SafeKeeperConf) -> anyhow::Result<()> {
const ALLOW_INACTIVE_TIMELINES: bool = true;
pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
let wal_removal_interval = Duration::from_millis(5000);
loop {
let now = tokio::time::Instant::now();
let mut active_timelines = 0;
let tlis = GlobalTimelines::get_all();
for tli in &tlis {
let is_active = tli.is_active().await;
if is_active {
active_timelines += 1;
}
if !ALLOW_INACTIVE_TIMELINES && !is_active {
continue;
}
let ttid = tli.ttid;
async {
if let Err(e) = tli.maybe_persist_control_file().await {
warn!("failed to persist control file: {e}");
}
if let Err(e) = tli.remove_old_wal().await {
if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled).await {
error!("failed to remove WAL: {}", e);
}
}
@@ -31,8 +42,8 @@ pub async fn task_main(_conf: SafeKeeperConf) -> anyhow::Result<()> {
if elapsed > wal_removal_interval {
info!(
"WAL removal is too long, processed {} timelines in {:?}",
total_timelines, elapsed
"WAL removal is too long, processed {} active timelines ({} total) in {:?}",
active_timelines, total_timelines, elapsed
);
}

View File

@@ -6,16 +6,15 @@ use camino::Utf8PathBuf;
use postgres_ffi::XLogSegNo;
use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio_util::sync::CancellationToken;
use utils::sync::gate::Gate;
use std::cmp::max;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::{sync::watch, time::Instant};
use tokio::sync::{Mutex, MutexGuard};
use tokio::{
sync::{mpsc::Sender, watch},
time::Instant,
};
use tracing::*;
use utils::http::error::ApiError;
use utils::{
@@ -34,13 +33,12 @@ use crate::safekeeper::{
};
use crate::send_wal::WalSenders;
use crate::state::{TimelineMemState, TimelinePersistentState};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::{self};
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
use crate::metrics::FullTimelineInfo;
use crate::wal_storage::Storage as wal_storage_iface;
use crate::{debug_dump, timeline_manager, wal_backup_partial, wal_storage};
use crate::{debug_dump, wal_backup_partial, wal_storage};
use crate::{GlobalTimelines, SafeKeeperConf};
/// Things safekeeper should know about timeline state on peers.
@@ -53,7 +51,8 @@ pub struct PeerInfo {
/// LSN of the last record.
pub flush_lsn: Lsn,
pub commit_lsn: Lsn,
/// Since which LSN safekeeper has WAL.
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
/// sk since backup_lsn.
pub local_start_lsn: Lsn,
/// When info was received. Serde annotations are not very useful but make
/// the code compile -- we don't rely on this field externally.
@@ -98,72 +97,25 @@ impl PeersInfo {
}
}
pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;
/// WriteGuardSharedState is a wrapper around `RwLockWriteGuard<SharedState>` that
/// automatically updates `watch::Sender` channels with state on drop.
pub struct WriteGuardSharedState<'a> {
tli: Arc<Timeline>,
guard: RwLockWriteGuard<'a, SharedState>,
}
impl<'a> WriteGuardSharedState<'a> {
fn new(tli: Arc<Timeline>, guard: RwLockWriteGuard<'a, SharedState>) -> Self {
WriteGuardSharedState { tli, guard }
}
}
impl<'a> Deref for WriteGuardSharedState<'a> {
type Target = SharedState;
fn deref(&self) -> &Self::Target {
&self.guard
}
}
impl<'a> DerefMut for WriteGuardSharedState<'a> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.guard
}
}
impl<'a> Drop for WriteGuardSharedState<'a> {
fn drop(&mut self) {
let term_flush_lsn = TermLsn::from((self.guard.sk.get_term(), self.guard.sk.flush_lsn()));
let commit_lsn = self.guard.sk.state.inmem.commit_lsn;
let _ = self.tli.term_flush_lsn_watch_tx.send_if_modified(|old| {
if *old != term_flush_lsn {
*old = term_flush_lsn;
true
} else {
false
}
});
let _ = self.tli.commit_lsn_watch_tx.send_if_modified(|old| {
if *old != commit_lsn {
*old = commit_lsn;
true
} else {
false
}
});
// send notification about shared state update
self.tli.shared_state_version_tx.send_modify(|old| {
*old += 1;
});
}
}
/// Shared state associated with database instance
pub struct SharedState {
/// Safekeeper object
pub(crate) sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
/// In memory list containing state of peers sent in latest messages from them.
pub(crate) peers_info: PeersInfo,
pub(crate) last_removed_segno: XLogSegNo,
peers_info: PeersInfo,
/// True when WAL backup launcher oversees the timeline, making sure WAL is
/// offloaded, allows to bother launcher less.
wal_backup_active: bool,
/// True whenever there is at least some pending activity on timeline: live
/// compute connection, pageserver is not caughtup (it must have latest WAL
/// for new compute start) or WAL backuping is not finished. Practically it
/// means safekeepers broadcast info to peers about the timeline, old WAL is
/// trimmed.
///
/// TODO: it might be better to remove tli completely from GlobalTimelines
/// when tli is inactive instead of having this flag.
active: bool,
last_removed_segno: XLogSegNo,
}
impl SharedState {
@@ -200,6 +152,8 @@ impl SharedState {
Ok(Self {
sk,
peers_info: PeersInfo(vec![]),
wal_backup_active: false,
active: false,
last_removed_segno: 0,
})
}
@@ -217,10 +171,75 @@ impl SharedState {
Ok(Self {
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
peers_info: PeersInfo(vec![]),
wal_backup_active: false,
active: false,
last_removed_segno: 0,
})
}
fn is_active(&self, num_computes: usize) -> bool {
self.is_wal_backup_required(num_computes)
// FIXME: add tracking of relevant pageservers and check them here individually,
// otherwise migration won't work (we suspend too early).
|| self.sk.state.inmem.remote_consistent_lsn < self.sk.state.inmem.commit_lsn
}
/// Mark timeline active/inactive and return whether s3 offloading requires
/// start/stop action. If timeline is deactivated, control file is persisted
/// as maintenance task does that only for active timelines.
async fn update_status(&mut self, num_computes: usize, ttid: TenantTimelineId) -> bool {
let is_active = self.is_active(num_computes);
if self.active != is_active {
info!(
"timeline {} active={} now, remote_consistent_lsn={}, commit_lsn={}",
ttid,
is_active,
self.sk.state.inmem.remote_consistent_lsn,
self.sk.state.inmem.commit_lsn
);
if !is_active {
if let Err(e) = self.sk.state.flush().await {
warn!("control file save in update_status failed: {:?}", e);
}
}
}
self.active = is_active;
self.is_wal_backup_action_pending(num_computes)
}
/// Should we run s3 offloading in current state?
fn is_wal_backup_required(&self, num_computes: usize) -> bool {
let seg_size = self.get_wal_seg_size();
num_computes > 0 ||
// Currently only the whole segment is offloaded, so compare segment numbers.
(self.sk.state.inmem.commit_lsn.segment_number(seg_size) >
self.sk.state.inmem.backup_lsn.segment_number(seg_size))
}
/// Is current state of s3 offloading is not what it ought to be?
fn is_wal_backup_action_pending(&self, num_computes: usize) -> bool {
let res = self.wal_backup_active != self.is_wal_backup_required(num_computes);
if res {
let action_pending = if self.is_wal_backup_required(num_computes) {
"start"
} else {
"stop"
};
trace!(
"timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}",
self.sk.state.timeline_id, action_pending, num_computes, self.sk.state.inmem.commit_lsn, self.sk.state.inmem.backup_lsn
);
}
res
}
/// Returns whether s3 offloading is required and sets current status as
/// matching.
fn wal_backup_attend(&mut self, num_computes: usize) -> bool {
self.wal_backup_active = self.is_wal_backup_required(num_computes);
self.wal_backup_active
}
fn get_wal_seg_size(&self) -> usize {
self.sk.state.server.wal_seg_size as usize
}
@@ -257,7 +276,7 @@ impl SharedState {
/// Get our latest view of alive peers status on the timeline.
/// We pass our own info through the broker as well, so when we don't have connection
/// to the broker returned vec is empty.
pub(crate) fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
let now = Instant::now();
self.peers_info
.0
@@ -273,13 +292,18 @@ impl SharedState {
/// offloading.
/// While it is safe to use inmem values for determining horizon,
/// we use persistent to make possible normal states less surprising.
fn get_horizon_segno(&self, extra_horizon_lsn: Option<Lsn>) -> XLogSegNo {
fn get_horizon_segno(
&self,
wal_backup_enabled: bool,
extra_horizon_lsn: Option<Lsn>,
) -> XLogSegNo {
let state = &self.sk.state;
use std::cmp::min;
let mut horizon_lsn = min(state.remote_consistent_lsn, state.peer_horizon_lsn);
// we don't want to remove WAL that is not yet offloaded to s3
horizon_lsn = min(horizon_lsn, state.backup_lsn);
if wal_backup_enabled {
horizon_lsn = min(horizon_lsn, state.backup_lsn);
}
if let Some(extra_horizon_lsn) = extra_horizon_lsn {
horizon_lsn = min(horizon_lsn, extra_horizon_lsn);
}
@@ -320,6 +344,11 @@ impl From<TimelineError> for ApiError {
pub struct Timeline {
pub ttid: TenantTimelineId,
/// Sending here asks for wal backup launcher attention (start/stop
/// offloading). Sending ttid instead of concrete command allows to do
/// sending without timeline lock.
pub wal_backup_launcher_tx: Sender<TenantTimelineId>,
/// Used to broadcast commit_lsn updates to all background jobs.
commit_lsn_watch_tx: watch::Sender<Lsn>,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
@@ -331,22 +360,19 @@ pub struct Timeline {
term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
/// Broadcasts shared state updates.
shared_state_version_tx: watch::Sender<usize>,
shared_state_version_rx: watch::Receiver<usize>,
/// Safekeeper and other state, that should remain consistent and
/// synchronized with the disk. This is tokio mutex as we write WAL to disk
/// while holding it, ensuring that consensus checks are in order.
mutex: RwLock<SharedState>,
mutex: Mutex<SharedState>,
walsenders: Arc<WalSenders>,
walreceivers: Arc<WalReceivers>,
/// Delete/cancel will trigger this, background tasks should drop out as soon as it fires
pub(crate) cancel: CancellationToken,
/// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal.
cancellation_tx: watch::Sender<bool>,
/// Gate to be held by background tasks, blocks timeline deletion
pub(crate) gate: Gate,
/// Timeline should not be used after cancellation. Background tasks should
/// monitor this channel and stop eventually after receiving `true` from this channel.
cancellation_rx: watch::Receiver<bool>,
/// Directory where timeline state is stored.
pub timeline_dir: Utf8PathBuf,
@@ -356,15 +382,15 @@ pub struct Timeline {
/// with different speed.
// TODO: add `Arc<SafeKeeperConf>` here instead of adding each field separately.
walsenders_keep_horizon: bool,
// timeline_manager controlled state
pub(crate) broker_active: AtomicBool,
pub(crate) wal_backup_active: AtomicBool,
}
impl Timeline {
/// Load existing timeline from disk.
pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Timeline> {
pub fn load_timeline(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
) -> Result<Timeline> {
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
let shared_state = SharedState::restore(conf, &ttid)?;
@@ -374,26 +400,23 @@ impl Timeline {
shared_state.sk.get_term(),
shared_state.sk.flush_lsn(),
)));
let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
let (cancellation_tx, cancellation_rx) = watch::channel(false);
let walreceivers = WalReceivers::new();
Ok(Timeline {
ttid,
wal_backup_launcher_tx,
commit_lsn_watch_tx,
commit_lsn_watch_rx,
term_flush_lsn_watch_tx,
term_flush_lsn_watch_rx,
shared_state_version_tx,
shared_state_version_rx,
mutex: RwLock::new(shared_state),
mutex: Mutex::new(shared_state),
walsenders: WalSenders::new(walreceivers.clone()),
walreceivers,
cancel: CancellationToken::default(),
gate: Gate::default(),
cancellation_rx,
cancellation_tx,
timeline_dir: conf.timeline_dir(&ttid),
walsenders_keep_horizon: conf.walsenders_keep_horizon,
broker_active: AtomicBool::new(false),
wal_backup_active: AtomicBool::new(false),
})
}
@@ -401,6 +424,7 @@ impl Timeline {
pub fn create_empty(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
server_info: ServerInfo,
commit_lsn: Lsn,
local_start_lsn: Lsn,
@@ -408,29 +432,25 @@ impl Timeline {
let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
let (cancellation_tx, cancellation_rx) = watch::channel(false);
let state =
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
let walreceivers = WalReceivers::new();
Ok(Timeline {
ttid,
wal_backup_launcher_tx,
commit_lsn_watch_tx,
commit_lsn_watch_rx,
term_flush_lsn_watch_tx,
term_flush_lsn_watch_rx,
shared_state_version_tx,
shared_state_version_rx,
mutex: RwLock::new(SharedState::create_new(conf, &ttid, state)?),
mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
walsenders: WalSenders::new(walreceivers.clone()),
walreceivers,
cancel: CancellationToken::default(),
gate: Gate::default(),
cancellation_rx,
cancellation_tx,
timeline_dir: conf.timeline_dir(&ttid),
walsenders_keep_horizon: conf.walsenders_keep_horizon,
broker_active: AtomicBool::new(false),
wal_backup_active: AtomicBool::new(false),
})
}
@@ -441,9 +461,8 @@ impl Timeline {
/// and state on disk should remain unchanged.
pub async fn init_new(
self: &Arc<Timeline>,
shared_state: &mut WriteGuardSharedState<'_>,
shared_state: &mut MutexGuard<'_, SharedState>,
conf: &SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
) -> Result<()> {
match fs::metadata(&self.timeline_dir).await {
Ok(_) => {
@@ -474,35 +493,16 @@ impl Timeline {
return Err(e);
}
self.bootstrap(conf, broker_active_set);
self.bootstrap(conf);
Ok(())
}
/// Bootstrap new or existing timeline starting background tasks.
pub fn bootstrap(
self: &Arc<Timeline>,
conf: &SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
) {
// Start manager task which will monitor timeline state and update
// background tasks.
let Ok(gate_guard) = self.gate.enter() else {
// We were already shut down
return;
};
tokio::spawn(timeline_manager::main_task(
self.clone(),
conf.clone(),
broker_active_set,
gate_guard,
));
/// Bootstrap new or existing timeline starting background stasks.
pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
// Start recovery task which always runs on the timeline.
if conf.peer_recovery_enabled {
tokio::spawn(recovery_main(self.clone(), conf.clone()));
}
// TODO: migrate to timeline_manager
if conf.is_wal_backup_enabled() && conf.partial_backup_enabled {
tokio::spawn(wal_backup_partial::main_task(self.clone(), conf.clone()));
}
@@ -515,14 +515,12 @@ impl Timeline {
/// deletion API endpoint is retriable.
pub async fn delete(
&self,
shared_state: &mut WriteGuardSharedState<'_>,
shared_state: &mut MutexGuard<'_, SharedState>,
only_local: bool,
) -> Result<bool> {
) -> Result<(bool, bool)> {
let was_active = shared_state.active;
self.cancel(shared_state);
// Make sure any background tasks are gone before we start deleting things from storage
self.gate.close().await;
// TODO: It's better to wait for s3 offloader termination before
// removing data from s3. Though since s3 doesn't have transactions it
// still wouldn't guarantee absense of data after removal.
@@ -534,14 +532,20 @@ impl Timeline {
wal_backup::delete_timeline(&self.ttid).await?;
}
let dir_existed = delete_dir(&self.timeline_dir).await?;
Ok(dir_existed)
Ok((dir_existed, was_active))
}
/// Cancel timeline to prevent further usage. Background tasks will stop
/// eventually after receiving cancellation signal.
fn cancel(&self, shared_state: &mut WriteGuardSharedState<'_>) {
///
/// Note that we can't notify backup launcher here while holding
/// shared_state lock, as this is a potential deadlock: caller is
/// responsible for that. Generally we should probably make WAL backup tasks
/// to shut down on their own, checking once in a while whether it is the
/// time.
fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) {
info!("timeline {} is cancelled", self.ttid);
self.cancel.cancel();
let _ = self.cancellation_tx.send(true);
// Close associated FDs. Nobody will be able to touch timeline data once
// it is cancelled, so WAL storage won't be opened again.
shared_state.sk.wal_store.close();
@@ -549,16 +553,44 @@ impl Timeline {
/// Returns if timeline is cancelled.
pub fn is_cancelled(&self) -> bool {
self.cancel.is_cancelled()
*self.cancellation_rx.borrow()
}
/// Returns watch channel which gets value when timeline is cancelled. It is
/// guaranteed to have not cancelled value observed (errors otherwise).
pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
let rx = self.cancellation_rx.clone();
if *rx.borrow() {
bail!(TimelineError::Cancelled(self.ttid));
}
Ok(rx)
}
/// Take a writing mutual exclusive lock on timeline shared_state.
pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
self.mutex.lock().await
}
pub async fn read_shared_state(&self) -> ReadGuardSharedState {
self.mutex.read().await
async fn update_status(&self, shared_state: &mut SharedState) -> bool {
shared_state
.update_status(self.walreceivers.get_num(), self.ttid)
.await
}
/// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
pub async fn update_status_notify(&self) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
let is_wal_backup_action_pending: bool = {
let mut shared_state = self.write_shared_state().await;
self.update_status(&mut shared_state).await
};
if is_wal_backup_action_pending {
// Can fail only if channel to a static thread got closed, which is not normal at all.
self.wal_backup_launcher_tx.send(self.ttid).await?;
}
Ok(())
}
/// Returns true if walsender should stop sending WAL to pageserver. We
@@ -570,7 +602,7 @@ impl Timeline {
if self.is_cancelled() {
return true;
}
let shared_state = self.read_shared_state().await;
let shared_state = self.write_shared_state().await;
if self.walreceivers.get_num() == 0 {
return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet
reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn;
@@ -578,9 +610,9 @@ impl Timeline {
false
}
/// Ensure that current term is t, erroring otherwise, and lock the state.
pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
let ss = self.read_shared_state().await;
/// Ensure taht current term is t, erroring otherwise, and lock the state.
pub async fn acquire_term(&self, t: Term) -> Result<MutexGuard<SharedState>> {
let ss = self.write_shared_state().await;
if ss.sk.state.acceptor_state.term != t {
bail!(
"failed to acquire term {}, current term {}",
@@ -591,6 +623,18 @@ impl Timeline {
Ok(ss)
}
/// Returns whether s3 offloading is required and sets current status as
/// matching it.
pub async fn wal_backup_attend(&self) -> bool {
if self.is_cancelled() {
return false;
}
self.write_shared_state()
.await
.wal_backup_attend(self.walreceivers.get_num())
}
/// Returns commit_lsn watch channel.
pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
self.commit_lsn_watch_rx.clone()
@@ -601,14 +645,9 @@ impl Timeline {
self.term_flush_lsn_watch_rx.clone()
}
/// Returns watch channel for SharedState update version.
pub fn get_state_version_rx(&self) -> watch::Receiver<usize> {
self.shared_state_version_rx.clone()
}
/// Pass arrived message to the safekeeper.
pub async fn process_msg(
self: &Arc<Self>,
&self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
if self.is_cancelled() {
@@ -616,6 +655,8 @@ impl Timeline {
}
let mut rmsg: Option<AcceptorProposerMessage>;
let commit_lsn: Lsn;
let term_flush_lsn: TermLsn;
{
let mut shared_state = self.write_shared_state().await;
rmsg = shared_state.sk.process_msg(msg).await?;
@@ -624,28 +665,43 @@ impl Timeline {
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
resp.hs_feedback = self.walsenders.get_hotstandby();
}
commit_lsn = shared_state.sk.state.inmem.commit_lsn;
term_flush_lsn =
TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
}
self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
self.commit_lsn_watch_tx.send(commit_lsn)?;
Ok(rmsg)
}
/// Returns wal_seg_size.
pub async fn get_wal_seg_size(&self) -> usize {
self.read_shared_state().await.get_wal_seg_size()
self.write_shared_state().await.get_wal_seg_size()
}
/// Returns true only if the timeline is loaded and active.
pub async fn is_active(&self) -> bool {
if self.is_cancelled() {
return false;
}
self.write_shared_state().await.active
}
/// Returns state of the timeline.
pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
let state = self.read_shared_state().await;
let state = self.write_shared_state().await;
(state.sk.state.inmem.clone(), state.sk.state.clone())
}
/// Returns latest backup_lsn.
pub async fn get_wal_backup_lsn(&self) -> Lsn {
self.read_shared_state().await.sk.state.inmem.backup_lsn
self.write_shared_state().await.sk.state.inmem.backup_lsn
}
/// Sets backup_lsn to the given value.
pub async fn set_wal_backup_lsn(self: &Arc<Self>, backup_lsn: Lsn) -> Result<()> {
pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
@@ -659,33 +715,39 @@ impl Timeline {
/// Get safekeeper info for broadcasting to broker and other peers.
pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
let shared_state = self.read_shared_state().await;
let shared_state = self.write_shared_state().await;
shared_state.get_safekeeper_info(&self.ttid, conf)
}
/// Update timeline state with peer safekeeper data.
pub async fn record_safekeeper_info(
self: &Arc<Self>,
sk_info: SafekeeperTimelineInfo,
) -> Result<()> {
pub async fn record_safekeeper_info(&self, sk_info: SafekeeperTimelineInfo) -> Result<()> {
let is_wal_backup_action_pending: bool;
let commit_lsn: Lsn;
{
let mut shared_state = self.write_shared_state().await;
shared_state.sk.record_safekeeper_info(&sk_info).await?;
let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
shared_state.peers_info.upsert(&peer_info);
is_wal_backup_action_pending = self.update_status(&mut shared_state).await;
commit_lsn = shared_state.sk.state.inmem.commit_lsn;
}
self.commit_lsn_watch_tx.send(commit_lsn)?;
// Wake up wal backup launcher, if it is time to stop the offloading.
if is_wal_backup_action_pending {
self.wal_backup_launcher_tx.send(self.ttid).await?;
}
Ok(())
}
/// Update in memory remote consistent lsn.
pub async fn update_remote_consistent_lsn(self: &Arc<Self>, candidate: Lsn) {
pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
let mut shared_state = self.write_shared_state().await;
shared_state.sk.state.inmem.remote_consistent_lsn =
max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate);
}
pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
let shared_state = self.read_shared_state().await;
let shared_state = self.write_shared_state().await;
shared_state.get_peers(conf.heartbeat_timeout)
}
@@ -707,7 +769,7 @@ impl Timeline {
/// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
/// Thus we don't try to predict it here.
pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
let ss = self.read_shared_state().await;
let ss = self.write_shared_state().await;
let term = ss.sk.state.acceptor_state.term;
let last_log_term = ss.sk.get_epoch();
let flush_lsn = ss.sk.flush_lsn();
@@ -778,12 +840,12 @@ impl Timeline {
/// Returns flush_lsn.
pub async fn get_flush_lsn(&self) -> Lsn {
self.read_shared_state().await.sk.wal_store.flush_lsn()
self.write_shared_state().await.sk.wal_store.flush_lsn()
}
/// Delete WAL segments from disk that are no longer needed. This is determined
/// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
pub async fn remove_old_wal(self: &Arc<Self>) -> Result<()> {
pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
@@ -799,8 +861,9 @@ impl Timeline {
let horizon_segno: XLogSegNo;
let remover = {
let shared_state = self.read_shared_state().await;
horizon_segno = shared_state.get_horizon_segno(replication_horizon_lsn);
let shared_state = self.write_shared_state().await;
horizon_segno =
shared_state.get_horizon_segno(wal_backup_enabled, replication_horizon_lsn);
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
return Ok(()); // nothing to do
}
@@ -822,7 +885,7 @@ impl Timeline {
/// passed after the last save. This helps to keep remote_consistent_lsn up
/// to date so that storage nodes restart doesn't cause many pageserver ->
/// safekeeper reconnections.
pub async fn maybe_persist_control_file(self: &Arc<Self>) -> Result<()> {
pub async fn maybe_persist_control_file(&self) -> Result<()> {
self.write_shared_state()
.await
.sk
@@ -830,33 +893,38 @@ impl Timeline {
.await
}
/// Gather timeline data for metrics.
/// Gather timeline data for metrics. If the timeline is not active, returns
/// None, we do not collect these.
pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
if self.is_cancelled() {
return None;
}
let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
let state = self.read_shared_state().await;
Some(FullTimelineInfo {
ttid: self.ttid,
ps_feedback_count,
last_ps_feedback,
wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
timeline_is_active: self.broker_active.load(Ordering::Relaxed),
num_computes: self.walreceivers.get_num() as u32,
last_removed_segno: state.last_removed_segno,
epoch_start_lsn: state.sk.epoch_start_lsn,
mem_state: state.sk.state.inmem.clone(),
persisted_state: state.sk.state.clone(),
flush_lsn: state.sk.wal_store.flush_lsn(),
wal_storage: state.sk.wal_store.get_metrics(),
})
let state = self.write_shared_state().await;
if state.active {
Some(FullTimelineInfo {
ttid: self.ttid,
ps_feedback_count,
last_ps_feedback,
wal_backup_active: state.wal_backup_active,
timeline_is_active: state.active,
num_computes: self.walreceivers.get_num() as u32,
last_removed_segno: state.last_removed_segno,
epoch_start_lsn: state.sk.epoch_start_lsn,
mem_state: state.sk.state.inmem.clone(),
persisted_state: state.sk.state.clone(),
flush_lsn: state.sk.wal_store.flush_lsn(),
wal_storage: state.sk.wal_store.get_metrics(),
})
} else {
None
}
}
/// Returns in-memory timeline state to build a full debug dump.
pub async fn memory_dump(&self) -> debug_dump::Memory {
let state = self.read_shared_state().await;
let state = self.write_shared_state().await;
let (write_lsn, write_record_lsn, flush_lsn, file_open) =
state.sk.wal_store.internal_state();
@@ -865,8 +933,8 @@ impl Timeline {
is_cancelled: self.is_cancelled(),
peers_info_len: state.peers_info.0.len(),
walsenders: self.walsenders.get_all(),
wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
active: self.broker_active.load(Ordering::Relaxed),
wal_backup_active: state.wal_backup_active,
active: state.active,
num_computes: self.walreceivers.get_num() as u32,
last_removed_segno: state.last_removed_segno,
epoch_start_lsn: state.sk.epoch_start_lsn,
@@ -880,7 +948,7 @@ impl Timeline {
/// Apply a function to the control file state and persist it.
pub async fn map_control_file<T>(
self: &Arc<Self>,
&self,
f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
) -> Result<T> {
let mut state = self.write_shared_state().await;

View File

@@ -1,140 +0,0 @@
use std::{sync::Arc, time::Duration};
use tracing::{info, instrument, warn};
use utils::{lsn::Lsn, sync::gate::GateGuard};
use crate::{
metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL},
timeline::{PeerInfo, ReadGuardSharedState, Timeline},
timelines_set::TimelinesSet,
wal_backup::{self, WalBackupTaskHandle},
SafeKeeperConf,
};
pub struct StateSnapshot {
pub commit_lsn: Lsn,
pub backup_lsn: Lsn,
pub remote_consistent_lsn: Lsn,
pub peers: Vec<PeerInfo>,
}
impl StateSnapshot {
/// Create a new snapshot of the timeline state.
fn new(read_guard: ReadGuardSharedState, heartbeat_timeout: Duration) -> Self {
Self {
commit_lsn: read_guard.sk.state.inmem.commit_lsn,
backup_lsn: read_guard.sk.state.inmem.backup_lsn,
remote_consistent_lsn: read_guard.sk.state.inmem.remote_consistent_lsn,
peers: read_guard.get_peers(heartbeat_timeout),
}
}
}
/// Control how often the manager task should wake up to check updates.
/// There is no need to check for updates more often than this.
const REFRESH_INTERVAL: Duration = Duration::from_millis(300);
/// This task gets spawned alongside each timeline and is responsible for managing the timeline's
/// background tasks.
#[instrument(name = "manager", skip_all, fields(ttid = %tli.ttid))]
pub async fn main_task(
tli: Arc<Timeline>,
conf: SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
_gate_guard: GateGuard,
) {
scopeguard::defer! {
if tli.is_cancelled() {
info!("manager task finished");
} else {
warn!("manager task finished prematurely");
}
};
// sets whether timeline is active for broker pushes or not
let mut tli_broker_active = broker_active_set.guard(tli.clone());
let ttid = tli.ttid;
let wal_seg_size = tli.get_wal_seg_size().await;
let heartbeat_timeout = conf.heartbeat_timeout;
let mut state_version_rx = tli.get_state_version_rx();
let walreceivers = tli.get_walreceivers();
let mut num_computes_rx = walreceivers.get_num_rx();
// list of background tasks
let mut backup_task: Option<WalBackupTaskHandle> = None;
let last_state = 'outer: loop {
MANAGER_ITERATIONS_TOTAL.inc();
let state_snapshot = StateSnapshot::new(tli.read_shared_state().await, heartbeat_timeout);
let num_computes = *num_computes_rx.borrow();
let is_wal_backup_required =
wal_backup::is_wal_backup_required(wal_seg_size, num_computes, &state_snapshot);
if conf.is_wal_backup_enabled() {
wal_backup::update_task(
&conf,
ttid,
is_wal_backup_required,
&state_snapshot,
&mut backup_task,
)
.await;
}
let is_active = is_wal_backup_required
|| state_snapshot.remote_consistent_lsn < state_snapshot.commit_lsn;
// update the broker timeline set
if tli_broker_active.set(is_active) {
// write log if state has changed
info!(
"timeline active={} now, remote_consistent_lsn={}, commit_lsn={}",
is_active, state_snapshot.remote_consistent_lsn, state_snapshot.commit_lsn,
);
MANAGER_ACTIVE_CHANGES.inc();
if !is_active {
// TODO: maybe use tokio::spawn?
if let Err(e) = tli.maybe_persist_control_file().await {
warn!("control file save in update_status failed: {:?}", e);
}
}
}
// update the state in Arc<Timeline>
tli.wal_backup_active
.store(is_wal_backup_required, std::sync::atomic::Ordering::SeqCst);
tli.broker_active
.store(is_active, std::sync::atomic::Ordering::SeqCst);
// wait until something changes. tx channels are stored under Arc, so they will not be
// dropped until the manager task is finished.
tokio::select! {
_ = tli.cancel.cancelled() => {
// timeline was deleted
break 'outer state_snapshot;
}
_ = async {
// don't wake up on every state change, but at most every REFRESH_INTERVAL
tokio::time::sleep(REFRESH_INTERVAL).await;
let _ = state_version_rx.changed().await;
} => {
// state was updated
}
_ = num_computes_rx.changed() => {
// number of connected computes was updated
}
}
};
// shutdown background tasks
if conf.is_wal_backup_enabled() {
wal_backup::update_task(&conf, ttid, false, &last_state, &mut backup_task).await;
}
}

View File

@@ -4,7 +4,6 @@
use crate::safekeeper::ServerInfo;
use crate::timeline::{Timeline, TimelineError};
use crate::timelines_set::TimelinesSet;
use crate::SafeKeeperConf;
use anyhow::{bail, Context, Result};
use camino::Utf8PathBuf;
@@ -12,16 +11,16 @@ use once_cell::sync::Lazy;
use serde::Serialize;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::Sender;
use tracing::*;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
struct GlobalTimelinesState {
timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
wal_backup_launcher_tx: Option<Sender<TenantTimelineId>>,
conf: Option<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
}
@@ -37,8 +36,11 @@ impl GlobalTimelinesState {
}
/// Get dependencies for a timeline constructor.
fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>) {
(self.get_conf().clone(), self.broker_active_set.clone())
fn get_dependencies(&self) -> (SafeKeeperConf, Sender<TenantTimelineId>) {
(
self.get_conf().clone(),
self.wal_backup_launcher_tx.as_ref().unwrap().clone(),
)
}
/// Insert timeline into the map. Returns error if timeline with the same id already exists.
@@ -63,8 +65,8 @@ impl GlobalTimelinesState {
static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
wal_backup_launcher_tx: None,
conf: None,
broker_active_set: Arc::new(TimelinesSet::default()),
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
})
});
@@ -74,11 +76,16 @@ pub struct GlobalTimelines;
impl GlobalTimelines {
/// Inject dependencies needed for the timeline constructors and load all timelines to memory.
pub async fn init(conf: SafeKeeperConf) -> Result<()> {
pub async fn init(
conf: SafeKeeperConf,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
) -> Result<()> {
// clippy isn't smart enough to understand that drop(state) releases the
// lock, so use explicit block
let tenants_dir = {
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(state.wal_backup_launcher_tx.is_none());
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
state.conf = Some(conf);
// Iterate through all directories and load tenants for all directories
@@ -122,9 +129,12 @@ impl GlobalTimelines {
/// this function is called during init when nothing else is running, so
/// this is fine.
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
let (conf, broker_active_set) = {
let (conf, wal_backup_launcher_tx) = {
let state = TIMELINES_STATE.lock().unwrap();
state.get_dependencies()
(
state.get_conf().clone(),
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
)
};
let timelines_dir = conf.tenant_dir(&tenant_id);
@@ -137,7 +147,7 @@ impl GlobalTimelines {
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
{
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
match Timeline::load_timeline(&conf, ttid) {
match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx.clone()) {
Ok(timeline) => {
let tli = Arc::new(timeline);
TIMELINES_STATE
@@ -145,7 +155,8 @@ impl GlobalTimelines {
.unwrap()
.timelines
.insert(ttid, tli.clone());
tli.bootstrap(&conf, broker_active_set.clone());
tli.bootstrap(&conf);
tli.update_status_notify().await.unwrap();
}
// If we can't load a timeline, it's most likely because of a corrupted
// directory. We will log an error and won't allow to delete/recreate
@@ -178,9 +189,9 @@ impl GlobalTimelines {
_guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
ttid: TenantTimelineId,
) -> Result<Arc<Timeline>> {
let (conf, broker_active_set) = TIMELINES_STATE.lock().unwrap().get_dependencies();
let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
match Timeline::load_timeline(&conf, ttid) {
match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {
Ok(timeline) => {
let tli = Arc::new(timeline);
@@ -191,7 +202,7 @@ impl GlobalTimelines {
.timelines
.insert(ttid, tli.clone());
tli.bootstrap(&conf, broker_active_set);
tli.bootstrap(&conf);
Ok(tli)
}
@@ -210,10 +221,6 @@ impl GlobalTimelines {
TIMELINES_STATE.lock().unwrap().get_conf().clone()
}
pub fn get_global_broker_active_set() -> Arc<TimelinesSet> {
TIMELINES_STATE.lock().unwrap().broker_active_set.clone()
}
/// Create a new timeline with the given id. If the timeline already exists, returns
/// an existing timeline.
pub async fn create(
@@ -222,7 +229,7 @@ impl GlobalTimelines {
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let (conf, broker_active_set) = {
let (conf, wal_backup_launcher_tx) = {
let state = TIMELINES_STATE.lock().unwrap();
if let Ok(timeline) = state.get(&ttid) {
// Timeline already exists, return it.
@@ -236,6 +243,7 @@ impl GlobalTimelines {
let timeline = Arc::new(Timeline::create_empty(
&conf,
ttid,
wal_backup_launcher_tx,
server_info,
commit_lsn,
local_start_lsn,
@@ -256,10 +264,7 @@ impl GlobalTimelines {
// Write the new timeline to the disk and start background workers.
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
// and the state on disk should remain unchanged.
if let Err(e) = timeline
.init_new(&mut shared_state, &conf, broker_active_set)
.await
{
if let Err(e) = timeline.init_new(&mut shared_state, &conf).await {
// Note: the most likely reason for init failure is that the timeline
// directory already exists on disk. This happens when timeline is corrupted
// and wasn't loaded from disk on startup because of that. We want to preserve
@@ -276,6 +281,8 @@ impl GlobalTimelines {
// We are done with bootstrap, release the lock, return the timeline.
// {} block forces release before .await
}
timeline.update_status_notify().await?;
timeline.wal_backup_launcher_tx.send(timeline.ttid).await?;
Ok(timeline)
}
@@ -328,13 +335,12 @@ impl GlobalTimelines {
let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
match tli_res {
Ok(timeline) => {
let was_active = timeline.broker_active.load(Ordering::Relaxed);
// Take a lock and finish the deletion holding this mutex.
let mut shared_state = timeline.write_shared_state().await;
info!("deleting timeline {}, only_local={}", ttid, only_local);
let dir_existed = timeline.delete(&mut shared_state, only_local).await?;
let (dir_existed, was_active) =
timeline.delete(&mut shared_state, only_local).await?;
// Remove timeline from the map.
// FIXME: re-enable it once we fix the issue with recreation of deleted timelines
@@ -343,7 +349,7 @@ impl GlobalTimelines {
Ok(TimelineDeleteForceResult {
dir_existed,
was_active, // TODO: we probably should remove this field
was_active,
})
}
Err(_) => {

View File

@@ -1,90 +0,0 @@
use std::{collections::HashMap, sync::Arc};
use utils::id::TenantTimelineId;
use crate::timeline::Timeline;
/// Set of timelines, supports operations:
/// - add timeline
/// - remove timeline
/// - clone the set
///
/// Usually used for keeping subset of timelines. For example active timelines that require broker push.
pub struct TimelinesSet {
timelines: std::sync::Mutex<HashMap<TenantTimelineId, Arc<Timeline>>>,
}
impl Default for TimelinesSet {
fn default() -> Self {
Self {
timelines: std::sync::Mutex::new(HashMap::new()),
}
}
}
impl TimelinesSet {
pub fn insert(&self, tli: Arc<Timeline>) {
self.timelines.lock().unwrap().insert(tli.ttid, tli);
}
pub fn delete(&self, ttid: &TenantTimelineId) {
self.timelines.lock().unwrap().remove(ttid);
}
/// If present is true, adds timeline to the set, otherwise removes it.
pub fn set_present(&self, tli: Arc<Timeline>, present: bool) {
if present {
self.insert(tli);
} else {
self.delete(&tli.ttid);
}
}
pub fn is_present(&self, ttid: &TenantTimelineId) -> bool {
self.timelines.lock().unwrap().contains_key(ttid)
}
/// Returns all timelines in the set.
pub fn get_all(&self) -> Vec<Arc<Timeline>> {
self.timelines.lock().unwrap().values().cloned().collect()
}
/// Returns a timeline guard for easy presence control.
pub fn guard(self: &Arc<Self>, tli: Arc<Timeline>) -> TimelineSetGuard {
let is_present = self.is_present(&tli.ttid);
TimelineSetGuard {
timelines_set: self.clone(),
tli,
is_present,
}
}
}
/// Guard is used to add or remove timeline from the set.
/// If the timeline present in set, it will be removed from it on drop.
/// Note: do not use more than one guard for the same timeline, it caches the presence state.
/// It is designed to be used in the manager task only.
pub struct TimelineSetGuard {
timelines_set: Arc<TimelinesSet>,
tli: Arc<Timeline>,
is_present: bool,
}
impl TimelineSetGuard {
/// Returns true if the state was changed.
pub fn set(&mut self, present: bool) -> bool {
if present == self.is_present {
return false;
}
self.is_present = present;
self.timelines_set.set_present(self.tli.clone(), present);
true
}
}
impl Drop for TimelineSetGuard {
fn drop(&mut self) {
// remove timeline from the map on drop
self.timelines_set.delete(&self.tli.ttid);
}
}

View File

@@ -9,7 +9,7 @@ use utils::backoff;
use utils::id::NodeId;
use std::cmp::min;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroU32;
use std::pin::Pin;
use std::sync::Arc;
@@ -29,10 +29,9 @@ use tracing::*;
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS};
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
use crate::timeline::{PeerInfo, Timeline};
use crate::timeline_manager::StateSnapshot;
use crate::{GlobalTimelines, SafeKeeperConf, WAL_BACKUP_RUNTIME};
use crate::{GlobalTimelines, SafeKeeperConf};
use once_cell::sync::OnceCell;
@@ -42,84 +41,35 @@ const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
/// Default buffer size when interfacing with [`tokio::fs::File`].
const BUFFER_SIZE: usize = 32 * 1024;
pub struct WalBackupTaskHandle {
/// Check whether wal backup is required for timeline. If yes, mark that launcher is
/// aware of current status and return the timeline.
async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
match GlobalTimelines::get(ttid).ok() {
Some(tli) => {
tli.wal_backup_attend().await;
Some(tli)
}
None => None,
}
}
struct WalBackupTaskHandle {
shutdown_tx: Sender<()>,
handle: JoinHandle<()>,
}
/// Do we have anything to upload to S3, i.e. should safekeepers run backup activity?
pub fn is_wal_backup_required(
wal_seg_size: usize,
num_computes: usize,
state: &StateSnapshot,
) -> bool {
num_computes > 0 ||
// Currently only the whole segment is offloaded, so compare segment numbers.
(state.commit_lsn.segment_number(wal_seg_size) > state.backup_lsn.segment_number(wal_seg_size))
struct WalBackupTimelineEntry {
timeline: Arc<Timeline>,
handle: Option<WalBackupTaskHandle>,
}
/// Based on peer information determine which safekeeper should offload; if it
/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task
/// is running, kill it.
pub async fn update_task(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
need_backup: bool,
state: &StateSnapshot,
entry: &mut Option<WalBackupTaskHandle>,
) {
let (offloader, election_dbg_str) =
determine_offloader(&state.peers, state.backup_lsn, ttid, conf);
let elected_me = Some(conf.my_id) == offloader;
let should_task_run = need_backup && elected_me;
// start or stop the task
if should_task_run != (entry.is_some()) {
if should_task_run {
info!("elected for backup: {}", election_dbg_str);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&ttid);
let async_task = backup_task_main(
ttid,
timeline_dir,
conf.workdir.clone(),
conf.backup_parallel_jobs,
shutdown_rx,
);
let handle = if conf.current_thread_runtime {
tokio::spawn(async_task)
} else {
WAL_BACKUP_RUNTIME.spawn(async_task)
};
*entry = Some(WalBackupTaskHandle {
shutdown_tx,
handle,
});
} else {
if !need_backup {
// don't need backup at all
info!("stepping down from backup, need_backup={}", need_backup);
} else {
// someone else has been elected
info!("stepping down from backup: {}", election_dbg_str);
}
shut_down_task(entry).await;
}
}
}
async fn shut_down_task(entry: &mut Option<WalBackupTaskHandle>) {
if let Some(wb_handle) = entry.take() {
async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
if let Some(wb_handle) = entry.handle.take() {
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task panicked: {}", e);
warn!("WAL backup task for {} panicked: {}", ttid, e);
}
}
}
@@ -176,6 +126,49 @@ fn determine_offloader(
}
}
/// Based on peer information determine which safekeeper should offload; if it
/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task
/// is running, kill it.
async fn update_task(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
entry: &mut WalBackupTimelineEntry,
) {
let alive_peers = entry.timeline.get_peers(conf).await;
let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
let (offloader, election_dbg_str) =
determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
let elected_me = Some(conf.my_id) == offloader;
if elected_me != (entry.handle.is_some()) {
if elected_me {
info!("elected for backup: {}", election_dbg_str);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&ttid);
let handle = tokio::spawn(
backup_task_main(
ttid,
timeline_dir,
conf.workdir.clone(),
conf.backup_parallel_jobs,
shutdown_rx,
)
.in_current_span(),
);
entry.handle = Some(WalBackupTaskHandle {
shutdown_tx,
handle,
});
} else {
info!("stepping down from backup: {}", election_dbg_str);
shut_down_task(ttid, entry).await;
}
}
}
static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
// Storage must be configured and initialized when this is called.
@@ -197,6 +190,67 @@ pub fn init_remote_storage(conf: &SafeKeeperConf) {
});
}
const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
/// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup
/// tasks. Having this in separate task simplifies locking, allows to reap
/// panics and separate elections from offloading itself.
pub async fn wal_backup_launcher_task_main(
conf: SafeKeeperConf,
mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
) -> anyhow::Result<()> {
info!(
"WAL backup launcher started, remote config {:?}",
conf.remote_storage
);
// Presence in this map means launcher is aware s3 offloading is needed for
// the timeline, but task is started only if it makes sense for to offload
// from this safekeeper.
let mut tasks: HashMap<TenantTimelineId, WalBackupTimelineEntry> = HashMap::new();
let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
loop {
tokio::select! {
ttid = wal_backup_launcher_rx.recv() => {
// channel is never expected to get closed
let ttid = ttid.unwrap();
if !conf.is_wal_backup_enabled() {
continue; /* just drain the channel and do nothing */
}
async {
let timeline = is_wal_backup_required(ttid).await;
// do we need to do anything at all?
if timeline.is_some() != tasks.contains_key(&ttid) {
if let Some(timeline) = timeline {
// need to start the task
let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
timeline,
handle: None,
});
update_task(&conf, ttid, entry).await;
} else {
// need to stop the task
info!("stopping WAL backup task");
let mut entry = tasks.remove(&ttid).unwrap();
shut_down_task(ttid, &mut entry).await;
}
}
}.instrument(info_span!("WAL backup", ttid = %ttid)).await;
}
// For each timeline needing offloading, check if this safekeeper
// should do the job and start/stop the task accordingly.
_ = ticker.tick() => {
for (ttid, entry) in tasks.iter_mut() {
update_task(&conf, *ttid, entry)
.instrument(info_span!("WAL backup", ttid = %ttid))
.await;
}
}
}
}
}
struct WalBackupTask {
timeline: Arc<Timeline>,
timeline_dir: Utf8PathBuf,
@@ -207,7 +261,6 @@ struct WalBackupTask {
}
/// Offload single timeline.
#[instrument(name = "WAL backup", skip_all, fields(ttid = %ttid))]
async fn backup_task_main(
ttid: TenantTimelineId,
timeline_dir: Utf8PathBuf,
@@ -215,8 +268,6 @@ async fn backup_task_main(
parallel_jobs: usize,
mut shutdown_rx: Receiver<()>,
) {
let _guard = WAL_BACKUP_TASKS.guard();
info!("started");
let res = GlobalTimelines::get(ttid);
if let Err(e) = res {

View File

@@ -277,6 +277,14 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
debug!("started");
let await_duration = conf.partial_backup_timeout;
let mut cancellation_rx = match tli.get_cancellation_rx() {
Ok(rx) => rx,
Err(_) => {
info!("timeline canceled during task start");
return;
}
};
// sleep for random time to avoid thundering herd
{
let randf64 = rand::thread_rng().gen_range(0.0..1.0);
@@ -319,7 +327,7 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
&& flush_lsn_rx.borrow().term == seg.term
{
tokio::select! {
_ = backup.tli.cancel.cancelled() => {
_ = cancellation_rx.changed() => {
info!("timeline canceled");
return;
}
@@ -332,7 +340,7 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
// if we don't have any data and zero LSNs, wait for something
while flush_lsn_rx.borrow().lsn == Lsn(0) {
tokio::select! {
_ = backup.tli.cancel.cancelled() => {
_ = cancellation_rx.changed() => {
info!("timeline canceled");
return;
}
@@ -349,7 +357,7 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
// waiting until timeout expires OR segno changes
'inner: loop {
tokio::select! {
_ = backup.tli.cancel.cancelled() => {
_ = cancellation_rx.changed() => {
info!("timeline canceled");
return;
}