mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 14:00:38 +00:00
Enforce LSN ordering of batch entries (#7071)
## Summary of changes Enforce LSN ordering of batch entries. Closes https://github.com/neondatabase/neon/issues/6707
This commit is contained in:
@@ -1,27 +1,60 @@
|
||||
use std::{alloc::Layout, cmp::Ordering, ops::RangeBounds};
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub enum VecMapOrdering {
|
||||
Greater,
|
||||
GreaterOrEqual,
|
||||
}
|
||||
|
||||
/// Ordered map datastructure implemented in a Vec.
|
||||
/// Append only - can only add keys that are larger than the
|
||||
/// current max key.
|
||||
/// Ordering can be adjusted using [`VecMapOrdering`]
|
||||
/// during `VecMap` construction.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct VecMap<K, V>(Vec<(K, V)>);
|
||||
pub struct VecMap<K, V> {
|
||||
data: Vec<(K, V)>,
|
||||
ordering: VecMapOrdering,
|
||||
}
|
||||
|
||||
impl<K, V> Default for VecMap<K, V> {
|
||||
fn default() -> Self {
|
||||
VecMap(Default::default())
|
||||
VecMap {
|
||||
data: Default::default(),
|
||||
ordering: VecMapOrdering::Greater,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct InvalidKey;
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum VecMapError {
|
||||
#[error("Key violates ordering constraint")]
|
||||
InvalidKey,
|
||||
#[error("Mismatched ordering constraints")]
|
||||
ExtendOrderingError,
|
||||
}
|
||||
|
||||
impl<K: Ord, V> VecMap<K, V> {
|
||||
pub fn new(ordering: VecMapOrdering) -> Self {
|
||||
Self {
|
||||
data: Vec::new(),
|
||||
ordering,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_capacity(capacity: usize, ordering: VecMapOrdering) -> Self {
|
||||
Self {
|
||||
data: Vec::with_capacity(capacity),
|
||||
ordering,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0.is_empty()
|
||||
self.data.is_empty()
|
||||
}
|
||||
|
||||
pub fn as_slice(&self) -> &[(K, V)] {
|
||||
self.0.as_slice()
|
||||
self.data.as_slice()
|
||||
}
|
||||
|
||||
/// This function may panic if given a range where the lower bound is
|
||||
@@ -29,7 +62,7 @@ impl<K: Ord, V> VecMap<K, V> {
|
||||
pub fn slice_range<R: RangeBounds<K>>(&self, range: R) -> &[(K, V)] {
|
||||
use std::ops::Bound::*;
|
||||
|
||||
let binary_search = |k: &K| self.0.binary_search_by_key(&k, extract_key);
|
||||
let binary_search = |k: &K| self.data.binary_search_by_key(&k, extract_key);
|
||||
|
||||
let start_idx = match range.start_bound() {
|
||||
Unbounded => 0,
|
||||
@@ -41,7 +74,7 @@ impl<K: Ord, V> VecMap<K, V> {
|
||||
};
|
||||
|
||||
let end_idx = match range.end_bound() {
|
||||
Unbounded => self.0.len(),
|
||||
Unbounded => self.data.len(),
|
||||
Included(k) => match binary_search(k) {
|
||||
Ok(idx) => idx + 1,
|
||||
Err(idx) => idx,
|
||||
@@ -49,34 +82,30 @@ impl<K: Ord, V> VecMap<K, V> {
|
||||
Excluded(k) => binary_search(k).unwrap_or_else(std::convert::identity),
|
||||
};
|
||||
|
||||
&self.0[start_idx..end_idx]
|
||||
&self.data[start_idx..end_idx]
|
||||
}
|
||||
|
||||
/// Add a key value pair to the map.
|
||||
/// If `key` is less than or equal to the current maximum key
|
||||
/// the pair will not be added and InvalidKey error will be returned.
|
||||
pub fn append(&mut self, key: K, value: V) -> Result<usize, InvalidKey> {
|
||||
if let Some((last_key, _last_value)) = self.0.last() {
|
||||
if &key <= last_key {
|
||||
return Err(InvalidKey);
|
||||
}
|
||||
}
|
||||
/// If `key` is not respective of the `self` ordering the
|
||||
/// pair will not be added and `InvalidKey` error will be returned.
|
||||
pub fn append(&mut self, key: K, value: V) -> Result<usize, VecMapError> {
|
||||
self.validate_key_order(&key)?;
|
||||
|
||||
let delta_size = self.instrument_vec_op(|vec| vec.push((key, value)));
|
||||
Ok(delta_size)
|
||||
}
|
||||
|
||||
/// Update the maximum key value pair or add a new key value pair to the map.
|
||||
/// If `key` is less than the current maximum key no updates or additions
|
||||
/// will occur and InvalidKey error will be returned.
|
||||
/// If `key` is not respective of the `self` ordering no updates or additions
|
||||
/// will occur and `InvalidKey` error will be returned.
|
||||
pub fn append_or_update_last(
|
||||
&mut self,
|
||||
key: K,
|
||||
mut value: V,
|
||||
) -> Result<(Option<V>, usize), InvalidKey> {
|
||||
if let Some((last_key, last_value)) = self.0.last_mut() {
|
||||
) -> Result<(Option<V>, usize), VecMapError> {
|
||||
if let Some((last_key, last_value)) = self.data.last_mut() {
|
||||
match key.cmp(last_key) {
|
||||
Ordering::Less => return Err(InvalidKey),
|
||||
Ordering::Less => return Err(VecMapError::InvalidKey),
|
||||
Ordering::Equal => {
|
||||
std::mem::swap(last_value, &mut value);
|
||||
const DELTA_SIZE: usize = 0;
|
||||
@@ -100,40 +129,67 @@ impl<K: Ord, V> VecMap<K, V> {
|
||||
V: Clone,
|
||||
{
|
||||
let split_idx = self
|
||||
.0
|
||||
.data
|
||||
.binary_search_by_key(&cutoff, extract_key)
|
||||
.unwrap_or_else(std::convert::identity);
|
||||
|
||||
(
|
||||
VecMap(self.0[..split_idx].to_vec()),
|
||||
VecMap(self.0[split_idx..].to_vec()),
|
||||
VecMap {
|
||||
data: self.data[..split_idx].to_vec(),
|
||||
ordering: self.ordering,
|
||||
},
|
||||
VecMap {
|
||||
data: self.data[split_idx..].to_vec(),
|
||||
ordering: self.ordering,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Move items from `other` to the end of `self`, leaving `other` empty.
|
||||
/// If any keys in `other` is less than or equal to any key in `self`,
|
||||
/// `InvalidKey` error will be returned and no mutation will occur.
|
||||
pub fn extend(&mut self, other: &mut Self) -> Result<usize, InvalidKey> {
|
||||
let self_last_opt = self.0.last().map(extract_key);
|
||||
let other_first_opt = other.0.last().map(extract_key);
|
||||
/// If the `other` ordering is different from `self` ordering
|
||||
/// `ExtendOrderingError` error will be returned.
|
||||
/// If any keys in `other` is not respective of the ordering defined in
|
||||
/// `self`, `InvalidKey` error will be returned and no mutation will occur.
|
||||
pub fn extend(&mut self, other: &mut Self) -> Result<usize, VecMapError> {
|
||||
if self.ordering != other.ordering {
|
||||
return Err(VecMapError::ExtendOrderingError);
|
||||
}
|
||||
|
||||
if let (Some(self_last), Some(other_first)) = (self_last_opt, other_first_opt) {
|
||||
if self_last >= other_first {
|
||||
return Err(InvalidKey);
|
||||
let other_first_opt = other.data.last().map(extract_key);
|
||||
if let Some(other_first) = other_first_opt {
|
||||
self.validate_key_order(other_first)?;
|
||||
}
|
||||
|
||||
let delta_size = self.instrument_vec_op(|vec| vec.append(&mut other.data));
|
||||
Ok(delta_size)
|
||||
}
|
||||
|
||||
/// Validate the current last key in `self` and key being
|
||||
/// inserted against the order defined in `self`.
|
||||
fn validate_key_order(&self, key: &K) -> Result<(), VecMapError> {
|
||||
if let Some(last_key) = self.data.last().map(extract_key) {
|
||||
match (&self.ordering, &key.cmp(last_key)) {
|
||||
(VecMapOrdering::Greater, Ordering::Less | Ordering::Equal) => {
|
||||
return Err(VecMapError::InvalidKey);
|
||||
}
|
||||
(VecMapOrdering::Greater, Ordering::Greater) => {}
|
||||
(VecMapOrdering::GreaterOrEqual, Ordering::Less) => {
|
||||
return Err(VecMapError::InvalidKey);
|
||||
}
|
||||
(VecMapOrdering::GreaterOrEqual, Ordering::Equal | Ordering::Greater) => {}
|
||||
}
|
||||
}
|
||||
|
||||
let delta_size = self.instrument_vec_op(|vec| vec.append(&mut other.0));
|
||||
Ok(delta_size)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Instrument an operation on the underlying [`Vec`].
|
||||
/// Will panic if the operation decreases capacity.
|
||||
/// Returns the increase in memory usage caused by the op.
|
||||
fn instrument_vec_op(&mut self, op: impl FnOnce(&mut Vec<(K, V)>)) -> usize {
|
||||
let old_cap = self.0.capacity();
|
||||
op(&mut self.0);
|
||||
let new_cap = self.0.capacity();
|
||||
let old_cap = self.data.capacity();
|
||||
op(&mut self.data);
|
||||
let new_cap = self.data.capacity();
|
||||
|
||||
match old_cap.cmp(&new_cap) {
|
||||
Ordering::Less => {
|
||||
@@ -145,6 +201,36 @@ impl<K: Ord, V> VecMap<K, V> {
|
||||
Ordering::Greater => panic!("VecMap capacity shouldn't ever decrease"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Similar to `from_iter` defined in `FromIter` trait except
|
||||
/// that it accepts an [`VecMapOrdering`]
|
||||
pub fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I, ordering: VecMapOrdering) -> Self {
|
||||
let iter = iter.into_iter();
|
||||
let initial_capacity = {
|
||||
match iter.size_hint() {
|
||||
(lower_bound, None) => lower_bound,
|
||||
(_, Some(upper_bound)) => upper_bound,
|
||||
}
|
||||
};
|
||||
|
||||
let mut vec_map = VecMap::with_capacity(initial_capacity, ordering);
|
||||
for (key, value) in iter {
|
||||
vec_map
|
||||
.append(key, value)
|
||||
.expect("The passed collection needs to be sorted!");
|
||||
}
|
||||
|
||||
vec_map
|
||||
}
|
||||
}
|
||||
|
||||
impl<K: Ord, V> IntoIterator for VecMap<K, V> {
|
||||
type Item = (K, V);
|
||||
type IntoIter = std::vec::IntoIter<(K, V)>;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
self.data.into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_key<K, V>(entry: &(K, V)) -> &K {
|
||||
@@ -155,7 +241,7 @@ fn extract_key<K, V>(entry: &(K, V)) -> &K {
|
||||
mod tests {
|
||||
use std::{collections::BTreeMap, ops::Bound};
|
||||
|
||||
use super::VecMap;
|
||||
use super::{VecMap, VecMapOrdering};
|
||||
|
||||
#[test]
|
||||
fn unbounded_range() {
|
||||
@@ -310,5 +396,59 @@ mod tests {
|
||||
left.extend(&mut one_map).unwrap_err();
|
||||
assert_eq!(left.as_slice(), &[(0, ()), (1, ())]);
|
||||
assert_eq!(one_map.as_slice(), &[(1, ())]);
|
||||
|
||||
let mut map_greater_or_equal = VecMap::new(VecMapOrdering::GreaterOrEqual);
|
||||
map_greater_or_equal.append(2, ()).unwrap();
|
||||
map_greater_or_equal.append(2, ()).unwrap();
|
||||
|
||||
left.extend(&mut map_greater_or_equal).unwrap_err();
|
||||
assert_eq!(left.as_slice(), &[(0, ()), (1, ())]);
|
||||
assert_eq!(map_greater_or_equal.as_slice(), &[(2, ()), (2, ())]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn extend_with_ordering() {
|
||||
let mut left = VecMap::new(VecMapOrdering::GreaterOrEqual);
|
||||
left.append(0, ()).unwrap();
|
||||
assert_eq!(left.as_slice(), &[(0, ())]);
|
||||
|
||||
let mut greater_right = VecMap::new(VecMapOrdering::Greater);
|
||||
greater_right.append(0, ()).unwrap();
|
||||
left.extend(&mut greater_right).unwrap_err();
|
||||
assert_eq!(left.as_slice(), &[(0, ())]);
|
||||
|
||||
let mut greater_or_equal_right = VecMap::new(VecMapOrdering::GreaterOrEqual);
|
||||
greater_or_equal_right.append(2, ()).unwrap();
|
||||
greater_or_equal_right.append(2, ()).unwrap();
|
||||
left.extend(&mut greater_or_equal_right).unwrap();
|
||||
assert_eq!(left.as_slice(), &[(0, ()), (2, ()), (2, ())]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn vec_map_from_sorted() {
|
||||
let vec = vec![(1, ()), (2, ()), (3, ()), (6, ())];
|
||||
let vec_map = VecMap::from_iter(vec, VecMapOrdering::Greater);
|
||||
assert_eq!(vec_map.as_slice(), &[(1, ()), (2, ()), (3, ()), (6, ())]);
|
||||
|
||||
let vec = vec![(1, ()), (2, ()), (3, ()), (3, ()), (6, ()), (6, ())];
|
||||
let vec_map = VecMap::from_iter(vec, VecMapOrdering::GreaterOrEqual);
|
||||
assert_eq!(
|
||||
vec_map.as_slice(),
|
||||
&[(1, ()), (2, ()), (3, ()), (3, ()), (6, ()), (6, ())]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn vec_map_from_unsorted_greater() {
|
||||
let vec = vec![(1, ()), (2, ()), (2, ()), (3, ()), (6, ())];
|
||||
let _ = VecMap::from_iter(vec, VecMapOrdering::Greater);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn vec_map_from_unsorted_greater_or_equal() {
|
||||
let vec = vec![(1, ()), (2, ()), (3, ()), (6, ()), (5, ())];
|
||||
let _ = VecMap::from_iter(vec, VecMapOrdering::GreaterOrEqual);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, trace, warn};
|
||||
use utils::bin_ser::DeserializeError;
|
||||
use utils::vec_map::{VecMap, VecMapOrdering};
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
|
||||
const MAX_AUX_FILE_DELTAS: usize = 1024;
|
||||
@@ -1546,12 +1547,13 @@ impl<'a> DatadirModification<'a> {
|
||||
if !self.pending_updates.is_empty() {
|
||||
// The put_batch call below expects expects the inputs to be sorted by Lsn,
|
||||
// so we do that first.
|
||||
let lsn_ordered_batch: Vec<(Key, Lsn, Value)> = self
|
||||
.pending_updates
|
||||
.drain()
|
||||
.map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (key, lsn, val)))
|
||||
.kmerge_by(|lhs, rhs| lhs.1 .0 < rhs.1 .0)
|
||||
.collect();
|
||||
let lsn_ordered_batch: VecMap<Lsn, (Key, Value)> = VecMap::from_iter(
|
||||
self.pending_updates
|
||||
.drain()
|
||||
.map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (lsn, (key, val))))
|
||||
.kmerge_by(|lhs, rhs| lhs.0 < rhs.0),
|
||||
VecMapOrdering::GreaterOrEqual,
|
||||
);
|
||||
|
||||
writer.put_batch(lsn_ordered_batch, ctx).await?;
|
||||
}
|
||||
|
||||
@@ -36,6 +36,7 @@ use tracing::*;
|
||||
use utils::{
|
||||
bin_ser::BeSer,
|
||||
sync::gate::{Gate, GateGuard},
|
||||
vec_map::VecMap,
|
||||
};
|
||||
|
||||
use std::ops::{Deref, Range};
|
||||
@@ -4616,16 +4617,15 @@ impl<'a> TimelineWriter<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Put a batch keys at the specified Lsns.
|
||||
/// Put a batch of keys at the specified Lsns.
|
||||
///
|
||||
/// The batch should be sorted by Lsn such that it's safe
|
||||
/// to roll the open layer mid batch.
|
||||
/// The batch is sorted by Lsn (enforced by usage of [`utils::vec_map::VecMap`].
|
||||
pub(crate) async fn put_batch(
|
||||
&mut self,
|
||||
batch: Vec<(Key, Lsn, Value)>,
|
||||
batch: VecMap<Lsn, (Key, Value)>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
for (key, lsn, val) in batch {
|
||||
for (lsn, (key, val)) in batch {
|
||||
self.put(key, lsn, &val, ctx).await?
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user