rustfmt and clippy fixes

This commit is contained in:
Heikki Linnakangas
2022-03-09 15:24:23 +02:00
parent e096c62494
commit 2896d35a8b
10 changed files with 155 additions and 95 deletions

View File

@@ -1,6 +1,6 @@
use std::ops::Range;
use crate::repository::{Key, key_range_size, singleton_range};
use crate::repository::{key_range_size, singleton_range, Key};
use postgres_ffi::pg_constants;
@@ -22,7 +22,6 @@ pub struct KeyPartitioning {
}
impl KeyPartitioning {
pub fn new() -> Self {
KeyPartitioning {
accum: None,
@@ -45,7 +44,7 @@ impl KeyPartitioning {
self.ranges.push(accum.clone());
*accum = range;
}
},
}
None => self.accum = Some(range),
}
}
@@ -61,11 +60,9 @@ impl KeyPartitioning {
let mut current_part = Vec::new();
let mut current_part_size: usize = 0;
for range in &self.ranges {
let this_size = key_range_size(&range) as usize;
let this_size = key_range_size(range) as usize;
if current_part_size + this_size > target_nblocks &&
!current_part.is_empty()
{
if current_part_size + this_size > target_nblocks && !current_part.is_empty() {
self.partitions.push(current_part);
current_part = Vec::new();
current_part_size = 0;
@@ -87,3 +84,9 @@ impl KeyPartitioning {
}
}
}
impl Default for KeyPartitioning {
fn default() -> Self {
Self::new()
}
}

View File

@@ -19,7 +19,7 @@ use itertools::Itertools;
use lazy_static::lazy_static;
use tracing::*;
use std::cmp::{min, max, Ordering};
use std::cmp::{max, min, Ordering};
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
use std::collections::HashMap;
@@ -1006,7 +1006,12 @@ impl LayeredTimeline {
///
/// This function takes the current timeline's locked LayerMap as an argument,
/// so callers can avoid potential race conditions.
fn get_reconstruct_data(&self, key: Key, request_lsn: Lsn, reconstruct_state: &mut ValueReconstructState) -> Result<()> {
fn get_reconstruct_data(
&self,
key: Key,
request_lsn: Lsn,
reconstruct_state: &mut ValueReconstructState,
) -> Result<()> {
// Start from the current timeline.
let mut timeline_owned;
let mut timeline = self;
@@ -1031,7 +1036,12 @@ impl LayeredTimeline {
// Didn't make any progress in last iteration. Error out to avoid
// getting stuck in the loop.
for (r, c, l) in path {
error!("PATH: result {:?}, cont_lsn {}, layer: {}", r, c, l.filename().display());
error!(
"PATH: result {:?}, cont_lsn {}, layer: {}",
r,
c,
l.filename().display()
);
}
bail!("could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
key,
@@ -1053,7 +1063,11 @@ impl LayeredTimeline {
// Recurse into ancestor if needed
if Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
info!("going into ancestor {}, cont_lsn is {}", timeline.ancestor_lsn, cont_lsn);
info!(
"going into ancestor {}, cont_lsn is {}",
timeline.ancestor_lsn,
cont_lsn
);
let ancestor = timeline.get_ancestor_timeline()?;
timeline_owned = ancestor;
timeline = &*timeline_owned;
@@ -1068,7 +1082,11 @@ impl LayeredTimeline {
let start_lsn = open_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
result = open_layer.get_value_reconstruct_data(key, open_layer.get_lsn_range().start..cont_lsn, reconstruct_state)?;
result = open_layer.get_value_reconstruct_data(
key,
open_layer.get_lsn_range().start..cont_lsn,
reconstruct_state,
)?;
cont_lsn = start_lsn;
path.push((result, cont_lsn, open_layer.clone()));
continue;
@@ -1078,19 +1096,25 @@ impl LayeredTimeline {
let start_lsn = frozen_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
result = frozen_layer.get_value_reconstruct_data(key, frozen_layer.get_lsn_range().start..cont_lsn, reconstruct_state)?;
result = frozen_layer.get_value_reconstruct_data(
key,
frozen_layer.get_lsn_range().start..cont_lsn,
reconstruct_state,
)?;
cont_lsn = start_lsn;
path.push((result, cont_lsn, frozen_layer.clone()));
continue;
}
}
if let Some(SearchResult { lsn_floor, layer }) = layers
.search(key, cont_lsn)?
{
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn)? {
//info!("CHECKING for {} at {} on historic layer {}", key, cont_lsn, layer.filename().display());
result = layer.get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state)?;
result = layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
)?;
cont_lsn = lsn_floor;
path.push((result, cont_lsn, layer));
} else if self.ancestor_timeline.is_some() {
@@ -1427,11 +1451,16 @@ impl LayeredTimeline {
}
// Is it time to create a new image layer for the given partition?
fn time_for_new_image_layer(&self, partition: &Vec<Range<Key>>, lsn: Lsn, threshold: usize) -> Result<bool> {
fn time_for_new_image_layer(
&self,
partition: &[Range<Key>],
lsn: Lsn,
threshold: usize,
) -> Result<bool> {
let layers = self.layers.lock().unwrap();
for part_range in partition {
let image_coverage = layers.image_coverage(&part_range, lsn)?;
let image_coverage = layers.image_coverage(part_range, lsn)?;
for (img_range, last_img) in image_coverage {
let img_lsn = if let Some(ref last_img) = last_img {
last_img.get_lsn_range().end
@@ -1454,7 +1483,7 @@ impl LayeredTimeline {
Ok(false)
}
fn create_image_layer(&self, partition: &Vec<Range<Key>>, lsn: Lsn) -> Result<()> {
fn create_image_layer(&self, partition: &[Range<Key>], lsn: Lsn) -> Result<()> {
let img_range = partition.first().unwrap().start..partition.last().unwrap().end;
let mut image_layer_writer =
ImageLayerWriter::new(self.conf, self.timelineid, self.tenantid, &img_range, lsn)?;
@@ -1492,18 +1521,18 @@ impl LayeredTimeline {
// FIXME: this function probably won't work correctly if there's overlap
// in the deltas.
let lsn_range = level0_deltas.iter().map(|l| l.get_lsn_range()).reduce(|a, b| {
min(a.start, b.start)..max(a.end, b.end)
}).unwrap();
let lsn_range = level0_deltas
.iter()
.map(|l| l.get_lsn_range())
.reduce(|a, b| min(a.start, b.start)..max(a.end, b.end))
.unwrap();
let all_values_iter = level0_deltas.iter().map(|l| l.iter()).kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
if let Ok((b_key, b_lsn, _)) = b {
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => {
a_lsn <= b_lsn
}
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
} else {
@@ -1678,9 +1707,15 @@ impl LayeredTimeline {
// OK for a delta layer to have end LSN 101, but if the end LSN
// is 102, then it might not have been fully flushed to disk
// before crash.
if !layers.newer_image_layer_exists(&l.get_key_range(), l.get_lsn_range().end, disk_consistent_lsn+1)?
{
info!("keeping {} because it is the latest layer", l.filename().display());
if !layers.newer_image_layer_exists(
&l.get_key_range(),
l.get_lsn_range().end,
disk_consistent_lsn + 1,
)? {
info!(
"keeping {} because it is the latest layer",
l.filename().display()
);
result.layers_not_updated += 1;
continue 'outer;
}
@@ -1691,7 +1726,7 @@ impl LayeredTimeline {
l.filename().display(),
l.is_incremental(),
);
layers_to_remove.push(Arc::clone(&l));
layers_to_remove.push(Arc::clone(l));
}
// Actually delete the layers from disk and remove them from the map.
@@ -2046,9 +2081,12 @@ mod tests {
updated[blknum] = lsn;
}
for blknum in 0..NUM_KEYS {
for (blknum, last_lsn) in updated.iter().enumerate() {
test_key.field6 = blknum as u32;
assert_eq!(tline.get(test_key, lsn)?, TEST_IMG(&format!("{} at {}", blknum, updated[blknum])));
assert_eq!(
tline.get(test_key, lsn)?,
TEST_IMG(&format!("{} at {}", blknum, last_lsn))
);
}
println!("checkpointing {}", lsn);

View File

@@ -237,7 +237,7 @@ impl Layer for DeltaLayer {
match DeltaValueIter::new(inner) {
Ok(iter) => Box::new(iter),
Err(err) => Box::new(std::iter::once(Err(err)))
Err(err) => Box::new(std::iter::once(Err(err))),
}
}
@@ -507,12 +507,12 @@ impl DeltaLayerWriter {
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
let path = conf
.timeline_path(&timelineid, &tenantid)
.join(format!("{}-XXX__{:016X}-{:016X}.temp",
key_start,
u64::from(lsn_range.start),
u64::from(lsn_range.end)));
let path = conf.timeline_path(&timelineid, &tenantid).join(format!(
"{}-XXX__{:016X}-{:016X}.temp",
key_start,
u64::from(lsn_range.start),
u64::from(lsn_range.end)
));
info!("temp deltalayer path {}", path.display());
let file = VirtualFile::create(&path)?;
let buf_writer = BufWriter::new(file);
@@ -632,7 +632,7 @@ impl DeltaLayerWriter {
pub fn abort(self) {
match self.values_writer.close() {
Ok(book) => {
Ok(book) => {
if let Err(err) = book.close() {
error!("error while closing delta layer file: {}", err);
}
@@ -650,7 +650,7 @@ impl DeltaLayerWriter {
struct DeltaValueIter<'a> {
all_offsets: Vec<(Key, Lsn, u64)>,
next_idx: usize,
inner: RwLockReadGuard<'a, DeltaLayerInner>,
}
@@ -671,7 +671,6 @@ impl<'a> Iterator for DeltaValueIter<'a> {
///
impl<'a> DeltaValueIter<'a> {
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
let mut index: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
index.sort_by_key(|x| x.0);
@@ -693,12 +692,13 @@ impl<'a> DeltaValueIter<'a> {
if self.next_idx < self.all_offsets.len() {
let (key, lsn, off) = self.all_offsets[self.next_idx];
let values_reader = self.inner
let values_reader = self
.inner
.book
.as_ref()
.expect("should be loaded in load call above")
.chapter_reader(VALUES_CHAPTER)?;
let val = Value::des(&utils::read_blob_from_chapter(&values_reader, off)?)?;
self.next_idx += 1;
@@ -708,4 +708,3 @@ impl<'a> DeltaValueIter<'a> {
}
}
}

View File

@@ -184,7 +184,7 @@ impl Layer for ImageLayer {
fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>>> {
todo!();
}
fn unload(&self) -> Result<()> {
// TODO: unload 'segs'. Or even better, don't hold it in memory but
// access it directly from the file (using the buffer cache)

View File

@@ -10,8 +10,8 @@
//! corresponding files are written to disk.
//!
use crate::layered_repository::storage_layer::{range_eq, range_overlaps};
use crate::layered_repository::storage_layer::Layer;
use crate::layered_repository::storage_layer::{range_eq, range_overlaps};
use crate::layered_repository::InMemoryLayer;
use crate::repository::Key;
use anyhow::Result;
@@ -143,13 +143,18 @@ impl LayerMap {
);
let lsn_floor = std::cmp::max(
Lsn(latest_img_lsn.unwrap_or(Lsn(0)).0 + 1),
l.get_lsn_range().start);
l.get_lsn_range().start,
);
Ok(Some(SearchResult {
lsn_floor,
layer: l,
}))
} else if let Some(l) = latest_img {
trace!("found img layer and no deltas for request on {} at {}", key, end_lsn);
trace!(
"found img layer and no deltas for request on {} at {}",
key,
end_lsn
);
Ok(Some(SearchResult {
lsn_floor: latest_img_lsn.unwrap(),
layer: l,
@@ -202,7 +207,6 @@ impl LayerMap {
lsn: Lsn,
disk_consistent_lsn: Lsn,
) -> Result<bool> {
let mut range_remain = key_range.clone();
loop {
@@ -212,10 +216,10 @@ impl LayerMap {
continue;
}
let img_lsn = l.get_lsn_range().start;
if !l.is_incremental() &&
l.get_key_range().contains(&range_remain.start) &&
img_lsn > lsn &&
img_lsn < disk_consistent_lsn
if !l.is_incremental()
&& l.get_key_range().contains(&range_remain.start)
&& img_lsn > lsn
&& img_lsn < disk_consistent_lsn
{
made_progress = true;
let img_key_end = l.get_key_range().end;
@@ -329,11 +333,7 @@ impl LayerMap {
Ok(ranges)
}
pub fn count_deltas(
&self,
key_range: &Range<Key>,
lsn_range: &Range<Lsn>,
) -> Result<usize> {
pub fn count_deltas(&self, key_range: &Range<Key>, lsn_range: &Range<Lsn>) -> Result<usize> {
let mut result = 0;
for l in self.historic_layers.iter() {
if !l.is_incremental() {
@@ -348,8 +348,8 @@ impl LayerMap {
// We ignore level0 delta layers. Unless the whole keyspace fits
// into one partition
if !range_eq(key_range, &(Key::MIN..Key::MAX)) &&
range_eq(&l.get_key_range(), &(Key::MIN..Key::MAX))
if !range_eq(key_range, &(Key::MIN..Key::MAX))
&& range_eq(&l.get_key_range(), &(Key::MIN..Key::MAX))
{
continue;
}

View File

@@ -130,7 +130,7 @@ pub trait Layer: Send + Sync {
fn is_in_memory(&self) -> bool;
fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + '_>;
/// Return a set of all distinct Keys present in this layer
fn collect_keys(&self, key_range: &Range<Key>, keys: &mut HashSet<Key>) -> Result<()>;

View File

@@ -21,8 +21,8 @@ use std::ops::Range;
use std::sync::{Arc, RwLockReadGuard};
use tracing::{debug, info, warn};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::{Lsn, RecordLsn};
use zenith_utils::lsn::AtomicLsn;
use zenith_utils::lsn::{Lsn, RecordLsn};
/// Block number within a relation or SRU. This matches PostgreSQL's BlockNumber type.
pub type BlockNumber = u32;
@@ -311,11 +311,15 @@ impl<R: Repository> DatadirTimeline<R> {
let dbdir = DbDirectory::des(&buf)?;
let mut dbs: Vec<(Oid, Oid)> = dbdir.dbs.iter().cloned().collect();
dbs.sort();
dbs.sort_unstable();
for (spcnode, dbnode) in dbs {
result.add_key(relmap_file_key(spcnode, dbnode));
let mut rels: Vec<RelTag> = self.list_rels(spcnode, dbnode, lsn)?.iter().cloned().collect();
rels.sort();
let mut rels: Vec<RelTag> = self
.list_rels(spcnode, dbnode, lsn)?
.iter()
.cloned()
.collect();
rels.sort_unstable();
for rel in rels {
let relsize_key = rel_size_to_key(rel);
let mut buf = self.tline.get(relsize_key, lsn)?;
@@ -327,18 +331,24 @@ impl<R: Repository> DatadirTimeline<R> {
}
// Iterate SLRUs next
for kind in [SlruKind::Clog, SlruKind:: MultiXactMembers, SlruKind::MultiXactOffsets] {
for kind in [
SlruKind::Clog,
SlruKind::MultiXactMembers,
SlruKind::MultiXactOffsets,
] {
let slrudir_key = slru_dir_to_key(kind);
let buf = self.tline.get(slrudir_key, lsn)?;
let dir = SlruSegmentDirectory::des(&buf)?;
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
segments.sort();
segments.sort_unstable();
for segno in segments {
let segsize_key = slru_segment_size_to_key(kind, segno);
let mut buf = self.tline.get(segsize_key, lsn)?;
let segsize = buf.get_u32_le();
result.add_range(slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize));
result.add_range(
slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
);
result.add_key(segsize_key);
}
}
@@ -347,7 +357,7 @@ impl<R: Repository> DatadirTimeline<R> {
let buf = self.tline.get(TWOPHASEDIR_KEY, lsn)?;
let twophase_dir = TwoPhaseDirectory::des(&buf)?;
let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
xids.sort();
xids.sort_unstable();
for xid in xids {
result.add_key(twophase_file_key(xid));
}

View File

@@ -86,7 +86,6 @@ impl Ord for RelTag {
}
}
/// Display RelTag in the same format that's used in most PostgreSQL debug messages:
///
/// <spcnode>/<dbnode>/<relnode>[_fsm|_vm|_init]

View File

@@ -26,13 +26,12 @@ pub struct Key {
}
impl Key {
pub fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
let mut key = self.clone();
let mut key = *self;
let r = key.field6.overflowing_add(x);
key.field6 = r.0;
@@ -72,16 +71,14 @@ impl Key {
}
}
pub fn key_range_size(key_range: &Range<Key>) -> u32 {
let start = key_range.start;
let end = key_range.end;
if end.field1 != start.field1 ||
end.field2 != start.field2 ||
end.field3 != start.field3 ||
end.field4 != start.field4
if end.field1 != start.field1
|| end.field2 != start.field2
|| end.field3 != start.field3
|| end.field4 != start.field4
{
return u32::MAX;
}
@@ -630,20 +627,36 @@ mod tests {
{
let writer = tline.writer();
// Create a relation on the timeline
writer.put(*TEST_KEY, lsn, Value::Image(TEST_IMG(&format!("foo at {}", lsn))))?;
writer.put(
*TEST_KEY,
lsn,
Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer.advance_last_record_lsn(lsn);
lsn += 0x10;
writer.put(*TEST_KEY, lsn, Value::Image(TEST_IMG(&format!("foo at {}", lsn))))?;
writer.put(
*TEST_KEY,
lsn,
Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer.advance_last_record_lsn(lsn);
lsn += 0x10;
}
tline.checkpoint(CheckpointConfig::Forced)?;
{
let writer = tline.writer();
writer.put(*TEST_KEY, lsn, Value::Image(TEST_IMG(&format!("foo at {}", lsn))))?;
writer.put(
*TEST_KEY,
lsn,
Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer.advance_last_record_lsn(lsn);
lsn += 0x10;
writer.put(*TEST_KEY, lsn, Value::Image(TEST_IMG(&format!("foo at {}", lsn))))?;
writer.put(
*TEST_KEY,
lsn,
Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer.advance_last_record_lsn(lsn);
}
tline.checkpoint(CheckpointConfig::Forced)
@@ -668,10 +681,10 @@ mod tests {
Err(err) => {
assert!(err.to_string().contains("invalid branch start lsn"));
assert!(err
.source()
.unwrap()
.to_string()
.contains("we might've already garbage collected needed data"))
.source()
.unwrap()
.to_string()
.contains("we might've already garbage collected needed data"))
}
}
@@ -689,10 +702,10 @@ mod tests {
Err(err) => {
assert!(&err.to_string().contains("invalid branch start lsn"));
assert!(&err
.source()
.unwrap()
.to_string()
.contains("is earlier than latest GC horizon"));
.source()
.unwrap()
.to_string()
.contains("is earlier than latest GC horizon"));
}
}

View File

@@ -996,9 +996,7 @@ mod tests {
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
fn init_walingest_test<'a, R: Repository>(
tline: &'a DatadirTimeline<R>,
) -> Result<WalIngest<'a, R>> {
fn init_walingest_test<R: Repository>(tline: &DatadirTimeline<R>) -> Result<WalIngest<R>> {
let mut writer = tline.begin_record(Lsn(0x10));
writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
writer.finish()?;