Compare commits

..

12 Commits

Author SHA1 Message Date
John Spray
025a183091 safekeeper: use CancellationToken instead of watch channel 2024-05-21 17:51:43 +01:00
John Spray
baedfc90b5 safekeeper: use a Gate in Timeline to order shutdown 2024-05-21 17:38:04 +01:00
Arseny Sher
9519f06d48 Add context to broker pull. 2024-05-21 17:13:41 +03:00
Arseny Sher
6e17c359bd Fix logging when backup task is stopped. 2024-05-21 17:13:07 +03:00
Arthur Petukhovsky
78459aee43 Add manager comment 2024-05-20 12:58:01 +00:00
Arthur Petukhovsky
81dbfc33c2 Fix cargo doc 2024-05-20 12:52:56 +00:00
Arthur Petukhovsky
7771275cc6 Add metrics 2024-05-20 12:52:55 +00:00
Arthur Petukhovsky
86453b422d Add comments 2024-05-20 12:52:55 +00:00
Arthur Petukhovsky
35d6599278 Do self review 2024-05-20 12:52:55 +00:00
Arthur Petukhovsky
e8d0956cf7 Fix WAL_BACKUP_RUNTIME usage 2024-05-20 12:52:55 +00:00
Arthur Petukhovsky
053ada80b2 Implement TimelineSet for storing active timelines 2024-05-20 12:52:55 +00:00
Arthur Petukhovsky
75801f0451 Refactor backup launcher into timeline manager 2024-05-20 12:52:55 +00:00
24 changed files with 874 additions and 1446 deletions

View File

@@ -1,6 +1,4 @@
[![Neon](https://github.com/neondatabase/neon/assets/11527560/f15a17f0-836e-40c5-b35d-030606a6b660)](https://neon.tech)
[![Neon](https://user-images.githubusercontent.com/13738772/236813940-dcfdcb5b-69d3-449b-a686-013febe834d4.png)](https://neon.tech)
# Neon

View File

@@ -307,7 +307,7 @@ impl KeySpace {
}
/// Merge another keyspace into the current one.
/// Note: the keyspaces must not overlap (enforced via assertions). To merge overlapping key ranges, use `KeySpaceRandomAccum`.
/// Note: the keyspaces must not ovelap (enforced via assertions)
pub fn merge(&mut self, other: &KeySpace) {
let all_ranges = self
.ranges

View File

@@ -6,10 +6,14 @@ use futures::future::BoxFuture;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use pageserver_api::shard::ShardIdentity;
use pin_project_lite::pin_project;
use std::collections::BinaryHeap;
use std::collections::VecDeque;
use std::collections::{binary_heap, BinaryHeap};
use std::fmt::Display;
use std::ops::Range;
use std::future::Future;
use std::ops::{DerefMut, Range};
use std::pin::Pin;
use std::task::{ready, Poll};
use utils::lsn::Lsn;
pub const PAGE_SZ: u64 = 8192;
@@ -81,6 +85,33 @@ pub fn intersect_keyspace<K: Ord + Clone + Copy>(
ranges
}
/// Create a stream that iterates through all DeltaEntrys among all input
/// layers, in key-lsn order.
///
/// This is public because the create_delta() implementation likely wants to use this too
/// TODO: move to a more shared place
pub fn merge_delta_keys<'a, E: CompactionJobExecutor>(
layers: &'a [E::DeltaLayer],
ctx: &'a E::RequestContext,
) -> MergeDeltaKeys<'a, E> {
// Use a binary heap to merge the layers. Each input layer is initially
// represented by a LazyLoadLayer::Unloaded element, which uses the start of
// the layer's key range as the key. The first time a layer reaches the top
// of the heap, all the keys of the layer are loaded into a sorted vector.
//
// This helps to keep the memory usage reasonable: we only need to hold in
// memory the DeltaEntrys of the layers that overlap with the "current" key.
let mut heap: BinaryHeap<LazyLoadLayer<'a, E>> = BinaryHeap::new();
for l in layers {
heap.push(LazyLoadLayer::Unloaded(l));
}
MergeDeltaKeys {
heap,
ctx,
load_future: None,
}
}
pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
layers: &'a [E::DeltaLayer],
ctx: &'a E::RequestContext,
@@ -98,139 +129,104 @@ pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
Ok(stream)
}
/// Wrapper type to make `dl.load_keys`` compile.
type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;
pub enum LayerIterator<'a, E: CompactionJobExecutor> {
Loaded(
VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>,
&'a E::RequestContext,
),
Unloaded(&'a E::DeltaLayer, &'a E::RequestContext),
enum LazyLoadLayer<'a, E: CompactionJobExecutor> {
Loaded(VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>),
Unloaded(&'a E::DeltaLayer),
}
impl<'a, E: CompactionJobExecutor + 'a> LayerIterator<'a, E> {
pub fn new(delta_layer: &'a E::DeltaLayer, ctx: &'a E::RequestContext) -> Self {
Self::Unloaded(delta_layer, ctx)
}
pub fn key_lsn(&self) -> (E::Key, Lsn) {
impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> {
fn min_key(&self) -> E::Key {
match self {
Self::Unloaded(dl, _) => (dl.key_range().start, dl.lsn_range().start),
Self::Loaded(entries, _) => entries.front().map(|x| (x.key(), x.lsn())).unwrap(),
Self::Loaded(entries) => entries.front().unwrap().key(),
Self::Unloaded(dl) => dl.key_range().start,
}
}
async fn load(&mut self) -> anyhow::Result<()> {
fn min_lsn(&self) -> Lsn {
match self {
Self::Unloaded(dl, ctx) => {
let unloaded_key_lsn = (dl.key_range().start, dl.lsn_range().start);
let fut: LoadFuture<
'a,
<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>,
> = Box::pin(dl.load_keys(ctx));
let keys = VecDeque::from(fut.await?);
assert_eq!(
keys.front().as_ref().map(|x| (x.key(), x.lsn())).unwrap(),
unloaded_key_lsn,
"unmatched start key_lsn"
);
*self = Self::Loaded(keys, ctx);
Ok(())
}
Self::Loaded(_, _) => Ok(()),
}
}
pub async fn entry(
&mut self,
) -> anyhow::Result<&<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>> {
self.load().await?;
let Self::Loaded(x, _) = self else {
unreachable!()
};
Ok(x.front().unwrap())
}
pub async fn next(
&mut self,
) -> anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>> {
self.load().await?; // requires Box::pin to make it compile
let Self::Loaded(x, _) = self else {
unreachable!()
};
Ok(x.pop_front().expect("already reached the end"))
}
pub fn is_end(&self) -> bool {
match self {
Self::Unloaded(_, _) => false,
Self::Loaded(x, _) => x.is_empty(),
Self::Loaded(entries) => entries.front().unwrap().lsn(),
Self::Unloaded(dl) => dl.lsn_range().start,
}
}
}
impl<'a, E: CompactionJobExecutor + 'a> PartialOrd for LayerIterator<'a, E> {
impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<'a, E: CompactionJobExecutor + 'a> Ord for LayerIterator<'a, E> {
impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// reverse comparison to get a min-heap
other.key_lsn().cmp(&self.key_lsn())
// reverse order so that we get a min-heap
(other.min_key(), other.min_lsn()).cmp(&(self.min_key(), self.min_lsn()))
}
}
impl<'a, E: CompactionJobExecutor + 'a> PartialEq for LayerIterator<'a, E> {
impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == std::cmp::Ordering::Equal
}
}
impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {}
impl<'a, E: CompactionJobExecutor + 'a> Eq for LayerIterator<'a, E> {}
type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;
pub struct DeltaMergeIterator<'a, E: CompactionJobExecutor> {
heap: BinaryHeap<LayerIterator<'a, E>>,
// Stream returned by `merge_delta_keys`
pin_project! {
#[allow(clippy::type_complexity)]
pub struct MergeDeltaKeys<'a, E: CompactionJobExecutor> {
heap: BinaryHeap<LazyLoadLayer<'a, E>>,
#[pin]
load_future: Option<LoadFuture<'a, <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>,
ctx: &'a E::RequestContext,
}
}
impl<'a, E: CompactionJobExecutor + 'a> DeltaMergeIterator<'a, E> {
pub fn new(delta_layers: &'a [E::DeltaLayer], ctx: &'a E::RequestContext) -> Self {
let mut heap = BinaryHeap::new();
for dl in delta_layers {
heap.push(LayerIterator::new(dl, ctx));
}
Self { heap }
}
impl<'a, E> Stream for MergeDeltaKeys<'a, E>
where
E: CompactionJobExecutor + 'a,
{
type Item = anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>;
pub fn is_end(&self) -> bool {
self.heap.is_empty()
}
/// The next key-lsn entry that will be returned by `next`.
pub fn key_lsn(&self) -> (E::Key, Lsn) {
self.heap.peek().expect("already reached the end").key_lsn()
}
/// Move to the next entry and return the current entry.
pub async fn next(
&mut self,
) -> anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>> {
let Some(mut top) = self.heap.peek_mut() else {
panic!("already reached the end")
};
match top.next().await {
Ok(entry) => {
if top.is_end() {
binary_heap::PeekMut::pop(top);
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<std::option::Option<<Self as futures::Stream>::Item>> {
let mut this = self.project();
loop {
if let Some(mut load_future) = this.load_future.as_mut().as_pin_mut() {
// We are waiting for loading the keys to finish
match ready!(load_future.as_mut().poll(cx)) {
Ok(entries) => {
this.load_future.set(None);
*this.heap.peek_mut().unwrap() =
LazyLoadLayer::Loaded(VecDeque::from(entries));
}
Err(e) => {
return Poll::Ready(Some(Err(e)));
}
}
Ok(entry)
}
Err(e) => {
// pop the item if there is an error, otherwise it might cause further panic when binary heap compares it after `PeekMut` gets dropped.
binary_heap::PeekMut::pop(top);
Err(e)
// If the topmost layer in the heap hasn't been loaded yet, start
// loading it. Otherwise return the next entry from it and update
// the layer's position in the heap (this decreaseKey operation is
// performed implicitly when `top` is dropped).
if let Some(mut top) = this.heap.peek_mut() {
match top.deref_mut() {
LazyLoadLayer::Unloaded(ref mut l) => {
let fut = l.load_keys(this.ctx);
this.load_future.set(Some(Box::pin(fut)));
continue;
}
LazyLoadLayer::Loaded(ref mut entries) => {
let result = entries.pop_front().unwrap();
if entries.is_empty() {
std::collections::binary_heap::PeekMut::pop(top);
}
return Poll::Ready(Some(Ok(result)));
}
}
} else {
return Poll::Ready(None);
}
}
}

View File

@@ -92,9 +92,7 @@ pub trait CompactionJobExecutor {
) -> impl Future<Output = anyhow::Result<()>> + Send;
}
pub trait CompactionKey:
std::cmp::Ord + Clone + Copy + std::fmt::Display + std::fmt::Debug
{
pub trait CompactionKey: std::cmp::Ord + Clone + Copy + std::fmt::Display {
const MIN: Self;
const MAX: Self;

View File

@@ -2,6 +2,7 @@ mod draw;
use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp};
use futures::StreamExt;
use pageserver_api::shard::ShardIdentity;
use rand::Rng;
use tracing::info;
@@ -14,8 +15,7 @@ use std::sync::Arc;
use std::sync::Mutex;
use crate::helpers::PAGE_SZ;
use crate::helpers::overlaps_with;
use crate::helpers::DeltaMergeIterator;
use crate::helpers::{merge_delta_keys, overlaps_with};
use crate::interface;
use crate::interface::CompactionLayer;
@@ -545,11 +545,12 @@ impl interface::CompactionJobExecutor for MockTimeline {
input_layers: &[Arc<MockDeltaLayer>],
ctx: &MockRequestContext,
) -> anyhow::Result<()> {
let mut key_value_stream = DeltaMergeIterator::<MockTimeline>::new(input_layers, ctx);
let mut key_value_stream =
std::pin::pin!(merge_delta_keys::<MockTimeline>(input_layers, ctx));
let mut records: Vec<MockRecord> = Vec::new();
let mut total_len = 2;
while !key_value_stream.is_end() {
let delta_entry: MockRecord = key_value_stream.next().await?;
while let Some(delta_entry) = key_value_stream.next().await {
let delta_entry: MockRecord = delta_entry?;
if key_range.contains(&delta_entry.key) && lsn_range.contains(&delta_entry.lsn) {
total_len += delta_entry.len;
records.push(delta_entry);

View File

@@ -40,11 +40,7 @@ use utils::bin_ser::DeserializeError;
use utils::vec_map::{VecMap, VecMapOrdering};
use utils::{bin_ser::BeSer, lsn::Lsn};
/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
pub const MAX_AUX_FILE_DELTAS: usize = 1024;
/// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
pub const MAX_AUX_FILE_V2_DELTAS: usize = 64;
const MAX_AUX_FILE_DELTAS: usize = 1024;
#[derive(Debug)]
pub enum LsnForTimestamp {

View File

@@ -3968,7 +3968,7 @@ mod tests {
use crate::tenant::harness::*;
use crate::tenant::timeline::CompactFlags;
use crate::DEFAULT_PG_VERSION;
use bytes::{Bytes, BytesMut};
use bytes::BytesMut;
use hex_literal::hex;
use pageserver_api::key::{AUX_KEY_PREFIX, NON_INHERITED_RANGE};
use pageserver_api::keyspace::KeySpace;
@@ -4777,12 +4777,7 @@ mod tests {
info!("Doing vectored read on {:?}", read);
let vectored_res = tline
.get_vectored_impl(
read.clone(),
reads_lsn,
&mut ValuesReconstructState::new(),
&ctx,
)
.get_vectored_impl(read.clone(), reads_lsn, ValuesReconstructState::new(), &ctx)
.await;
tline
.validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx)
@@ -4831,7 +4826,7 @@ mod tests {
.get_vectored_impl(
aux_keyspace.clone(),
read_lsn,
&mut ValuesReconstructState::new(),
ValuesReconstructState::new(),
&ctx,
)
.await;
@@ -4976,7 +4971,7 @@ mod tests {
.get_vectored_impl(
read.clone(),
current_lsn,
&mut ValuesReconstructState::new(),
ValuesReconstructState::new(),
&ctx,
)
.await?;
@@ -5111,7 +5106,7 @@ mod tests {
ranges: vec![child_gap_at_key..child_gap_at_key.next()],
},
query_lsn,
&mut ValuesReconstructState::new(),
ValuesReconstructState::new(),
&ctx,
)
.await;
@@ -5552,7 +5547,7 @@ mod tests {
.await?;
const NUM_KEYS: usize = 1000;
const STEP: usize = 10000; // random update + scan base_key + idx * STEP
const STEP: usize = 100; // random update + scan base_key + idx * STEP
let cancel = CancellationToken::new();
@@ -5585,7 +5580,7 @@ mod tests {
let keyspace = KeySpace::single(base_key..base_key.add((NUM_KEYS * STEP) as u32));
for iter in 0..=10 {
for _ in 0..10 {
// Read all the blocks
for (blknum, last_lsn) in updated.iter().enumerate() {
test_key.field6 = (blknum * STEP) as u32;
@@ -5600,7 +5595,7 @@ mod tests {
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::default(),
ValuesReconstructState::default(),
&ctx,
)
.await?
@@ -5636,88 +5631,14 @@ mod tests {
updated[blknum] = lsn;
}
// Perform two cycles of flush, compact, and GC
for round in 0..2 {
tline.freeze_and_flush().await?;
tline
.compact(
&cancel,
if iter % 5 == 0 && round == 0 {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags
} else {
EnumSet::empty()
},
&ctx,
)
.await?;
tenant
.gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
.await?;
}
}
Ok(())
}
#[tokio::test]
async fn test_metadata_compaction_trigger() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_compaction_trigger")?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let cancel = CancellationToken::new();
let mut base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
base_key.field1 = AUX_KEY_PREFIX;
let test_key = base_key;
let mut lsn = Lsn(0x10);
for _ in 0..20 {
lsn = Lsn(lsn.0 + 0x10);
let mut writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", 0, lsn))),
&ctx,
)
// Perform a cycle of flush, compact, and GC
tline.freeze_and_flush().await?;
tline.compact(&cancel, EnumSet::empty(), &ctx).await?;
tenant
.gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
.await?;
writer.finish_write(lsn);
drop(writer);
tline.freeze_and_flush().await?; // force create a delta layer
}
let before_num_l0_delta_files = tline
.layers
.read()
.await
.layer_map()
.get_level0_deltas()?
.len();
tline.compact(&cancel, EnumSet::empty(), &ctx).await?;
let after_num_l0_delta_files = tline
.layers
.read()
.await
.layer_map()
.get_level0_deltas()?
.len();
assert!(after_num_l0_delta_files < before_num_l0_delta_files, "after_num_l0_delta_files={after_num_l0_delta_files}, before_num_l0_delta_files={before_num_l0_delta_files}");
assert_eq!(
tline.get(test_key, lsn, &ctx).await?,
test_img(&format!("{} at {}", 0, lsn))
);
Ok(())
}
@@ -5996,374 +5917,4 @@ mod tests {
Some(&bytes::Bytes::from_static(b"last"))
);
}
#[tokio::test]
async fn test_metadata_image_creation() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_image_creation")?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
const NUM_KEYS: usize = 1000;
const STEP: usize = 10000; // random update + scan base_key + idx * STEP
let cancel = CancellationToken::new();
let mut base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
base_key.field1 = AUX_KEY_PREFIX;
let mut test_key = base_key;
let mut lsn = Lsn(0x10);
async fn scan_with_statistics(
tline: &Timeline,
keyspace: &KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<(BTreeMap<Key, Result<Bytes, PageReconstructError>>, usize)> {
let mut reconstruct_state = ValuesReconstructState::default();
let res = tline
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await?;
Ok((res, reconstruct_state.get_delta_layers_visited() as usize))
}
#[allow(clippy::needless_range_loop)]
for blknum in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
test_key.field6 = (blknum * STEP) as u32;
let mut writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
}
let keyspace = KeySpace::single(base_key..base_key.add((NUM_KEYS * STEP) as u32));
for iter in 1..=10 {
for _ in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = (blknum * STEP) as u32;
let mut writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
}
tline.freeze_and_flush().await?;
if iter % 5 == 0 {
let (_, before_delta_file_accessed) =
scan_with_statistics(&tline, &keyspace, lsn, &ctx).await?;
tline
.compact(
&cancel,
{
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags
},
&ctx,
)
.await?;
let (_, after_delta_file_accessed) =
scan_with_statistics(&tline, &keyspace, lsn, &ctx).await?;
assert!(after_delta_file_accessed < before_delta_file_accessed, "after_delta_file_accessed={after_delta_file_accessed}, before_delta_file_accessed={before_delta_file_accessed}");
// Given that we already produced an image layer, there should be no delta layer needed for the scan, but still setting a low threshold there for unforeseen circumstances.
assert!(
after_delta_file_accessed <= 2,
"after_delta_file_accessed={after_delta_file_accessed}"
);
}
}
Ok(())
}
#[tokio::test]
async fn test_vectored_missing_data_key_reads() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_missing_data_key_reads")?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let cancel = CancellationToken::new();
let base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
let base_key_child = Key::from_hex("000000000033333333444444445500000001").unwrap();
let base_key_nonexist = Key::from_hex("000000000033333333444444445500000002").unwrap();
let mut lsn = Lsn(0x20);
{
let mut writer = tline.writer().await;
writer
.put(base_key, lsn, &Value::Image(test_img("data key 1")), &ctx)
.await?;
writer.finish_write(lsn);
drop(writer);
tline.freeze_and_flush().await?; // this will create a image layer
}
let child = tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(lsn), &ctx)
.await
.unwrap();
lsn.0 += 0x10;
{
let mut writer = child.writer().await;
writer
.put(
base_key_child,
lsn,
&Value::Image(test_img("data key 2")),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
child.freeze_and_flush().await?; // this will create a delta
{
// update the partitioning to include the test key space, otherwise they
// will be dropped by image layer creation
let mut guard = child.partitioning.lock().await;
let ((partitioning, _), partition_lsn) = &mut *guard;
partitioning
.parts
.push(KeySpace::single(base_key..base_key_nonexist)); // exclude the nonexist key
*partition_lsn = lsn;
}
child
.compact(
&cancel,
{
let mut set = EnumSet::empty();
set.insert(CompactFlags::ForceImageLayerCreation);
set
},
&ctx,
)
.await?; // force create an image layer for the keys, TODO: check if the image layer is created
}
async fn get_vectored_impl_wrapper(
tline: &Arc<Timeline>,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
let mut res = tline
.get_vectored_impl(
KeySpace::single(key..key.next()),
lsn,
&mut reconstruct_state,
ctx,
)
.await?;
Ok(res.pop_last().map(|(k, v)| {
assert_eq!(k, key);
v.unwrap()
}))
}
// test vectored get on parent timeline
assert_eq!(
get_vectored_impl_wrapper(&tline, base_key, lsn, &ctx).await?,
Some(test_img("data key 1"))
);
assert!(get_vectored_impl_wrapper(&tline, base_key_child, lsn, &ctx)
.await
.unwrap_err()
.is_missing_key_error());
assert!(
get_vectored_impl_wrapper(&tline, base_key_nonexist, lsn, &ctx)
.await
.unwrap_err()
.is_missing_key_error()
);
// test vectored get on child timeline
assert_eq!(
get_vectored_impl_wrapper(&child, base_key, lsn, &ctx).await?,
Some(test_img("data key 1"))
);
assert_eq!(
get_vectored_impl_wrapper(&child, base_key_child, lsn, &ctx).await?,
Some(test_img("data key 2"))
);
assert!(
get_vectored_impl_wrapper(&child, base_key_nonexist, lsn, &ctx)
.await
.unwrap_err()
.is_missing_key_error()
);
Ok(())
}
#[tokio::test]
async fn test_vectored_missing_metadata_key_reads() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_missing_metadata_key_reads")?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let cancel = CancellationToken::new();
let mut base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
let mut base_key_child = Key::from_hex("000000000033333333444444445500000001").unwrap();
let mut base_key_nonexist = Key::from_hex("000000000033333333444444445500000002").unwrap();
base_key.field1 = AUX_KEY_PREFIX;
base_key_child.field1 = AUX_KEY_PREFIX;
base_key_nonexist.field1 = AUX_KEY_PREFIX;
let mut lsn = Lsn(0x20);
{
let mut writer = tline.writer().await;
writer
.put(
base_key,
lsn,
&Value::Image(test_img("metadata key 1")),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
tline.freeze_and_flush().await?; // this will create an image layer
tline
.compact(
&cancel,
{
let mut set = EnumSet::empty();
set.insert(CompactFlags::ForceImageLayerCreation);
set.insert(CompactFlags::ForceRepartition);
set
},
&ctx,
)
.await?; // force create an image layer for metadata keys
tenant
.gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
.await?;
}
let child = tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(lsn), &ctx)
.await
.unwrap();
lsn.0 += 0x10;
{
let mut writer = child.writer().await;
writer
.put(
base_key_child,
lsn,
&Value::Image(test_img("metadata key 2")),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
child.freeze_and_flush().await?;
child
.compact(
&cancel,
{
let mut set = EnumSet::empty();
set.insert(CompactFlags::ForceImageLayerCreation);
set.insert(CompactFlags::ForceRepartition);
set
},
&ctx,
)
.await?; // force create an image layer for metadata keys
tenant
.gc_iteration(Some(child.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
.await?;
}
async fn get_vectored_impl_wrapper(
tline: &Arc<Timeline>,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
let mut res = tline
.get_vectored_impl(
KeySpace::single(key..key.next()),
lsn,
&mut reconstruct_state,
ctx,
)
.await?;
Ok(res.pop_last().map(|(k, v)| {
assert_eq!(k, key);
v.unwrap()
}))
}
// test vectored get on parent timeline
assert_eq!(
get_vectored_impl_wrapper(&tline, base_key, lsn, &ctx).await?,
Some(test_img("metadata key 1"))
);
assert_eq!(
get_vectored_impl_wrapper(&tline, base_key_child, lsn, &ctx).await?,
None
);
assert_eq!(
get_vectored_impl_wrapper(&tline, base_key_nonexist, lsn, &ctx).await?,
None
);
// test vectored get on child timeline
assert_eq!(
get_vectored_impl_wrapper(&child, base_key, lsn, &ctx).await?,
None
);
assert_eq!(
get_vectored_impl_wrapper(&child, base_key_child, lsn, &ctx).await?,
Some(test_img("metadata key 2"))
);
assert_eq!(
get_vectored_impl_wrapper(&child, base_key_nonexist, lsn, &ctx).await?,
None
);
Ok(())
}
}

View File

@@ -113,20 +113,12 @@ impl From<VectoredValueReconstructState> for ValueReconstructState {
}
}
/// Bag of data accumulated during a vectored get..
/// Bag of data accumulated during a vectored get
pub(crate) struct ValuesReconstructState {
/// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
/// should not expect to get anything from this hashmap.
pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,
/// The keys which are already retrieved
keys_done: KeySpaceRandomAccum,
/// The keys covered by the image layers
keys_with_image_coverage: Option<Range<Key>>,
// Statistics that are still accessible as a caller of `get_vectored_impl`.
layers_visited: u32,
delta_layers_visited: u32,
}
impl ValuesReconstructState {
@@ -134,9 +126,7 @@ impl ValuesReconstructState {
Self {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),
keys_with_image_coverage: None,
layers_visited: 0,
delta_layers_visited: 0,
}
}
@@ -150,17 +140,8 @@ impl ValuesReconstructState {
}
}
pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
pub(crate) fn on_layer_visited(&mut self) {
self.layers_visited += 1;
if let ReadableLayer::PersistentLayer(layer) = layer {
if layer.layer_desc().is_delta() {
self.delta_layers_visited += 1;
}
}
}
pub(crate) fn get_delta_layers_visited(&self) -> u32 {
self.delta_layers_visited
}
pub(crate) fn get_layers_visited(&self) -> u32 {
@@ -190,16 +171,6 @@ impl ValuesReconstructState {
}
}
/// On hitting image layer, we can mark all keys in this range as done, because
/// if the image layer does not contain a key, it is deleted/never added.
pub(crate) fn on_image_layer_visited(&mut self, key_range: &Range<Key>) {
let prev_val = self.keys_with_image_coverage.replace(key_range.clone());
assert_eq!(
prev_val, None,
"should consume the keyspace before the next iteration"
);
}
/// Update the state collected for a given key.
/// Returns true if this was the last value needed for the key and false otherwise.
///
@@ -262,12 +233,8 @@ impl ValuesReconstructState {
/// Returns the key space describing the keys that have
/// been marked as completed since the last call to this function.
/// Returns individual keys done, and the image layer coverage.
pub(crate) fn consume_done_keys(&mut self) -> (KeySpace, Option<Range<Key>>) {
(
self.keys_done.consume_keyspace(),
self.keys_with_image_coverage.take(),
)
pub(crate) fn consume_done_keys(&mut self) -> KeySpace {
self.keys_done.consume_keyspace()
}
}

View File

@@ -158,7 +158,6 @@ pub struct ImageLayerInner {
index_start_blk: u32,
index_root_blk: u32,
key_range: Range<Key>,
lsn: Lsn,
file: VirtualFile,
@@ -420,7 +419,6 @@ impl ImageLayerInner {
file,
file_id,
max_vectored_read_bytes,
key_range: actual_summary.key_range,
}))
}
@@ -480,8 +478,6 @@ impl ImageLayerInner {
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
.await;
reconstruct_state.on_image_layer_visited(&self.key_range);
Ok(())
}

View File

@@ -18,10 +18,10 @@ use fail::fail_point;
use once_cell::sync::Lazy;
use pageserver_api::{
key::{
AUX_FILES_KEY, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX,
NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE,
AUX_FILES_KEY, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE,
NON_INHERITED_SPARSE_RANGE,
},
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
keyspace::{KeySpaceAccum, SparseKeyPartitioning},
models::{
AtomicAuxFilePolicy, AuxFilePolicy, CompactionAlgorithm, DownloadRemoteLayersTaskInfo,
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
@@ -60,6 +60,7 @@ use std::{
ops::ControlFlow,
};
use crate::tenant::timeline::init::LocalLayerFileMetadata;
use crate::{
aux_file::AuxFileSizeEstimator,
tenant::{
@@ -88,9 +89,6 @@ use crate::{
metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize,
};
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
use crate::{
pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::timeline::init::LocalLayerFileMetadata,
};
use crate::{
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
virtual_file::{MaybeFatalIo, VirtualFile},
@@ -348,8 +346,8 @@ pub struct Timeline {
// though let's keep them both for better error visibility.
pub initdb_lsn: Lsn,
/// When did we last calculate the partitioning? Make it pub to test cases.
pub(super) partitioning: tokio::sync::Mutex<((KeyPartitioning, SparseKeyPartitioning), Lsn)>,
/// When did we last calculate the partitioning?
partitioning: tokio::sync::Mutex<((KeyPartitioning, SparseKeyPartitioning), Lsn)>,
/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
@@ -483,11 +481,6 @@ impl GcCutoffs {
}
}
pub(crate) struct TimelineVisitOutcome {
completed_keyspace: KeySpace,
image_covered_keyspace: KeySpace,
}
/// An error happened in a get() operation.
#[derive(thiserror::Error, Debug)]
pub(crate) enum PageReconstructError {
@@ -512,13 +505,6 @@ pub(crate) enum PageReconstructError {
MissingKey(MissingKeyError),
}
impl GetVectoredError {
#[cfg(test)]
pub(crate) fn is_missing_key_error(&self) -> bool {
matches!(self, Self::MissingKey(_))
}
}
#[derive(Debug)]
pub struct MissingKeyError {
key: Key,
@@ -796,11 +782,6 @@ pub(crate) enum ShutdownMode {
Hard,
}
struct ImageLayerCreationOutcome {
image: Option<ResidentLayer>,
next_start_key: Key,
}
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -902,7 +883,7 @@ impl Timeline {
}
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.get_vectored_impl(keyspace.clone(), lsn, reconstruct_state, ctx)
.await;
if self.conf.validate_vectored_get {
@@ -1047,12 +1028,7 @@ impl Timeline {
}
GetVectoredImpl::Vectored => {
let vectored_res = self
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::new(),
ctx,
)
.get_vectored_impl(keyspace.clone(), lsn, ValuesReconstructState::new(), ctx)
.await;
if self.conf.validate_vectored_get {
@@ -1140,7 +1116,7 @@ impl Timeline {
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::default(),
ValuesReconstructState::default(),
ctx,
)
.await;
@@ -1217,7 +1193,7 @@ impl Timeline {
&self,
keyspace: KeySpace,
lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
mut reconstruct_state: ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let get_kind = if keyspace.total_raw_size() == 1 {
@@ -1229,7 +1205,7 @@ impl Timeline {
let get_data_timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
.for_get_kind(get_kind)
.start_timer();
self.get_vectored_reconstruct_data(keyspace, lsn, reconstruct_state, ctx)
self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx)
.await?;
get_data_timer.stop_and_record();
@@ -1238,8 +1214,7 @@ impl Timeline {
.start_timer();
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let layers_visited = reconstruct_state.get_layers_visited();
for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
for (key, res) in reconstruct_state.keys {
match res {
Err(err) => {
results.insert(key, Err(err));
@@ -3312,15 +3287,12 @@ impl Timeline {
let mut cont_lsn = Lsn(request_lsn.0 + 1);
let missing_keyspace = loop {
loop {
if self.cancel.is_cancelled() {
return Err(GetVectoredError::Cancelled);
}
let TimelineVisitOutcome {
completed_keyspace: completed,
image_covered_keyspace,
} = Self::get_vectored_reconstruct_data_timeline(
let completed = Self::get_vectored_reconstruct_data_timeline(
timeline,
keyspace.clone(),
cont_lsn,
@@ -3339,31 +3311,12 @@ impl Timeline {
ranges: vec![NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE],
});
// Keyspace is fully retrieved
if keyspace.is_empty() {
break None;
// Keyspace is fully retrieved, no ancestor timeline, or metadata scan (where we do not look
// into ancestor timelines). TODO: is there any other metadata which we want to inherit?
if keyspace.total_raw_size() == 0 || timeline.ancestor_timeline.is_none() {
break;
}
// Not fully retrieved but no ancestor timeline.
if timeline.ancestor_timeline.is_none() {
break Some(keyspace);
}
// Now we see if there are keys covered by the image layer but does not exist in the
// image layer, which means that the key does not exist.
// The block below will stop the vectored search if any of the keys encountered an image layer
// which did not contain a snapshot for said key. Since we have already removed all completed
// keys from `keyspace`, we expect there to be no overlap between it and the image covered key
// space. If that's not the case, we had at least one key encounter a gap in the image layer
// and stop the search as a result of that.
let removed = keyspace.remove_overlapping_with(&image_covered_keyspace);
if !removed.is_empty() {
break Some(removed);
}
// If we reached this point, `remove_overlapping_with` should not have made any change to the
// keyspace.
// Take the min to avoid reconstructing a page with data newer than request Lsn.
cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
timeline_owned = timeline
@@ -3371,14 +3324,14 @@ impl Timeline {
.await
.map_err(GetVectoredError::GetReadyAncestorError)?;
timeline = &*timeline_owned;
};
}
if let Some(missing_keyspace) = missing_keyspace {
if keyspace.total_raw_size() != 0 {
return Err(GetVectoredError::MissingKey(MissingKeyError {
key: missing_keyspace.start().unwrap(), /* better if we can store the full keyspace */
key: keyspace.start().unwrap(), /* better if we can store the full keyspace */
shard: self
.shard_identity
.get_shard_number(&missing_keyspace.start().unwrap()),
.get_shard_number(&keyspace.start().unwrap()),
cont_lsn,
request_lsn,
ancestor_lsn: Some(timeline.ancestor_lsn),
@@ -3403,9 +3356,6 @@ impl Timeline {
///
/// At each iteration pop the top of the fringe (the layer with the highest Lsn)
/// and get all the required reconstruct data from the layer in one go.
///
/// Returns the completed keyspace and the keyspaces with image coverage. The caller
/// decides how to deal with these two keyspaces.
async fn get_vectored_reconstruct_data_timeline(
timeline: &Timeline,
keyspace: KeySpace,
@@ -3413,27 +3363,20 @@ impl Timeline {
reconstruct_state: &mut ValuesReconstructState,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<TimelineVisitOutcome, GetVectoredError> {
) -> Result<KeySpace, GetVectoredError> {
let mut unmapped_keyspace = keyspace.clone();
let mut fringe = LayerFringe::new();
let mut completed_keyspace = KeySpace::default();
let mut image_covered_keyspace = KeySpaceRandomAccum::new();
loop {
if cancel.is_cancelled() {
return Err(GetVectoredError::Cancelled);
}
let (keys_done_last_step, keys_with_image_coverage) =
reconstruct_state.consume_done_keys();
let keys_done_last_step = reconstruct_state.consume_done_keys();
unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
completed_keyspace.merge(&keys_done_last_step);
if let Some(keys_with_image_coverage) = keys_with_image_coverage {
unmapped_keyspace
.remove_overlapping_with(&KeySpace::single(keys_with_image_coverage.clone()));
image_covered_keyspace.add_range(keys_with_image_coverage);
}
// Do not descent any further if the last layer we visited
// completed all keys in the keyspace it inspected. This is not
@@ -3505,16 +3448,13 @@ impl Timeline {
unmapped_keyspace = keyspace_to_read;
cont_lsn = next_cont_lsn;
reconstruct_state.on_layer_visited(&layer_to_read);
reconstruct_state.on_layer_visited();
} else {
break;
}
}
Ok(TimelineVisitOutcome {
completed_keyspace,
image_covered_keyspace: image_covered_keyspace.consume_keyspace(),
})
Ok(completed_keyspace)
}
/// # Cancel-safety
@@ -4194,176 +4134,6 @@ impl Timeline {
false
}
/// Create image layers for Postgres data. Assumes the caller passes a partition that is not too large,
/// so that at most one image layer will be produced from this function.
async fn create_image_layer_for_rel_blocks(
self: &Arc<Self>,
partition: &KeySpace,
mut image_layer_writer: ImageLayerWriter,
lsn: Lsn,
ctx: &RequestContext,
img_range: Range<Key>,
start: Key,
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
let mut wrote_keys = false;
let mut key_request_accum = KeySpaceAccum::new();
for range in &partition.ranges {
let mut key = range.start;
while key < range.end {
// Decide whether to retain this key: usually we do, but sharded tenants may
// need to drop keys that don't belong to them. If we retain the key, add it
// to `key_request_accum` for later issuing a vectored get
if self.shard_identity.is_key_disposable(&key) {
debug!(
"Dropping key {} during compaction (it belongs on shard {:?})",
key,
self.shard_identity.get_shard_number(&key)
);
} else {
key_request_accum.add_key(key);
}
let last_key_in_range = key.next() == range.end;
key = key.next();
// Maybe flush `key_rest_accum`
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|| (last_key_in_range && key_request_accum.raw_size() > 0)
{
let results = self
.get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
.await?;
for (img_key, img) in results {
let img = match img {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
// page without losing any actual user data. That seems better
// than failing repeatedly and getting stuck.
//
// We had a bug at one point, where we truncated the FSM and VM
// in the pageserver, but the Postgres didn't know about that
// and continued to generate incremental WAL records for pages
// that didn't exist in the pageserver. Trying to replay those
// WAL records failed to find the previous image of the page.
// This special case allows us to recover from that situation.
// See https://github.com/neondatabase/neon/issues/2601.
//
// Unfortunately we cannot do this for the main fork, or for
// any metadata keys, keys, as that would lead to actual data
// loss.
if is_rel_fsm_block_key(img_key) || is_rel_vm_block_key(img_key) {
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(CreateImageLayersError::PageReconstructError(err));
}
}
};
// Write all the keys we just read into our new image layer.
image_layer_writer.put_image(img_key, img, ctx).await?;
wrote_keys = true;
}
}
}
}
if wrote_keys {
// Normal path: we have written some data into the new image layer for this
// partition, so flush it to disk.
let image_layer = image_layer_writer.finish(self, ctx).await?;
Ok(ImageLayerCreationOutcome {
image: Some(image_layer),
next_start_key: img_range.end,
})
} else {
// Special case: the image layer may be empty if this is a sharded tenant and the
// partition does not cover any keys owned by this shard. In this case, to ensure
// we don't leave gaps between image layers, leave `start` where it is, so that the next
// layer we write will cover the key range that we just scanned.
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
Ok(ImageLayerCreationOutcome {
image: None,
next_start_key: start,
})
}
}
/// Create an image layer for metadata keys. This function produces one image layer for all metadata
/// keys for now. Because metadata keys cannot exceed basebackup size limit, the image layer for it
/// would not be too large to fit in a single image layer.
#[allow(clippy::too_many_arguments)]
async fn create_image_layer_for_metadata_keys(
self: &Arc<Self>,
partition: &KeySpace,
mut image_layer_writer: ImageLayerWriter,
lsn: Lsn,
ctx: &RequestContext,
img_range: Range<Key>,
mode: ImageLayerCreationMode,
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
assert!(!matches!(mode, ImageLayerCreationMode::Initial));
// Metadata keys image layer creation.
let mut reconstruct_state = ValuesReconstructState::default();
let data = self
.get_vectored_impl(partition.clone(), lsn, &mut reconstruct_state, ctx)
.await?;
let (data, total_kb_retrieved, total_key_retrieved) = {
let mut new_data = BTreeMap::new();
let mut total_kb_retrieved = 0;
let mut total_key_retrieved = 0;
for (k, v) in data {
let v = v.map_err(CreateImageLayersError::PageReconstructError)?;
total_kb_retrieved += KEY_SIZE + v.len();
total_key_retrieved += 1;
new_data.insert(k, v);
}
(new_data, total_kb_retrieved / 1024, total_key_retrieved)
};
let delta_file_accessed = reconstruct_state.get_delta_layers_visited();
let trigger_generation = delta_file_accessed as usize >= MAX_AUX_FILE_V2_DELTAS;
info!(
"generate image layers for metadata keys: trigger_generation={trigger_generation}, \
delta_file_accessed={delta_file_accessed}, total_kb_retrieved={total_kb_retrieved}, \
total_key_retrieved={total_key_retrieved}"
);
if !trigger_generation && mode == ImageLayerCreationMode::Try {
return Ok(ImageLayerCreationOutcome {
image: None,
next_start_key: img_range.end,
});
}
let has_keys = !data.is_empty();
for (k, v) in data {
// Even if the value is empty (deleted), we do not delete it for now until we can ensure vectored get
// considers this situation properly.
// if v.is_empty() {
// continue;
// }
// No need to handle sharding b/c metadata keys are always on the 0-th shard.
// TODO: split image layers to avoid too large layer files. Too large image files are not handled
// on the normal data path either.
image_layer_writer.put_image(k, v, ctx).await?;
}
Ok(ImageLayerCreationOutcome {
image: if has_keys {
let image_layer = image_layer_writer.finish(self, ctx).await?;
Some(image_layer)
} else {
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
None
},
next_start_key: img_range.end,
})
}
#[tracing::instrument(skip_all, fields(%lsn, %mode))]
async fn create_image_layers(
self: &Arc<Timeline>,
@@ -4405,17 +4175,19 @@ impl Timeline {
for partition in partitioning.parts.iter() {
let img_range = start..partition.ranges.last().unwrap().end;
let compact_metadata = partition.overlaps(&Key::metadata_key_range());
if compact_metadata {
for range in &partition.ranges {
assert!(
range.start.field1 >= METADATA_KEY_BEGIN_PREFIX
&& range.end.field1 <= METADATA_KEY_END_PREFIX,
"metadata keys must be partitioned separately"
);
}
if mode == ImageLayerCreationMode::Initial {
return Err(CreateImageLayersError::Other(anyhow::anyhow!("no image layer should be created for metadata keys when flushing frozen layers")));
if partition.overlaps(&Key::metadata_key_range()) {
// TODO(chi): The next patch will correctly create image layers for metadata keys, and it would be a
// rather big change. Keep this patch small for now.
match mode {
ImageLayerCreationMode::Force | ImageLayerCreationMode::Try => {
// skip image layer creation anyways for metadata keys.
start = img_range.end;
continue;
}
ImageLayerCreationMode::Initial => {
return Err(CreateImageLayersError::Other(anyhow::anyhow!("no image layer should be created for metadata keys when flushing frozen layers")));
}
}
} else if let ImageLayerCreationMode::Try = mode {
// check_for_image_layers = false -> skip
@@ -4426,7 +4198,7 @@ impl Timeline {
}
}
let image_layer_writer = ImageLayerWriter::new(
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
@@ -4442,39 +4214,87 @@ impl Timeline {
)))
});
if !compact_metadata {
let ImageLayerCreationOutcome {
image,
next_start_key,
} = self
.create_image_layer_for_rel_blocks(
partition,
image_layer_writer,
lsn,
ctx,
img_range,
start,
)
.await?;
let mut wrote_keys = false;
start = next_start_key;
image_layers.extend(image);
let mut key_request_accum = KeySpaceAccum::new();
for range in &partition.ranges {
let mut key = range.start;
while key < range.end {
// Decide whether to retain this key: usually we do, but sharded tenants may
// need to drop keys that don't belong to them. If we retain the key, add it
// to `key_request_accum` for later issuing a vectored get
if self.shard_identity.is_key_disposable(&key) {
debug!(
"Dropping key {} during compaction (it belongs on shard {:?})",
key,
self.shard_identity.get_shard_number(&key)
);
} else {
key_request_accum.add_key(key);
}
let last_key_in_range = key.next() == range.end;
key = key.next();
// Maybe flush `key_rest_accum`
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|| (last_key_in_range && key_request_accum.raw_size() > 0)
{
let results = self
.get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
.await?;
for (img_key, img) in results {
let img = match img {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
// page without losing any actual user data. That seems better
// than failing repeatedly and getting stuck.
//
// We had a bug at one point, where we truncated the FSM and VM
// in the pageserver, but the Postgres didn't know about that
// and continued to generate incremental WAL records for pages
// that didn't exist in the pageserver. Trying to replay those
// WAL records failed to find the previous image of the page.
// This special case allows us to recover from that situation.
// See https://github.com/neondatabase/neon/issues/2601.
//
// Unfortunately we cannot do this for the main fork, or for
// any metadata keys, keys, as that would lead to actual data
// loss.
if is_rel_fsm_block_key(img_key) || is_rel_vm_block_key(img_key)
{
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(CreateImageLayersError::PageReconstructError(
err,
));
}
}
};
// Write all the keys we just read into our new image layer.
image_layer_writer.put_image(img_key, img, ctx).await?;
wrote_keys = true;
}
}
}
}
if wrote_keys {
// Normal path: we have written some data into the new image layer for this
// partition, so flush it to disk.
start = img_range.end;
let image_layer = image_layer_writer.finish(self, ctx).await?;
image_layers.push(image_layer);
} else {
let ImageLayerCreationOutcome {
image,
next_start_key,
} = self
.create_image_layer_for_metadata_keys(
partition,
image_layer_writer,
lsn,
ctx,
img_range,
mode,
)
.await?;
start = next_start_key;
image_layers.extend(image);
// Special case: the image layer may be empty if this is a sharded tenant and the
// partition does not cover any keys owned by this shard. In this case, to ensure
// we don't leave gaps between image layers, leave `start` where it is, so that the next
// layer we write will cover the key range that we just scanned.
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
}
}

View File

@@ -116,13 +116,9 @@ impl Timeline {
// 3. Create new image layers for partitions that have been modified
// "enough".
let mut partitioning = dense_partitioning;
partitioning
.parts
.extend(sparse_partitioning.into_dense().parts);
let image_layers = self
let dense_layers = self
.create_image_layers(
&partitioning,
&dense_partitioning,
lsn,
if flags.contains(CompactFlags::ForceImageLayerCreation) {
ImageLayerCreationMode::Force
@@ -134,8 +130,24 @@ impl Timeline {
.await
.map_err(anyhow::Error::from)?;
self.upload_new_image_layers(image_layers)?;
partitioning.parts.len()
// For now, nothing will be produced...
let sparse_layers = self
.create_image_layers(
&sparse_partitioning.clone().into_dense(),
lsn,
if flags.contains(CompactFlags::ForceImageLayerCreation) {
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
},
&image_ctx,
)
.await
.map_err(anyhow::Error::from)?;
assert!(sparse_layers.is_empty());
self.upload_new_image_layers(dense_layers)?;
dense_partitioning.parts.len()
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
@@ -487,11 +499,8 @@ impl Timeline {
for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
if let Some(prev_key) = prev {
// just first fast filter, do not create hole entries for metadata keys. The last hole in the
// compaction is the gap between data key and metadata keys.
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
&& !Key::is_metadata_key(&prev_key)
{
// just first fast filter
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
let key_range = prev_key..next_key;
// Measuring hole by just subtraction of i128 representation of key range boundaries
// has not so much sense, because largest holes will corresponds field1/field2 changes.

View File

@@ -20,7 +20,6 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use storage_broker::Uri;
use tokio::sync::mpsc;
use tracing::*;
use utils::pid_file;
@@ -30,13 +29,13 @@ use safekeeper::defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
};
use safekeeper::remove_wal;
use safekeeper::wal_service;
use safekeeper::GlobalTimelines;
use safekeeper::SafeKeeperConf;
use safekeeper::{broker, WAL_SERVICE_RUNTIME};
use safekeeper::{control_file, BROKER_RUNTIME};
use safekeeper::{http, WAL_REMOVER_RUNTIME};
use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
use safekeeper::{wal_backup, HTTP_RUNTIME};
use storage_broker::DEFAULT_ENDPOINT;
use utils::auth::{JwtAuth, Scope, SwappableJwtAuth};
@@ -377,8 +376,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
metrics::register_internal(Box::new(timeline_collector))?;
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
wal_backup::init_remote_storage(&conf);
// Keep handles to main tasks to die if any of them disappears.
@@ -391,19 +388,9 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
let current_thread_rt = conf
.current_thread_runtime
.then(|| Handle::try_current().expect("no runtime in main"));
let conf_ = conf.clone();
let wal_backup_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
.spawn(wal_backup::wal_backup_launcher_task_main(
conf_,
wal_backup_launcher_rx,
))
.map(|res| ("WAL backup launcher".to_owned(), res));
tasks_handles.push(Box::pin(wal_backup_handle));
// Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx).await?;
GlobalTimelines::init(conf.clone()).await?;
let conf_ = conf.clone();
// Run everything in current thread rt, if asked.

View File

@@ -46,6 +46,8 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
return Ok(());
}
let active_timelines_set = GlobalTimelines::get_global_broker_active_set();
let mut client =
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
@@ -57,15 +59,9 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
let now = Instant::now();
let all_tlis = GlobalTimelines::get_all();
let all_tlis = active_timelines_set.get_all();
let mut n_pushed_tlis = 0;
for tli in &all_tlis {
// filtering alternative futures::stream::iter(all_tlis)
// .filter(|tli| {let tli = tli.clone(); async move { tli.is_active().await}}).collect::<Vec<_>>().await;
// doesn't look better, and I'm not sure how to do that without collect.
if !tli.is_active().await {
continue;
}
let sk_info = tli.get_safekeeper_info(&conf).await;
yield sk_info;
BROKER_PUSHED_UPDATES.inc();
@@ -90,6 +86,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
}
/// Subscribe and fetch all the interesting data from the broker.
#[instrument(name = "broker pull", skip_all)]
async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;

View File

@@ -31,6 +31,8 @@ pub mod safekeeper;
pub mod send_wal;
pub mod state;
pub mod timeline;
pub mod timeline_manager;
pub mod timelines_set;
pub mod wal_backup;
pub mod wal_backup_partial;
pub mod wal_service;

View File

@@ -11,8 +11,9 @@ use futures::Future;
use metrics::{
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
proto::MetricFamily,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, Gauge,
IntCounter, IntCounterPairVec, IntCounterVec, IntGaugeVec,
register_int_counter, register_int_counter_pair, register_int_counter_pair_vec,
register_int_counter_vec, Gauge, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec,
IntGaugeVec,
};
use once_cell::sync::Lazy;
@@ -162,6 +163,29 @@ pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
});
pub static MANAGER_ITERATIONS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_manager_iterations_total",
"Number of iterations of the timeline manager task"
)
.expect("Failed to register safekeeper_manager_iterations_total counter")
});
pub static MANAGER_ACTIVE_CHANGES: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_manager_active_changes_total",
"Number of timeline active status changes in the timeline manager task"
)
.expect("Failed to register safekeeper_manager_active_changes_total counter")
});
pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
register_int_counter_pair!(
"safekeeper_wal_backup_tasks_started_total",
"Number of active WAL backup tasks",
"safekeeper_wal_backup_tasks_finished_total",
"Number of finished WAL backup tasks",
)
.expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
});
pub const LABEL_UNKNOWN: &str = "unknown";
@@ -614,8 +638,7 @@ impl Collector for TimelineCollector {
self.written_wal_seconds.reset();
self.flushed_wal_seconds.reset();
let timelines = GlobalTimelines::get_all();
let timelines_count = timelines.len();
let timelines_count = GlobalTimelines::get_all().len();
let mut active_timelines_count = 0;
// Prometheus Collector is sync, and data is stored under async lock. To
@@ -746,9 +769,9 @@ impl Collector for TimelineCollector {
async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
let mut res = vec![];
let timelines = GlobalTimelines::get_all();
let active_timelines = GlobalTimelines::get_global_broker_active_set().get_all();
for tli in timelines {
for tli in active_timelines {
if let Some(info) = tli.info_for_metrics().await {
res.push(info);
}

View File

@@ -45,6 +45,9 @@ const DEFAULT_FEEDBACK_CAPACITY: usize = 8;
pub struct WalReceivers {
mutex: Mutex<WalReceiversShared>,
pageserver_feedback_tx: tokio::sync::broadcast::Sender<PageserverFeedback>,
num_computes_tx: tokio::sync::watch::Sender<usize>,
num_computes_rx: tokio::sync::watch::Receiver<usize>,
}
/// Id under which walreceiver is registered in shmem.
@@ -55,16 +58,21 @@ impl WalReceivers {
let (pageserver_feedback_tx, _) =
tokio::sync::broadcast::channel(DEFAULT_FEEDBACK_CAPACITY);
let (num_computes_tx, num_computes_rx) = tokio::sync::watch::channel(0usize);
Arc::new(WalReceivers {
mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
pageserver_feedback_tx,
num_computes_tx,
num_computes_rx,
})
}
/// Register new walreceiver. Returned guard provides access to the slot and
/// automatically deregisters in Drop.
pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
let slots = &mut self.mutex.lock().slots;
let mut shared = self.mutex.lock();
let slots = &mut shared.slots;
let walreceiver = WalReceiverState {
conn_id,
status: WalReceiverStatus::Voting,
@@ -78,6 +86,9 @@ impl WalReceivers {
slots.push(Some(walreceiver));
pos
};
self.update_num(&shared);
WalReceiverGuard {
id: pos,
walreceivers: self.clone(),
@@ -99,7 +110,18 @@ impl WalReceivers {
/// Get number of walreceivers (compute connections).
pub fn get_num(self: &Arc<WalReceivers>) -> usize {
self.mutex.lock().slots.iter().flatten().count()
self.mutex.lock().get_num()
}
/// Get channel for number of walreceivers.
pub fn get_num_rx(self: &Arc<WalReceivers>) -> tokio::sync::watch::Receiver<usize> {
self.num_computes_rx.clone()
}
/// Should get called after every update of slots.
fn update_num(self: &Arc<WalReceivers>, shared: &MutexGuard<WalReceiversShared>) {
let num = shared.get_num();
self.num_computes_tx.send_replace(num);
}
/// Get state of all walreceivers.
@@ -123,6 +145,7 @@ impl WalReceivers {
fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
let mut shared = self.mutex.lock();
shared.slots[id] = None;
self.update_num(&shared);
}
/// Broadcast pageserver feedback to connected walproposers.
@@ -137,6 +160,13 @@ struct WalReceiversShared {
slots: Vec<Option<WalReceiverState>>,
}
impl WalReceiversShared {
/// Get number of walreceivers (compute connections).
fn get_num(&self) -> usize {
self.slots.iter().flatten().count()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalReceiverState {
/// None means it is recovery initiated by us (this safekeeper).
@@ -456,14 +486,7 @@ impl WalAcceptor {
/// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
/// it must mean that network thread terminated.
async fn run(&mut self) -> anyhow::Result<()> {
// Register the connection and defer unregister.
// Order of the next two lines is important: we want first to remove our entry and then
// update status which depends on registered connections.
let _compute_conn_guard = ComputeConnectionGuard {
timeline: Arc::clone(&self.tli),
};
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
self.tli.update_status_notify().await?;
// After this timestamp we will stop processing AppendRequests and send a response
// to the walproposer. walproposer sends at least one AppendRequest per second,
@@ -529,19 +552,3 @@ impl WalAcceptor {
}
}
}
/// Calls update_status_notify in drop to update timeline status.
struct ComputeConnectionGuard {
timeline: Arc<Timeline>,
}
impl Drop for ComputeConnectionGuard {
fn drop(&mut self) {
let tli = self.timeline.clone();
tokio::spawn(async move {
if let Err(e) = tli.update_status_notify().await {
error!("failed to update timeline status: {}", e);
}
});
}
}

View File

@@ -37,17 +37,11 @@ use crate::{
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
pub async fn recovery_main(tli: Arc<Timeline>, conf: SafeKeeperConf) {
info!("started");
let mut cancellation_rx = match tli.get_cancellation_rx() {
Ok(rx) => rx,
Err(_) => {
info!("timeline canceled during task start");
return;
}
};
let cancel = tli.cancel.clone();
select! {
_ = recovery_main_loop(tli, conf) => { unreachable!() }
_ = cancellation_rx.changed() => {
_ = cancel.cancelled() => {
info!("stopped");
}
}

View File

@@ -7,29 +7,18 @@ use tracing::*;
use crate::{GlobalTimelines, SafeKeeperConf};
const ALLOW_INACTIVE_TIMELINES: bool = true;
pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
pub async fn task_main(_conf: SafeKeeperConf) -> anyhow::Result<()> {
let wal_removal_interval = Duration::from_millis(5000);
loop {
let now = tokio::time::Instant::now();
let mut active_timelines = 0;
let tlis = GlobalTimelines::get_all();
for tli in &tlis {
let is_active = tli.is_active().await;
if is_active {
active_timelines += 1;
}
if !ALLOW_INACTIVE_TIMELINES && !is_active {
continue;
}
let ttid = tli.ttid;
async {
if let Err(e) = tli.maybe_persist_control_file().await {
warn!("failed to persist control file: {e}");
}
if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled).await {
if let Err(e) = tli.remove_old_wal().await {
error!("failed to remove WAL: {}", e);
}
}
@@ -42,8 +31,8 @@ pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
if elapsed > wal_removal_interval {
info!(
"WAL removal is too long, processed {} active timelines ({} total) in {:?}",
active_timelines, total_timelines, elapsed
"WAL removal is too long, processed {} timelines in {:?}",
total_timelines, elapsed
);
}

View File

@@ -6,15 +6,16 @@ use camino::Utf8PathBuf;
use postgres_ffi::XLogSegNo;
use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio_util::sync::CancellationToken;
use utils::sync::gate::Gate;
use std::cmp::max;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, MutexGuard};
use tokio::{
sync::{mpsc::Sender, watch},
time::Instant,
};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::{sync::watch, time::Instant};
use tracing::*;
use utils::http::error::ApiError;
use utils::{
@@ -33,12 +34,13 @@ use crate::safekeeper::{
};
use crate::send_wal::WalSenders;
use crate::state::{TimelineMemState, TimelinePersistentState};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::{self};
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
use crate::metrics::FullTimelineInfo;
use crate::wal_storage::Storage as wal_storage_iface;
use crate::{debug_dump, wal_backup_partial, wal_storage};
use crate::{debug_dump, timeline_manager, wal_backup_partial, wal_storage};
use crate::{GlobalTimelines, SafeKeeperConf};
/// Things safekeeper should know about timeline state on peers.
@@ -51,8 +53,7 @@ pub struct PeerInfo {
/// LSN of the last record.
pub flush_lsn: Lsn,
pub commit_lsn: Lsn,
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
/// sk since backup_lsn.
/// Since which LSN safekeeper has WAL.
pub local_start_lsn: Lsn,
/// When info was received. Serde annotations are not very useful but make
/// the code compile -- we don't rely on this field externally.
@@ -97,25 +98,72 @@ impl PeersInfo {
}
}
pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;
/// WriteGuardSharedState is a wrapper around `RwLockWriteGuard<SharedState>` that
/// automatically updates `watch::Sender` channels with state on drop.
pub struct WriteGuardSharedState<'a> {
tli: Arc<Timeline>,
guard: RwLockWriteGuard<'a, SharedState>,
}
impl<'a> WriteGuardSharedState<'a> {
fn new(tli: Arc<Timeline>, guard: RwLockWriteGuard<'a, SharedState>) -> Self {
WriteGuardSharedState { tli, guard }
}
}
impl<'a> Deref for WriteGuardSharedState<'a> {
type Target = SharedState;
fn deref(&self) -> &Self::Target {
&self.guard
}
}
impl<'a> DerefMut for WriteGuardSharedState<'a> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.guard
}
}
impl<'a> Drop for WriteGuardSharedState<'a> {
fn drop(&mut self) {
let term_flush_lsn = TermLsn::from((self.guard.sk.get_term(), self.guard.sk.flush_lsn()));
let commit_lsn = self.guard.sk.state.inmem.commit_lsn;
let _ = self.tli.term_flush_lsn_watch_tx.send_if_modified(|old| {
if *old != term_flush_lsn {
*old = term_flush_lsn;
true
} else {
false
}
});
let _ = self.tli.commit_lsn_watch_tx.send_if_modified(|old| {
if *old != commit_lsn {
*old = commit_lsn;
true
} else {
false
}
});
// send notification about shared state update
self.tli.shared_state_version_tx.send_modify(|old| {
*old += 1;
});
}
}
/// Shared state associated with database instance
pub struct SharedState {
/// Safekeeper object
sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
pub(crate) sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
/// In memory list containing state of peers sent in latest messages from them.
peers_info: PeersInfo,
/// True when WAL backup launcher oversees the timeline, making sure WAL is
/// offloaded, allows to bother launcher less.
wal_backup_active: bool,
/// True whenever there is at least some pending activity on timeline: live
/// compute connection, pageserver is not caughtup (it must have latest WAL
/// for new compute start) or WAL backuping is not finished. Practically it
/// means safekeepers broadcast info to peers about the timeline, old WAL is
/// trimmed.
///
/// TODO: it might be better to remove tli completely from GlobalTimelines
/// when tli is inactive instead of having this flag.
active: bool,
last_removed_segno: XLogSegNo,
pub(crate) peers_info: PeersInfo,
pub(crate) last_removed_segno: XLogSegNo,
}
impl SharedState {
@@ -152,8 +200,6 @@ impl SharedState {
Ok(Self {
sk,
peers_info: PeersInfo(vec![]),
wal_backup_active: false,
active: false,
last_removed_segno: 0,
})
}
@@ -171,75 +217,10 @@ impl SharedState {
Ok(Self {
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
peers_info: PeersInfo(vec![]),
wal_backup_active: false,
active: false,
last_removed_segno: 0,
})
}
fn is_active(&self, num_computes: usize) -> bool {
self.is_wal_backup_required(num_computes)
// FIXME: add tracking of relevant pageservers and check them here individually,
// otherwise migration won't work (we suspend too early).
|| self.sk.state.inmem.remote_consistent_lsn < self.sk.state.inmem.commit_lsn
}
/// Mark timeline active/inactive and return whether s3 offloading requires
/// start/stop action. If timeline is deactivated, control file is persisted
/// as maintenance task does that only for active timelines.
async fn update_status(&mut self, num_computes: usize, ttid: TenantTimelineId) -> bool {
let is_active = self.is_active(num_computes);
if self.active != is_active {
info!(
"timeline {} active={} now, remote_consistent_lsn={}, commit_lsn={}",
ttid,
is_active,
self.sk.state.inmem.remote_consistent_lsn,
self.sk.state.inmem.commit_lsn
);
if !is_active {
if let Err(e) = self.sk.state.flush().await {
warn!("control file save in update_status failed: {:?}", e);
}
}
}
self.active = is_active;
self.is_wal_backup_action_pending(num_computes)
}
/// Should we run s3 offloading in current state?
fn is_wal_backup_required(&self, num_computes: usize) -> bool {
let seg_size = self.get_wal_seg_size();
num_computes > 0 ||
// Currently only the whole segment is offloaded, so compare segment numbers.
(self.sk.state.inmem.commit_lsn.segment_number(seg_size) >
self.sk.state.inmem.backup_lsn.segment_number(seg_size))
}
/// Is current state of s3 offloading is not what it ought to be?
fn is_wal_backup_action_pending(&self, num_computes: usize) -> bool {
let res = self.wal_backup_active != self.is_wal_backup_required(num_computes);
if res {
let action_pending = if self.is_wal_backup_required(num_computes) {
"start"
} else {
"stop"
};
trace!(
"timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}",
self.sk.state.timeline_id, action_pending, num_computes, self.sk.state.inmem.commit_lsn, self.sk.state.inmem.backup_lsn
);
}
res
}
/// Returns whether s3 offloading is required and sets current status as
/// matching.
fn wal_backup_attend(&mut self, num_computes: usize) -> bool {
self.wal_backup_active = self.is_wal_backup_required(num_computes);
self.wal_backup_active
}
fn get_wal_seg_size(&self) -> usize {
self.sk.state.server.wal_seg_size as usize
}
@@ -276,7 +257,7 @@ impl SharedState {
/// Get our latest view of alive peers status on the timeline.
/// We pass our own info through the broker as well, so when we don't have connection
/// to the broker returned vec is empty.
fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
pub(crate) fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
let now = Instant::now();
self.peers_info
.0
@@ -292,18 +273,13 @@ impl SharedState {
/// offloading.
/// While it is safe to use inmem values for determining horizon,
/// we use persistent to make possible normal states less surprising.
fn get_horizon_segno(
&self,
wal_backup_enabled: bool,
extra_horizon_lsn: Option<Lsn>,
) -> XLogSegNo {
fn get_horizon_segno(&self, extra_horizon_lsn: Option<Lsn>) -> XLogSegNo {
let state = &self.sk.state;
use std::cmp::min;
let mut horizon_lsn = min(state.remote_consistent_lsn, state.peer_horizon_lsn);
if wal_backup_enabled {
horizon_lsn = min(horizon_lsn, state.backup_lsn);
}
// we don't want to remove WAL that is not yet offloaded to s3
horizon_lsn = min(horizon_lsn, state.backup_lsn);
if let Some(extra_horizon_lsn) = extra_horizon_lsn {
horizon_lsn = min(horizon_lsn, extra_horizon_lsn);
}
@@ -344,11 +320,6 @@ impl From<TimelineError> for ApiError {
pub struct Timeline {
pub ttid: TenantTimelineId,
/// Sending here asks for wal backup launcher attention (start/stop
/// offloading). Sending ttid instead of concrete command allows to do
/// sending without timeline lock.
pub wal_backup_launcher_tx: Sender<TenantTimelineId>,
/// Used to broadcast commit_lsn updates to all background jobs.
commit_lsn_watch_tx: watch::Sender<Lsn>,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
@@ -360,19 +331,22 @@ pub struct Timeline {
term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
/// Broadcasts shared state updates.
shared_state_version_tx: watch::Sender<usize>,
shared_state_version_rx: watch::Receiver<usize>,
/// Safekeeper and other state, that should remain consistent and
/// synchronized with the disk. This is tokio mutex as we write WAL to disk
/// while holding it, ensuring that consensus checks are in order.
mutex: Mutex<SharedState>,
mutex: RwLock<SharedState>,
walsenders: Arc<WalSenders>,
walreceivers: Arc<WalReceivers>,
/// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal.
cancellation_tx: watch::Sender<bool>,
/// Delete/cancel will trigger this, background tasks should drop out as soon as it fires
pub(crate) cancel: CancellationToken,
/// Timeline should not be used after cancellation. Background tasks should
/// monitor this channel and stop eventually after receiving `true` from this channel.
cancellation_rx: watch::Receiver<bool>,
/// Gate to be held by background tasks, blocks timeline deletion
pub(crate) gate: Gate,
/// Directory where timeline state is stored.
pub timeline_dir: Utf8PathBuf,
@@ -382,15 +356,15 @@ pub struct Timeline {
/// with different speed.
// TODO: add `Arc<SafeKeeperConf>` here instead of adding each field separately.
walsenders_keep_horizon: bool,
// timeline_manager controlled state
pub(crate) broker_active: AtomicBool,
pub(crate) wal_backup_active: AtomicBool,
}
impl Timeline {
/// Load existing timeline from disk.
pub fn load_timeline(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
) -> Result<Timeline> {
pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Timeline> {
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
let shared_state = SharedState::restore(conf, &ttid)?;
@@ -400,23 +374,26 @@ impl Timeline {
shared_state.sk.get_term(),
shared_state.sk.flush_lsn(),
)));
let (cancellation_tx, cancellation_rx) = watch::channel(false);
let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
let walreceivers = WalReceivers::new();
Ok(Timeline {
ttid,
wal_backup_launcher_tx,
commit_lsn_watch_tx,
commit_lsn_watch_rx,
term_flush_lsn_watch_tx,
term_flush_lsn_watch_rx,
mutex: Mutex::new(shared_state),
shared_state_version_tx,
shared_state_version_rx,
mutex: RwLock::new(shared_state),
walsenders: WalSenders::new(walreceivers.clone()),
walreceivers,
cancellation_rx,
cancellation_tx,
cancel: CancellationToken::default(),
gate: Gate::default(),
timeline_dir: conf.timeline_dir(&ttid),
walsenders_keep_horizon: conf.walsenders_keep_horizon,
broker_active: AtomicBool::new(false),
wal_backup_active: AtomicBool::new(false),
})
}
@@ -424,7 +401,6 @@ impl Timeline {
pub fn create_empty(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
server_info: ServerInfo,
commit_lsn: Lsn,
local_start_lsn: Lsn,
@@ -432,25 +408,29 @@ impl Timeline {
let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
let (cancellation_tx, cancellation_rx) = watch::channel(false);
let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
let state =
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
let walreceivers = WalReceivers::new();
Ok(Timeline {
ttid,
wal_backup_launcher_tx,
commit_lsn_watch_tx,
commit_lsn_watch_rx,
term_flush_lsn_watch_tx,
term_flush_lsn_watch_rx,
mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
shared_state_version_tx,
shared_state_version_rx,
mutex: RwLock::new(SharedState::create_new(conf, &ttid, state)?),
walsenders: WalSenders::new(walreceivers.clone()),
walreceivers,
cancellation_rx,
cancellation_tx,
cancel: CancellationToken::default(),
gate: Gate::default(),
timeline_dir: conf.timeline_dir(&ttid),
walsenders_keep_horizon: conf.walsenders_keep_horizon,
broker_active: AtomicBool::new(false),
wal_backup_active: AtomicBool::new(false),
})
}
@@ -461,8 +441,9 @@ impl Timeline {
/// and state on disk should remain unchanged.
pub async fn init_new(
self: &Arc<Timeline>,
shared_state: &mut MutexGuard<'_, SharedState>,
shared_state: &mut WriteGuardSharedState<'_>,
conf: &SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
) -> Result<()> {
match fs::metadata(&self.timeline_dir).await {
Ok(_) => {
@@ -493,16 +474,35 @@ impl Timeline {
return Err(e);
}
self.bootstrap(conf);
self.bootstrap(conf, broker_active_set);
Ok(())
}
/// Bootstrap new or existing timeline starting background stasks.
pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
/// Bootstrap new or existing timeline starting background tasks.
pub fn bootstrap(
self: &Arc<Timeline>,
conf: &SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
) {
// Start manager task which will monitor timeline state and update
// background tasks.
let Ok(gate_guard) = self.gate.enter() else {
// We were already shut down
return;
};
tokio::spawn(timeline_manager::main_task(
self.clone(),
conf.clone(),
broker_active_set,
gate_guard,
));
// Start recovery task which always runs on the timeline.
if conf.peer_recovery_enabled {
tokio::spawn(recovery_main(self.clone(), conf.clone()));
}
// TODO: migrate to timeline_manager
if conf.is_wal_backup_enabled() && conf.partial_backup_enabled {
tokio::spawn(wal_backup_partial::main_task(self.clone(), conf.clone()));
}
@@ -515,12 +515,14 @@ impl Timeline {
/// deletion API endpoint is retriable.
pub async fn delete(
&self,
shared_state: &mut MutexGuard<'_, SharedState>,
shared_state: &mut WriteGuardSharedState<'_>,
only_local: bool,
) -> Result<(bool, bool)> {
let was_active = shared_state.active;
) -> Result<bool> {
self.cancel(shared_state);
// Make sure any background tasks are gone before we start deleting things from storage
self.gate.close().await;
// TODO: It's better to wait for s3 offloader termination before
// removing data from s3. Though since s3 doesn't have transactions it
// still wouldn't guarantee absense of data after removal.
@@ -532,20 +534,14 @@ impl Timeline {
wal_backup::delete_timeline(&self.ttid).await?;
}
let dir_existed = delete_dir(&self.timeline_dir).await?;
Ok((dir_existed, was_active))
Ok(dir_existed)
}
/// Cancel timeline to prevent further usage. Background tasks will stop
/// eventually after receiving cancellation signal.
///
/// Note that we can't notify backup launcher here while holding
/// shared_state lock, as this is a potential deadlock: caller is
/// responsible for that. Generally we should probably make WAL backup tasks
/// to shut down on their own, checking once in a while whether it is the
/// time.
fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) {
fn cancel(&self, shared_state: &mut WriteGuardSharedState<'_>) {
info!("timeline {} is cancelled", self.ttid);
let _ = self.cancellation_tx.send(true);
self.cancel.cancel();
// Close associated FDs. Nobody will be able to touch timeline data once
// it is cancelled, so WAL storage won't be opened again.
shared_state.sk.wal_store.close();
@@ -553,44 +549,16 @@ impl Timeline {
/// Returns if timeline is cancelled.
pub fn is_cancelled(&self) -> bool {
*self.cancellation_rx.borrow()
}
/// Returns watch channel which gets value when timeline is cancelled. It is
/// guaranteed to have not cancelled value observed (errors otherwise).
pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
let rx = self.cancellation_rx.clone();
if *rx.borrow() {
bail!(TimelineError::Cancelled(self.ttid));
}
Ok(rx)
self.cancel.is_cancelled()
}
/// Take a writing mutual exclusive lock on timeline shared_state.
pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
self.mutex.lock().await
pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
}
async fn update_status(&self, shared_state: &mut SharedState) -> bool {
shared_state
.update_status(self.walreceivers.get_num(), self.ttid)
.await
}
/// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
pub async fn update_status_notify(&self) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
let is_wal_backup_action_pending: bool = {
let mut shared_state = self.write_shared_state().await;
self.update_status(&mut shared_state).await
};
if is_wal_backup_action_pending {
// Can fail only if channel to a static thread got closed, which is not normal at all.
self.wal_backup_launcher_tx.send(self.ttid).await?;
}
Ok(())
pub async fn read_shared_state(&self) -> ReadGuardSharedState {
self.mutex.read().await
}
/// Returns true if walsender should stop sending WAL to pageserver. We
@@ -602,7 +570,7 @@ impl Timeline {
if self.is_cancelled() {
return true;
}
let shared_state = self.write_shared_state().await;
let shared_state = self.read_shared_state().await;
if self.walreceivers.get_num() == 0 {
return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet
reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn;
@@ -610,9 +578,9 @@ impl Timeline {
false
}
/// Ensure taht current term is t, erroring otherwise, and lock the state.
pub async fn acquire_term(&self, t: Term) -> Result<MutexGuard<SharedState>> {
let ss = self.write_shared_state().await;
/// Ensure that current term is t, erroring otherwise, and lock the state.
pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
let ss = self.read_shared_state().await;
if ss.sk.state.acceptor_state.term != t {
bail!(
"failed to acquire term {}, current term {}",
@@ -623,18 +591,6 @@ impl Timeline {
Ok(ss)
}
/// Returns whether s3 offloading is required and sets current status as
/// matching it.
pub async fn wal_backup_attend(&self) -> bool {
if self.is_cancelled() {
return false;
}
self.write_shared_state()
.await
.wal_backup_attend(self.walreceivers.get_num())
}
/// Returns commit_lsn watch channel.
pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
self.commit_lsn_watch_rx.clone()
@@ -645,9 +601,14 @@ impl Timeline {
self.term_flush_lsn_watch_rx.clone()
}
/// Returns watch channel for SharedState update version.
pub fn get_state_version_rx(&self) -> watch::Receiver<usize> {
self.shared_state_version_rx.clone()
}
/// Pass arrived message to the safekeeper.
pub async fn process_msg(
&self,
self: &Arc<Self>,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
if self.is_cancelled() {
@@ -655,8 +616,6 @@ impl Timeline {
}
let mut rmsg: Option<AcceptorProposerMessage>;
let commit_lsn: Lsn;
let term_flush_lsn: TermLsn;
{
let mut shared_state = self.write_shared_state().await;
rmsg = shared_state.sk.process_msg(msg).await?;
@@ -665,43 +624,28 @@ impl Timeline {
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
resp.hs_feedback = self.walsenders.get_hotstandby();
}
commit_lsn = shared_state.sk.state.inmem.commit_lsn;
term_flush_lsn =
TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
}
self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
self.commit_lsn_watch_tx.send(commit_lsn)?;
Ok(rmsg)
}
/// Returns wal_seg_size.
pub async fn get_wal_seg_size(&self) -> usize {
self.write_shared_state().await.get_wal_seg_size()
}
/// Returns true only if the timeline is loaded and active.
pub async fn is_active(&self) -> bool {
if self.is_cancelled() {
return false;
}
self.write_shared_state().await.active
self.read_shared_state().await.get_wal_seg_size()
}
/// Returns state of the timeline.
pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
let state = self.write_shared_state().await;
let state = self.read_shared_state().await;
(state.sk.state.inmem.clone(), state.sk.state.clone())
}
/// Returns latest backup_lsn.
pub async fn get_wal_backup_lsn(&self) -> Lsn {
self.write_shared_state().await.sk.state.inmem.backup_lsn
self.read_shared_state().await.sk.state.inmem.backup_lsn
}
/// Sets backup_lsn to the given value.
pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
pub async fn set_wal_backup_lsn(self: &Arc<Self>, backup_lsn: Lsn) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
@@ -715,39 +659,33 @@ impl Timeline {
/// Get safekeeper info for broadcasting to broker and other peers.
pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
let shared_state = self.write_shared_state().await;
let shared_state = self.read_shared_state().await;
shared_state.get_safekeeper_info(&self.ttid, conf)
}
/// Update timeline state with peer safekeeper data.
pub async fn record_safekeeper_info(&self, sk_info: SafekeeperTimelineInfo) -> Result<()> {
let is_wal_backup_action_pending: bool;
let commit_lsn: Lsn;
pub async fn record_safekeeper_info(
self: &Arc<Self>,
sk_info: SafekeeperTimelineInfo,
) -> Result<()> {
{
let mut shared_state = self.write_shared_state().await;
shared_state.sk.record_safekeeper_info(&sk_info).await?;
let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
shared_state.peers_info.upsert(&peer_info);
is_wal_backup_action_pending = self.update_status(&mut shared_state).await;
commit_lsn = shared_state.sk.state.inmem.commit_lsn;
}
self.commit_lsn_watch_tx.send(commit_lsn)?;
// Wake up wal backup launcher, if it is time to stop the offloading.
if is_wal_backup_action_pending {
self.wal_backup_launcher_tx.send(self.ttid).await?;
}
Ok(())
}
/// Update in memory remote consistent lsn.
pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
pub async fn update_remote_consistent_lsn(self: &Arc<Self>, candidate: Lsn) {
let mut shared_state = self.write_shared_state().await;
shared_state.sk.state.inmem.remote_consistent_lsn =
max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate);
}
pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
let shared_state = self.write_shared_state().await;
let shared_state = self.read_shared_state().await;
shared_state.get_peers(conf.heartbeat_timeout)
}
@@ -769,7 +707,7 @@ impl Timeline {
/// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
/// Thus we don't try to predict it here.
pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
let ss = self.write_shared_state().await;
let ss = self.read_shared_state().await;
let term = ss.sk.state.acceptor_state.term;
let last_log_term = ss.sk.get_epoch();
let flush_lsn = ss.sk.flush_lsn();
@@ -840,12 +778,12 @@ impl Timeline {
/// Returns flush_lsn.
pub async fn get_flush_lsn(&self) -> Lsn {
self.write_shared_state().await.sk.wal_store.flush_lsn()
self.read_shared_state().await.sk.wal_store.flush_lsn()
}
/// Delete WAL segments from disk that are no longer needed. This is determined
/// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
pub async fn remove_old_wal(self: &Arc<Self>) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
@@ -861,9 +799,8 @@ impl Timeline {
let horizon_segno: XLogSegNo;
let remover = {
let shared_state = self.write_shared_state().await;
horizon_segno =
shared_state.get_horizon_segno(wal_backup_enabled, replication_horizon_lsn);
let shared_state = self.read_shared_state().await;
horizon_segno = shared_state.get_horizon_segno(replication_horizon_lsn);
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
return Ok(()); // nothing to do
}
@@ -885,7 +822,7 @@ impl Timeline {
/// passed after the last save. This helps to keep remote_consistent_lsn up
/// to date so that storage nodes restart doesn't cause many pageserver ->
/// safekeeper reconnections.
pub async fn maybe_persist_control_file(&self) -> Result<()> {
pub async fn maybe_persist_control_file(self: &Arc<Self>) -> Result<()> {
self.write_shared_state()
.await
.sk
@@ -893,38 +830,33 @@ impl Timeline {
.await
}
/// Gather timeline data for metrics. If the timeline is not active, returns
/// None, we do not collect these.
/// Gather timeline data for metrics.
pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
if self.is_cancelled() {
return None;
}
let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
let state = self.write_shared_state().await;
if state.active {
Some(FullTimelineInfo {
ttid: self.ttid,
ps_feedback_count,
last_ps_feedback,
wal_backup_active: state.wal_backup_active,
timeline_is_active: state.active,
num_computes: self.walreceivers.get_num() as u32,
last_removed_segno: state.last_removed_segno,
epoch_start_lsn: state.sk.epoch_start_lsn,
mem_state: state.sk.state.inmem.clone(),
persisted_state: state.sk.state.clone(),
flush_lsn: state.sk.wal_store.flush_lsn(),
wal_storage: state.sk.wal_store.get_metrics(),
})
} else {
None
}
let state = self.read_shared_state().await;
Some(FullTimelineInfo {
ttid: self.ttid,
ps_feedback_count,
last_ps_feedback,
wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
timeline_is_active: self.broker_active.load(Ordering::Relaxed),
num_computes: self.walreceivers.get_num() as u32,
last_removed_segno: state.last_removed_segno,
epoch_start_lsn: state.sk.epoch_start_lsn,
mem_state: state.sk.state.inmem.clone(),
persisted_state: state.sk.state.clone(),
flush_lsn: state.sk.wal_store.flush_lsn(),
wal_storage: state.sk.wal_store.get_metrics(),
})
}
/// Returns in-memory timeline state to build a full debug dump.
pub async fn memory_dump(&self) -> debug_dump::Memory {
let state = self.write_shared_state().await;
let state = self.read_shared_state().await;
let (write_lsn, write_record_lsn, flush_lsn, file_open) =
state.sk.wal_store.internal_state();
@@ -933,8 +865,8 @@ impl Timeline {
is_cancelled: self.is_cancelled(),
peers_info_len: state.peers_info.0.len(),
walsenders: self.walsenders.get_all(),
wal_backup_active: state.wal_backup_active,
active: state.active,
wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
active: self.broker_active.load(Ordering::Relaxed),
num_computes: self.walreceivers.get_num() as u32,
last_removed_segno: state.last_removed_segno,
epoch_start_lsn: state.sk.epoch_start_lsn,
@@ -948,7 +880,7 @@ impl Timeline {
/// Apply a function to the control file state and persist it.
pub async fn map_control_file<T>(
&self,
self: &Arc<Self>,
f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
) -> Result<T> {
let mut state = self.write_shared_state().await;

View File

@@ -0,0 +1,140 @@
use std::{sync::Arc, time::Duration};
use tracing::{info, instrument, warn};
use utils::{lsn::Lsn, sync::gate::GateGuard};
use crate::{
metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL},
timeline::{PeerInfo, ReadGuardSharedState, Timeline},
timelines_set::TimelinesSet,
wal_backup::{self, WalBackupTaskHandle},
SafeKeeperConf,
};
pub struct StateSnapshot {
pub commit_lsn: Lsn,
pub backup_lsn: Lsn,
pub remote_consistent_lsn: Lsn,
pub peers: Vec<PeerInfo>,
}
impl StateSnapshot {
/// Create a new snapshot of the timeline state.
fn new(read_guard: ReadGuardSharedState, heartbeat_timeout: Duration) -> Self {
Self {
commit_lsn: read_guard.sk.state.inmem.commit_lsn,
backup_lsn: read_guard.sk.state.inmem.backup_lsn,
remote_consistent_lsn: read_guard.sk.state.inmem.remote_consistent_lsn,
peers: read_guard.get_peers(heartbeat_timeout),
}
}
}
/// Control how often the manager task should wake up to check updates.
/// There is no need to check for updates more often than this.
const REFRESH_INTERVAL: Duration = Duration::from_millis(300);
/// This task gets spawned alongside each timeline and is responsible for managing the timeline's
/// background tasks.
#[instrument(name = "manager", skip_all, fields(ttid = %tli.ttid))]
pub async fn main_task(
tli: Arc<Timeline>,
conf: SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
_gate_guard: GateGuard,
) {
scopeguard::defer! {
if tli.is_cancelled() {
info!("manager task finished");
} else {
warn!("manager task finished prematurely");
}
};
// sets whether timeline is active for broker pushes or not
let mut tli_broker_active = broker_active_set.guard(tli.clone());
let ttid = tli.ttid;
let wal_seg_size = tli.get_wal_seg_size().await;
let heartbeat_timeout = conf.heartbeat_timeout;
let mut state_version_rx = tli.get_state_version_rx();
let walreceivers = tli.get_walreceivers();
let mut num_computes_rx = walreceivers.get_num_rx();
// list of background tasks
let mut backup_task: Option<WalBackupTaskHandle> = None;
let last_state = 'outer: loop {
MANAGER_ITERATIONS_TOTAL.inc();
let state_snapshot = StateSnapshot::new(tli.read_shared_state().await, heartbeat_timeout);
let num_computes = *num_computes_rx.borrow();
let is_wal_backup_required =
wal_backup::is_wal_backup_required(wal_seg_size, num_computes, &state_snapshot);
if conf.is_wal_backup_enabled() {
wal_backup::update_task(
&conf,
ttid,
is_wal_backup_required,
&state_snapshot,
&mut backup_task,
)
.await;
}
let is_active = is_wal_backup_required
|| state_snapshot.remote_consistent_lsn < state_snapshot.commit_lsn;
// update the broker timeline set
if tli_broker_active.set(is_active) {
// write log if state has changed
info!(
"timeline active={} now, remote_consistent_lsn={}, commit_lsn={}",
is_active, state_snapshot.remote_consistent_lsn, state_snapshot.commit_lsn,
);
MANAGER_ACTIVE_CHANGES.inc();
if !is_active {
// TODO: maybe use tokio::spawn?
if let Err(e) = tli.maybe_persist_control_file().await {
warn!("control file save in update_status failed: {:?}", e);
}
}
}
// update the state in Arc<Timeline>
tli.wal_backup_active
.store(is_wal_backup_required, std::sync::atomic::Ordering::SeqCst);
tli.broker_active
.store(is_active, std::sync::atomic::Ordering::SeqCst);
// wait until something changes. tx channels are stored under Arc, so they will not be
// dropped until the manager task is finished.
tokio::select! {
_ = tli.cancel.cancelled() => {
// timeline was deleted
break 'outer state_snapshot;
}
_ = async {
// don't wake up on every state change, but at most every REFRESH_INTERVAL
tokio::time::sleep(REFRESH_INTERVAL).await;
let _ = state_version_rx.changed().await;
} => {
// state was updated
}
_ = num_computes_rx.changed() => {
// number of connected computes was updated
}
}
};
// shutdown background tasks
if conf.is_wal_backup_enabled() {
wal_backup::update_task(&conf, ttid, false, &last_state, &mut backup_task).await;
}
}

View File

@@ -4,6 +4,7 @@
use crate::safekeeper::ServerInfo;
use crate::timeline::{Timeline, TimelineError};
use crate::timelines_set::TimelinesSet;
use crate::SafeKeeperConf;
use anyhow::{bail, Context, Result};
use camino::Utf8PathBuf;
@@ -11,16 +12,16 @@ use once_cell::sync::Lazy;
use serde::Serialize;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::Sender;
use tracing::*;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
struct GlobalTimelinesState {
timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
wal_backup_launcher_tx: Option<Sender<TenantTimelineId>>,
conf: Option<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
}
@@ -36,11 +37,8 @@ impl GlobalTimelinesState {
}
/// Get dependencies for a timeline constructor.
fn get_dependencies(&self) -> (SafeKeeperConf, Sender<TenantTimelineId>) {
(
self.get_conf().clone(),
self.wal_backup_launcher_tx.as_ref().unwrap().clone(),
)
fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>) {
(self.get_conf().clone(), self.broker_active_set.clone())
}
/// Insert timeline into the map. Returns error if timeline with the same id already exists.
@@ -65,8 +63,8 @@ impl GlobalTimelinesState {
static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
wal_backup_launcher_tx: None,
conf: None,
broker_active_set: Arc::new(TimelinesSet::default()),
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
})
});
@@ -76,16 +74,11 @@ pub struct GlobalTimelines;
impl GlobalTimelines {
/// Inject dependencies needed for the timeline constructors and load all timelines to memory.
pub async fn init(
conf: SafeKeeperConf,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
) -> Result<()> {
pub async fn init(conf: SafeKeeperConf) -> Result<()> {
// clippy isn't smart enough to understand that drop(state) releases the
// lock, so use explicit block
let tenants_dir = {
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(state.wal_backup_launcher_tx.is_none());
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
state.conf = Some(conf);
// Iterate through all directories and load tenants for all directories
@@ -129,12 +122,9 @@ impl GlobalTimelines {
/// this function is called during init when nothing else is running, so
/// this is fine.
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
let (conf, wal_backup_launcher_tx) = {
let (conf, broker_active_set) = {
let state = TIMELINES_STATE.lock().unwrap();
(
state.get_conf().clone(),
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
)
state.get_dependencies()
};
let timelines_dir = conf.tenant_dir(&tenant_id);
@@ -147,7 +137,7 @@ impl GlobalTimelines {
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
{
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx.clone()) {
match Timeline::load_timeline(&conf, ttid) {
Ok(timeline) => {
let tli = Arc::new(timeline);
TIMELINES_STATE
@@ -155,8 +145,7 @@ impl GlobalTimelines {
.unwrap()
.timelines
.insert(ttid, tli.clone());
tli.bootstrap(&conf);
tli.update_status_notify().await.unwrap();
tli.bootstrap(&conf, broker_active_set.clone());
}
// If we can't load a timeline, it's most likely because of a corrupted
// directory. We will log an error and won't allow to delete/recreate
@@ -189,9 +178,9 @@ impl GlobalTimelines {
_guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
ttid: TenantTimelineId,
) -> Result<Arc<Timeline>> {
let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
let (conf, broker_active_set) = TIMELINES_STATE.lock().unwrap().get_dependencies();
match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {
match Timeline::load_timeline(&conf, ttid) {
Ok(timeline) => {
let tli = Arc::new(timeline);
@@ -202,7 +191,7 @@ impl GlobalTimelines {
.timelines
.insert(ttid, tli.clone());
tli.bootstrap(&conf);
tli.bootstrap(&conf, broker_active_set);
Ok(tli)
}
@@ -221,6 +210,10 @@ impl GlobalTimelines {
TIMELINES_STATE.lock().unwrap().get_conf().clone()
}
pub fn get_global_broker_active_set() -> Arc<TimelinesSet> {
TIMELINES_STATE.lock().unwrap().broker_active_set.clone()
}
/// Create a new timeline with the given id. If the timeline already exists, returns
/// an existing timeline.
pub async fn create(
@@ -229,7 +222,7 @@ impl GlobalTimelines {
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let (conf, wal_backup_launcher_tx) = {
let (conf, broker_active_set) = {
let state = TIMELINES_STATE.lock().unwrap();
if let Ok(timeline) = state.get(&ttid) {
// Timeline already exists, return it.
@@ -243,7 +236,6 @@ impl GlobalTimelines {
let timeline = Arc::new(Timeline::create_empty(
&conf,
ttid,
wal_backup_launcher_tx,
server_info,
commit_lsn,
local_start_lsn,
@@ -264,7 +256,10 @@ impl GlobalTimelines {
// Write the new timeline to the disk and start background workers.
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
// and the state on disk should remain unchanged.
if let Err(e) = timeline.init_new(&mut shared_state, &conf).await {
if let Err(e) = timeline
.init_new(&mut shared_state, &conf, broker_active_set)
.await
{
// Note: the most likely reason for init failure is that the timeline
// directory already exists on disk. This happens when timeline is corrupted
// and wasn't loaded from disk on startup because of that. We want to preserve
@@ -281,8 +276,6 @@ impl GlobalTimelines {
// We are done with bootstrap, release the lock, return the timeline.
// {} block forces release before .await
}
timeline.update_status_notify().await?;
timeline.wal_backup_launcher_tx.send(timeline.ttid).await?;
Ok(timeline)
}
@@ -335,12 +328,13 @@ impl GlobalTimelines {
let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
match tli_res {
Ok(timeline) => {
let was_active = timeline.broker_active.load(Ordering::Relaxed);
// Take a lock and finish the deletion holding this mutex.
let mut shared_state = timeline.write_shared_state().await;
info!("deleting timeline {}, only_local={}", ttid, only_local);
let (dir_existed, was_active) =
timeline.delete(&mut shared_state, only_local).await?;
let dir_existed = timeline.delete(&mut shared_state, only_local).await?;
// Remove timeline from the map.
// FIXME: re-enable it once we fix the issue with recreation of deleted timelines
@@ -349,7 +343,7 @@ impl GlobalTimelines {
Ok(TimelineDeleteForceResult {
dir_existed,
was_active,
was_active, // TODO: we probably should remove this field
})
}
Err(_) => {

View File

@@ -0,0 +1,90 @@
use std::{collections::HashMap, sync::Arc};
use utils::id::TenantTimelineId;
use crate::timeline::Timeline;
/// Set of timelines, supports operations:
/// - add timeline
/// - remove timeline
/// - clone the set
///
/// Usually used for keeping subset of timelines. For example active timelines that require broker push.
pub struct TimelinesSet {
timelines: std::sync::Mutex<HashMap<TenantTimelineId, Arc<Timeline>>>,
}
impl Default for TimelinesSet {
fn default() -> Self {
Self {
timelines: std::sync::Mutex::new(HashMap::new()),
}
}
}
impl TimelinesSet {
pub fn insert(&self, tli: Arc<Timeline>) {
self.timelines.lock().unwrap().insert(tli.ttid, tli);
}
pub fn delete(&self, ttid: &TenantTimelineId) {
self.timelines.lock().unwrap().remove(ttid);
}
/// If present is true, adds timeline to the set, otherwise removes it.
pub fn set_present(&self, tli: Arc<Timeline>, present: bool) {
if present {
self.insert(tli);
} else {
self.delete(&tli.ttid);
}
}
pub fn is_present(&self, ttid: &TenantTimelineId) -> bool {
self.timelines.lock().unwrap().contains_key(ttid)
}
/// Returns all timelines in the set.
pub fn get_all(&self) -> Vec<Arc<Timeline>> {
self.timelines.lock().unwrap().values().cloned().collect()
}
/// Returns a timeline guard for easy presence control.
pub fn guard(self: &Arc<Self>, tli: Arc<Timeline>) -> TimelineSetGuard {
let is_present = self.is_present(&tli.ttid);
TimelineSetGuard {
timelines_set: self.clone(),
tli,
is_present,
}
}
}
/// Guard is used to add or remove timeline from the set.
/// If the timeline present in set, it will be removed from it on drop.
/// Note: do not use more than one guard for the same timeline, it caches the presence state.
/// It is designed to be used in the manager task only.
pub struct TimelineSetGuard {
timelines_set: Arc<TimelinesSet>,
tli: Arc<Timeline>,
is_present: bool,
}
impl TimelineSetGuard {
/// Returns true if the state was changed.
pub fn set(&mut self, present: bool) -> bool {
if present == self.is_present {
return false;
}
self.is_present = present;
self.timelines_set.set_present(self.tli.clone(), present);
true
}
}
impl Drop for TimelineSetGuard {
fn drop(&mut self) {
// remove timeline from the map on drop
self.timelines_set.delete(&self.tli.ttid);
}
}

View File

@@ -9,7 +9,7 @@ use utils::backoff;
use utils::id::NodeId;
use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::num::NonZeroU32;
use std::pin::Pin;
use std::sync::Arc;
@@ -29,9 +29,10 @@ use tracing::*;
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS};
use crate::timeline::{PeerInfo, Timeline};
use crate::{GlobalTimelines, SafeKeeperConf};
use crate::timeline_manager::StateSnapshot;
use crate::{GlobalTimelines, SafeKeeperConf, WAL_BACKUP_RUNTIME};
use once_cell::sync::OnceCell;
@@ -41,35 +42,84 @@ const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
/// Default buffer size when interfacing with [`tokio::fs::File`].
const BUFFER_SIZE: usize = 32 * 1024;
/// Check whether wal backup is required for timeline. If yes, mark that launcher is
/// aware of current status and return the timeline.
async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
match GlobalTimelines::get(ttid).ok() {
Some(tli) => {
tli.wal_backup_attend().await;
Some(tli)
}
None => None,
}
}
struct WalBackupTaskHandle {
pub struct WalBackupTaskHandle {
shutdown_tx: Sender<()>,
handle: JoinHandle<()>,
}
struct WalBackupTimelineEntry {
timeline: Arc<Timeline>,
handle: Option<WalBackupTaskHandle>,
/// Do we have anything to upload to S3, i.e. should safekeepers run backup activity?
pub fn is_wal_backup_required(
wal_seg_size: usize,
num_computes: usize,
state: &StateSnapshot,
) -> bool {
num_computes > 0 ||
// Currently only the whole segment is offloaded, so compare segment numbers.
(state.commit_lsn.segment_number(wal_seg_size) > state.backup_lsn.segment_number(wal_seg_size))
}
async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
if let Some(wb_handle) = entry.handle.take() {
/// Based on peer information determine which safekeeper should offload; if it
/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task
/// is running, kill it.
pub async fn update_task(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
need_backup: bool,
state: &StateSnapshot,
entry: &mut Option<WalBackupTaskHandle>,
) {
let (offloader, election_dbg_str) =
determine_offloader(&state.peers, state.backup_lsn, ttid, conf);
let elected_me = Some(conf.my_id) == offloader;
let should_task_run = need_backup && elected_me;
// start or stop the task
if should_task_run != (entry.is_some()) {
if should_task_run {
info!("elected for backup: {}", election_dbg_str);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&ttid);
let async_task = backup_task_main(
ttid,
timeline_dir,
conf.workdir.clone(),
conf.backup_parallel_jobs,
shutdown_rx,
);
let handle = if conf.current_thread_runtime {
tokio::spawn(async_task)
} else {
WAL_BACKUP_RUNTIME.spawn(async_task)
};
*entry = Some(WalBackupTaskHandle {
shutdown_tx,
handle,
});
} else {
if !need_backup {
// don't need backup at all
info!("stepping down from backup, need_backup={}", need_backup);
} else {
// someone else has been elected
info!("stepping down from backup: {}", election_dbg_str);
}
shut_down_task(entry).await;
}
}
}
async fn shut_down_task(entry: &mut Option<WalBackupTaskHandle>) {
if let Some(wb_handle) = entry.take() {
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task for {} panicked: {}", ttid, e);
warn!("WAL backup task panicked: {}", e);
}
}
}
@@ -126,49 +176,6 @@ fn determine_offloader(
}
}
/// Based on peer information determine which safekeeper should offload; if it
/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task
/// is running, kill it.
async fn update_task(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
entry: &mut WalBackupTimelineEntry,
) {
let alive_peers = entry.timeline.get_peers(conf).await;
let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
let (offloader, election_dbg_str) =
determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
let elected_me = Some(conf.my_id) == offloader;
if elected_me != (entry.handle.is_some()) {
if elected_me {
info!("elected for backup: {}", election_dbg_str);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&ttid);
let handle = tokio::spawn(
backup_task_main(
ttid,
timeline_dir,
conf.workdir.clone(),
conf.backup_parallel_jobs,
shutdown_rx,
)
.in_current_span(),
);
entry.handle = Some(WalBackupTaskHandle {
shutdown_tx,
handle,
});
} else {
info!("stepping down from backup: {}", election_dbg_str);
shut_down_task(ttid, entry).await;
}
}
}
static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
// Storage must be configured and initialized when this is called.
@@ -190,67 +197,6 @@ pub fn init_remote_storage(conf: &SafeKeeperConf) {
});
}
const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
/// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup
/// tasks. Having this in separate task simplifies locking, allows to reap
/// panics and separate elections from offloading itself.
pub async fn wal_backup_launcher_task_main(
conf: SafeKeeperConf,
mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
) -> anyhow::Result<()> {
info!(
"WAL backup launcher started, remote config {:?}",
conf.remote_storage
);
// Presence in this map means launcher is aware s3 offloading is needed for
// the timeline, but task is started only if it makes sense for to offload
// from this safekeeper.
let mut tasks: HashMap<TenantTimelineId, WalBackupTimelineEntry> = HashMap::new();
let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
loop {
tokio::select! {
ttid = wal_backup_launcher_rx.recv() => {
// channel is never expected to get closed
let ttid = ttid.unwrap();
if !conf.is_wal_backup_enabled() {
continue; /* just drain the channel and do nothing */
}
async {
let timeline = is_wal_backup_required(ttid).await;
// do we need to do anything at all?
if timeline.is_some() != tasks.contains_key(&ttid) {
if let Some(timeline) = timeline {
// need to start the task
let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
timeline,
handle: None,
});
update_task(&conf, ttid, entry).await;
} else {
// need to stop the task
info!("stopping WAL backup task");
let mut entry = tasks.remove(&ttid).unwrap();
shut_down_task(ttid, &mut entry).await;
}
}
}.instrument(info_span!("WAL backup", ttid = %ttid)).await;
}
// For each timeline needing offloading, check if this safekeeper
// should do the job and start/stop the task accordingly.
_ = ticker.tick() => {
for (ttid, entry) in tasks.iter_mut() {
update_task(&conf, *ttid, entry)
.instrument(info_span!("WAL backup", ttid = %ttid))
.await;
}
}
}
}
}
struct WalBackupTask {
timeline: Arc<Timeline>,
timeline_dir: Utf8PathBuf,
@@ -261,6 +207,7 @@ struct WalBackupTask {
}
/// Offload single timeline.
#[instrument(name = "WAL backup", skip_all, fields(ttid = %ttid))]
async fn backup_task_main(
ttid: TenantTimelineId,
timeline_dir: Utf8PathBuf,
@@ -268,6 +215,8 @@ async fn backup_task_main(
parallel_jobs: usize,
mut shutdown_rx: Receiver<()>,
) {
let _guard = WAL_BACKUP_TASKS.guard();
info!("started");
let res = GlobalTimelines::get(ttid);
if let Err(e) = res {

View File

@@ -277,14 +277,6 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
debug!("started");
let await_duration = conf.partial_backup_timeout;
let mut cancellation_rx = match tli.get_cancellation_rx() {
Ok(rx) => rx,
Err(_) => {
info!("timeline canceled during task start");
return;
}
};
// sleep for random time to avoid thundering herd
{
let randf64 = rand::thread_rng().gen_range(0.0..1.0);
@@ -327,7 +319,7 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
&& flush_lsn_rx.borrow().term == seg.term
{
tokio::select! {
_ = cancellation_rx.changed() => {
_ = backup.tli.cancel.cancelled() => {
info!("timeline canceled");
return;
}
@@ -340,7 +332,7 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
// if we don't have any data and zero LSNs, wait for something
while flush_lsn_rx.borrow().lsn == Lsn(0) {
tokio::select! {
_ = cancellation_rx.changed() => {
_ = backup.tli.cancel.cancelled() => {
info!("timeline canceled");
return;
}
@@ -357,7 +349,7 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
// waiting until timeout expires OR segno changes
'inner: loop {
tokio::select! {
_ = cancellation_rx.changed() => {
_ = backup.tli.cancel.cancelled() => {
info!("timeline canceled");
return;
}