diff --git a/pageserver/src/keyspace.rs b/pageserver/src/keyspace.rs
index 0ad0c404ff..bef2fe62a2 100644
--- a/pageserver/src/keyspace.rs
+++ b/pageserver/src/keyspace.rs
@@ -1,6 +1,6 @@
 use std::ops::Range;
 
-use crate::repository::{Key, key_range_size, singleton_range};
+use crate::repository::{key_range_size, singleton_range, Key};
 
 use postgres_ffi::pg_constants;
 
@@ -22,7 +22,6 @@ pub struct KeyPartitioning {
 }
 
 impl KeyPartitioning {
-
     pub fn new() -> Self {
         KeyPartitioning {
             accum: None,
@@ -45,7 +44,7 @@ impl KeyPartitioning {
                     self.ranges.push(accum.clone());
                     *accum = range;
                 }
-            },
+            }
             None => self.accum = Some(range),
         }
     }
@@ -61,11 +60,9 @@ impl KeyPartitioning {
         let mut current_part = Vec::new();
         let mut current_part_size: usize = 0;
         for range in &self.ranges {
-            let this_size = key_range_size(&range) as usize;
+            let this_size = key_range_size(range) as usize;
 
-            if current_part_size + this_size > target_nblocks &&
-                !current_part.is_empty()
-            {
+            if current_part_size + this_size > target_nblocks && !current_part.is_empty() {
                 self.partitions.push(current_part);
                 current_part = Vec::new();
                 current_part_size = 0;
@@ -87,3 +84,9 @@ impl KeyPartitioning {
         }
     }
 }
+
+impl Default for KeyPartitioning {
+    fn default() -> Self {
+        Self::new()
+    }
+}
diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index a319563c33..49298ff853 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -19,7 +19,7 @@ use itertools::Itertools;
 use lazy_static::lazy_static;
 use tracing::*;
 
-use std::cmp::{min, max, Ordering};
+use std::cmp::{max, min, Ordering};
 use std::collections::hash_map::Entry;
 use std::collections::BTreeSet;
 use std::collections::HashMap;
@@ -1006,7 +1006,12 @@ impl LayeredTimeline {
     ///
     /// This function takes the current timeline's locked LayerMap as an argument,
     /// so callers can avoid potential race conditions.
-    fn get_reconstruct_data(&self, key: Key, request_lsn: Lsn, reconstruct_state: &mut ValueReconstructState) -> Result<()> {
+    fn get_reconstruct_data(
+        &self,
+        key: Key,
+        request_lsn: Lsn,
+        reconstruct_state: &mut ValueReconstructState,
+    ) -> Result<()> {
         // Start from the current timeline.
         let mut timeline_owned;
         let mut timeline = self;
@@ -1031,7 +1036,12 @@ impl LayeredTimeline {
                 // Didn't make any progress in last iteration. Error out to avoid
                 // getting stuck in the loop.
                 for (r, c, l) in path {
-                    error!("PATH: result {:?}, cont_lsn {}, layer: {}", r, c, l.filename().display());
+                    error!(
+                        "PATH: result {:?}, cont_lsn {}, layer: {}",
+                        r,
+                        c,
+                        l.filename().display()
+                    );
                 }
                 bail!("could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
                       key,
@@ -1053,7 +1063,11 @@ impl LayeredTimeline {
 
             // Recurse into ancestor if needed
             if Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
-                info!("going into ancestor {}, cont_lsn is {}", timeline.ancestor_lsn, cont_lsn);
+                info!(
+                    "going into ancestor {}, cont_lsn is {}",
+                    timeline.ancestor_lsn,
+                    cont_lsn
+                );
                 let ancestor = timeline.get_ancestor_timeline()?;
                 timeline_owned = ancestor;
                 timeline = &*timeline_owned;
@@ -1068,7 +1082,11 @@ impl LayeredTimeline {
                 let start_lsn = open_layer.get_lsn_range().start;
                 if cont_lsn > start_lsn {
                     //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
-                    result = open_layer.get_value_reconstruct_data(key, open_layer.get_lsn_range().start..cont_lsn, reconstruct_state)?;
+                    result = open_layer.get_value_reconstruct_data(
+                        key,
+                        open_layer.get_lsn_range().start..cont_lsn,
+                        reconstruct_state,
+                    )?;
                     cont_lsn = start_lsn;
                     path.push((result, cont_lsn, open_layer.clone()));
                     continue;
@@ -1078,19 +1096,25 @@ impl LayeredTimeline {
 
                 let start_lsn = frozen_layer.get_lsn_range().start;
                 if cont_lsn > start_lsn {
                     //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
-                    result = frozen_layer.get_value_reconstruct_data(key, frozen_layer.get_lsn_range().start..cont_lsn, reconstruct_state)?;
+                    result = frozen_layer.get_value_reconstruct_data(
+                        key,
+                        frozen_layer.get_lsn_range().start..cont_lsn,
+                        reconstruct_state,
+                    )?;
                     cont_lsn = start_lsn;
                     path.push((result, cont_lsn, frozen_layer.clone()));
                     continue;
                 }
             }
 
-            if let Some(SearchResult { lsn_floor, layer }) = layers
-                .search(key, cont_lsn)?
-            {
+            if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn)? {
                 //info!("CHECKING for {} at {} on historic layer {}", key, cont_lsn, layer.filename().display());
-                result = layer.get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state)?;
+                result = layer.get_value_reconstruct_data(
+                    key,
+                    lsn_floor..cont_lsn,
+                    reconstruct_state,
+                )?;
                 cont_lsn = lsn_floor;
                 path.push((result, cont_lsn, layer));
             } else if self.ancestor_timeline.is_some() {
@@ -1427,11 +1451,16 @@ impl LayeredTimeline {
     }
 
     // Is it time to create a new image layer for the given partition?
-    fn time_for_new_image_layer(&self, partition: &Vec<Range<Key>>, lsn: Lsn, threshold: usize) -> Result<bool> {
+    fn time_for_new_image_layer(
+        &self,
+        partition: &[Range<Key>],
+        lsn: Lsn,
+        threshold: usize,
+    ) -> Result<bool> {
         let layers = self.layers.lock().unwrap();
 
         for part_range in partition {
-            let image_coverage = layers.image_coverage(&part_range, lsn)?;
+            let image_coverage = layers.image_coverage(part_range, lsn)?;
             for (img_range, last_img) in image_coverage {
                 let img_lsn = if let Some(ref last_img) = last_img {
                     last_img.get_lsn_range().end
@@ -1454,7 +1483,7 @@
         Ok(false)
     }
 
-    fn create_image_layer(&self, partition: &Vec<Range<Key>>, lsn: Lsn) -> Result<()> {
+    fn create_image_layer(&self, partition: &[Range<Key>], lsn: Lsn) -> Result<()> {
         let img_range = partition.first().unwrap().start..partition.last().unwrap().end;
         let mut image_layer_writer =
             ImageLayerWriter::new(self.conf, self.timelineid, self.tenantid, &img_range, lsn)?;
@@ -1492,18 +1521,18 @@
 
         // FIXME: this function probably won't work correctly if there's overlap
         // in the deltas.
-        let lsn_range = level0_deltas.iter().map(|l| l.get_lsn_range()).reduce(|a, b| {
-            min(a.start, b.start)..max(a.end, b.end)
-        }).unwrap();
+        let lsn_range = level0_deltas
+            .iter()
+            .map(|l| l.get_lsn_range())
+            .reduce(|a, b| min(a.start, b.start)..max(a.end, b.end))
+            .unwrap();
 
         let all_values_iter = level0_deltas.iter().map(|l| l.iter()).kmerge_by(|a, b| {
             if let Ok((a_key, a_lsn, _)) = a {
                 if let Ok((b_key, b_lsn, _)) = b {
                     match a_key.cmp(b_key) {
                         Ordering::Less => true,
-                        Ordering::Equal => {
-                            a_lsn <= b_lsn
-                        }
+                        Ordering::Equal => a_lsn <= b_lsn,
                         Ordering::Greater => false,
                     }
                 } else {
@@ -1678,9 +1707,15 @@
             // OK for a delta layer to have end LSN 101, but if the end LSN
             // is 102, then it might not have been fully flushed to disk
             // before crash.
-            if !layers.newer_image_layer_exists(&l.get_key_range(), l.get_lsn_range().end, disk_consistent_lsn+1)?
-            {
-                info!("keeping {} because it is the latest layer", l.filename().display());
+            if !layers.newer_image_layer_exists(
+                &l.get_key_range(),
+                l.get_lsn_range().end,
+                disk_consistent_lsn + 1,
+            )? {
+                info!(
+                    "keeping {} because it is the latest layer",
+                    l.filename().display()
+                );
                 result.layers_not_updated += 1;
                 continue 'outer;
             }
@@ -1691,7 +1726,7 @@
                 l.filename().display(),
                 l.is_incremental(),
             );
-            layers_to_remove.push(Arc::clone(&l));
+            layers_to_remove.push(Arc::clone(l));
         }
 
         // Actually delete the layers from disk and remove them from the map.
@@ -2046,9 +2081,12 @@ mod tests {
             updated[blknum] = lsn;
         }
 
-        for blknum in 0..NUM_KEYS {
+        for (blknum, last_lsn) in updated.iter().enumerate() {
             test_key.field6 = blknum as u32;
-            assert_eq!(tline.get(test_key, lsn)?, TEST_IMG(&format!("{} at {}", blknum, updated[blknum])));
+            assert_eq!(
+                tline.get(test_key, lsn)?,
+                TEST_IMG(&format!("{} at {}", blknum, last_lsn))
+            );
         }
 
         println!("checkpointing {}", lsn);
diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs
index 3c5acbc53a..d8ecf7d75e 100644
--- a/pageserver/src/layered_repository/delta_layer.rs
+++ b/pageserver/src/layered_repository/delta_layer.rs
@@ -237,7 +237,7 @@ impl Layer for DeltaLayer {
 
         match DeltaValueIter::new(inner) {
             Ok(iter) => Box::new(iter),
-            Err(err) => Box::new(std::iter::once(Err(err)))
+            Err(err) => Box::new(std::iter::once(Err(err))),
         }
     }
 
@@ -507,12 +507,12 @@ impl DeltaLayerWriter {
 
         // Note: This overwrites any existing file. There shouldn't be any.
        // FIXME: throw an error instead?
-        let path = conf
-            .timeline_path(&timelineid, &tenantid)
-            .join(format!("{}-XXX__{:016X}-{:016X}.temp",
-                key_start,
-                u64::from(lsn_range.start),
-                u64::from(lsn_range.end)));
+        let path = conf.timeline_path(&timelineid, &tenantid).join(format!(
+            "{}-XXX__{:016X}-{:016X}.temp",
+            key_start,
+            u64::from(lsn_range.start),
+            u64::from(lsn_range.end)
+        ));
         info!("temp deltalayer path {}", path.display());
         let file = VirtualFile::create(&path)?;
         let buf_writer = BufWriter::new(file);
@@ -632,7 +632,7 @@ impl DeltaLayerWriter {
     pub fn abort(self) {
         match self.values_writer.close() {
-            Ok(book) => { 
+            Ok(book) => {
                 if let Err(err) = book.close() {
                     error!("error while closing delta layer file: {}", err);
                 }
 
@@ -650,7 +650,7 @@
 struct DeltaValueIter<'a> {
     all_offsets: Vec<(Key, Lsn, u64)>,
     next_idx: usize,
-    
+
     inner: RwLockReadGuard<'a, DeltaLayerInner>,
 }
 
@@ -671,7 +671,6 @@ impl<'a> Iterator for DeltaValueIter<'a> {
 ///
 impl<'a> DeltaValueIter<'a> {
     fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
-
         let mut index: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
         index.sort_by_key(|x| x.0);
 
@@ -693,12 +692,13 @@
         if self.next_idx < self.all_offsets.len() {
             let (key, lsn, off) = self.all_offsets[self.next_idx];
 
-            let values_reader = self.inner
+            let values_reader = self
+                .inner
                 .book
                 .as_ref()
                 .expect("should be loaded in load call above")
                 .chapter_reader(VALUES_CHAPTER)?;
-            
+
             let val = Value::des(&utils::read_blob_from_chapter(&values_reader, off)?)?;
 
             self.next_idx += 1;
@@ -708,4 +708,3 @@
         }
     }
 }
-
diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs
index a8772632ab..e0cb2d8d02 100644
--- a/pageserver/src/layered_repository/image_layer.rs
+++ b/pageserver/src/layered_repository/image_layer.rs
@@ -184,7 +184,7 @@ impl Layer for ImageLayer {
     fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>>> {
         todo!();
     }
-    
+
     fn unload(&self) -> Result<()> {
         // TODO: unload 'segs'. Or even better, don't hold it in memory but
         // access it directly from the file (using the buffer cache)
diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs
index 4d2af960a6..27a3eb279a 100644
--- a/pageserver/src/layered_repository/layer_map.rs
+++ b/pageserver/src/layered_repository/layer_map.rs
@@ -10,8 +10,8 @@
 //! corresponding files are written to disk.
 //!
-use crate::layered_repository::storage_layer::{range_eq, range_overlaps};
 use crate::layered_repository::storage_layer::Layer;
+use crate::layered_repository::storage_layer::{range_eq, range_overlaps};
 use crate::layered_repository::InMemoryLayer;
 use crate::repository::Key;
 use anyhow::Result;
@@ -143,13 +143,18 @@
             );
             let lsn_floor = std::cmp::max(
                 Lsn(latest_img_lsn.unwrap_or(Lsn(0)).0 + 1),
-                l.get_lsn_range().start);
+                l.get_lsn_range().start,
+            );
             Ok(Some(SearchResult {
                 lsn_floor,
                 layer: l,
             }))
         } else if let Some(l) = latest_img {
-            trace!("found img layer and no deltas for request on {} at {}", key, end_lsn);
+            trace!(
+                "found img layer and no deltas for request on {} at {}",
+                key,
+                end_lsn
+            );
             Ok(Some(SearchResult {
                 lsn_floor: latest_img_lsn.unwrap(),
                 layer: l,
@@ -202,7 +207,6 @@
         lsn: Lsn,
         disk_consistent_lsn: Lsn,
     ) -> Result<bool> {
-
        let mut range_remain = key_range.clone();
 
         loop {
@@ -212,10 +216,10 @@
                 continue;
             }
             let img_lsn = l.get_lsn_range().start;
-            if !l.is_incremental() &&
-                l.get_key_range().contains(&range_remain.start) &&
-                img_lsn > lsn &&
-                img_lsn < disk_consistent_lsn
+            if !l.is_incremental()
+                && l.get_key_range().contains(&range_remain.start)
+                && img_lsn > lsn
+                && img_lsn < disk_consistent_lsn
             {
                 made_progress = true;
                 let img_key_end = l.get_key_range().end;
@@ -329,11 +333,7 @@
         Ok(ranges)
     }
 
-    pub fn count_deltas(
-        &self,
-        key_range: &Range<Key>,
-        lsn_range: &Range<Lsn>,
-    ) -> Result<usize> {
+    pub fn count_deltas(&self, key_range: &Range<Key>, lsn_range: &Range<Lsn>) -> Result<usize> {
         let mut result = 0;
         for l in self.historic_layers.iter() {
             if !l.is_incremental() {
@@ -348,8 +348,8 @@
 
             // We ignore level0 delta layers. Unless the whole keyspace fits
             // into one partition
-            if !range_eq(key_range, &(Key::MIN..Key::MAX)) &&
-                range_eq(&l.get_key_range(), &(Key::MIN..Key::MAX))
+            if !range_eq(key_range, &(Key::MIN..Key::MAX))
+                && range_eq(&l.get_key_range(), &(Key::MIN..Key::MAX))
             {
                 continue;
             }
diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs
index 76795e1bb0..d508322a81 100644
--- a/pageserver/src/layered_repository/storage_layer.rs
+++ b/pageserver/src/layered_repository/storage_layer.rs
@@ -130,7 +130,7 @@ pub trait Layer: Send + Sync {
     fn is_in_memory(&self) -> bool;
 
     fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + '_>;
-    
+
     /// Return a set of all distinct Keys present in this layer
     fn collect_keys(&self, key_range: &Range<Key>, keys: &mut HashSet<Key>) -> Result<()>;
 
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index fd586495dc..50963a44cd 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -21,8 +21,8 @@ use std::ops::Range;
 use std::sync::{Arc, RwLockReadGuard};
 use tracing::{debug, info, warn};
 use zenith_utils::bin_ser::BeSer;
-use zenith_utils::lsn::{Lsn, RecordLsn};
 use zenith_utils::lsn::AtomicLsn;
+use zenith_utils::lsn::{Lsn, RecordLsn};
 
 /// Block number within a relation or SRU. This matches PostgreSQL's BlockNumber type.
 pub type BlockNumber = u32;
@@ -311,11 +315,15 @@ impl DatadirTimeline {
         let dbdir = DbDirectory::des(&buf)?;
 
         let mut dbs: Vec<(Oid, Oid)> = dbdir.dbs.iter().cloned().collect();
-        dbs.sort();
+        dbs.sort_unstable();
         for (spcnode, dbnode) in dbs {
             result.add_key(relmap_file_key(spcnode, dbnode));
-            let mut rels: Vec<RelTag> = self.list_rels(spcnode, dbnode, lsn)?.iter().cloned().collect();
-            rels.sort();
+            let mut rels: Vec<RelTag> = self
+                .list_rels(spcnode, dbnode, lsn)?
+                .iter()
+                .cloned()
+                .collect();
+            rels.sort_unstable();
             for rel in rels {
                 let relsize_key = rel_size_to_key(rel);
                 let mut buf = self.tline.get(relsize_key, lsn)?;
@@ -327,18 +331,24 @@
         }
 
         // Iterate SLRUs next
-        for kind in [SlruKind::Clog, SlruKind:: MultiXactMembers, SlruKind::MultiXactOffsets] {
+        for kind in [
+            SlruKind::Clog,
+            SlruKind::MultiXactMembers,
+            SlruKind::MultiXactOffsets,
+        ] {
             let slrudir_key = slru_dir_to_key(kind);
             let buf = self.tline.get(slrudir_key, lsn)?;
             let dir = SlruSegmentDirectory::des(&buf)?;
 
             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
-            segments.sort();
+            segments.sort_unstable();
             for segno in segments {
                 let segsize_key = slru_segment_size_to_key(kind, segno);
                 let mut buf = self.tline.get(segsize_key, lsn)?;
                 let segsize = buf.get_u32_le();
-                result.add_range(slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize));
+                result.add_range(
+                    slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
+                );
                 result.add_key(segsize_key);
             }
         }
@@ -347,7 +357,7 @@
         let buf = self.tline.get(TWOPHASEDIR_KEY, lsn)?;
         let twophase_dir = TwoPhaseDirectory::des(&buf)?;
         let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
-        xids.sort();
+        xids.sort_unstable();
         for xid in xids {
             result.add_key(twophase_file_key(xid));
         }
diff --git a/pageserver/src/relish.rs b/pageserver/src/relish.rs
index f775ce933a..521e07e50f 100644
--- a/pageserver/src/relish.rs
+++ b/pageserver/src/relish.rs
@@ -86,7 +86,6 @@ impl Ord for RelTag {
     }
 }
 
-
 /// Display RelTag in the same format that's used in most PostgreSQL debug messages:
 ///
 /// <spcnode>/<dbnode>/<relnode>[_fsm|_vm|_init]
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index d8f68aaa6c..c92dff661a 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -26,13 +26,12 @@ pub struct Key {
 }
 
 impl Key {
-
     pub fn next(&self) -> Key {
         self.add(1)
     }
-    
+
     pub fn add(&self, x: u32) -> Key {
-        let mut key = self.clone();
+        let mut key = *self;
 
         let r = key.field6.overflowing_add(x);
         key.field6 = r.0;
@@ -72,16 +71,14 @@
     }
 }
 
-
-
 pub fn key_range_size(key_range: &Range<Key>) -> u32 {
     let start = key_range.start;
     let end = key_range.end;
 
-    if end.field1 != start.field1 ||
-        end.field2 != start.field2 ||
-        end.field3 != start.field3 ||
-        end.field4 != start.field4
+    if end.field1 != start.field1
+        || end.field2 != start.field2
+        || end.field3 != start.field3
+        || end.field4 != start.field4
     {
         return u32::MAX;
     }
@@ -630,20 +627,36 @@
         {
             let writer = tline.writer();
             // Create a relation on the timeline
-            writer.put(*TEST_KEY, lsn, Value::Image(TEST_IMG(&format!("foo at {}", lsn))))?;
+            writer.put(
+                *TEST_KEY,
+                lsn,
+                Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
+            )?;
             writer.advance_last_record_lsn(lsn);
             lsn += 0x10;
-            writer.put(*TEST_KEY, lsn, Value::Image(TEST_IMG(&format!("foo at {}", lsn))))?;
+            writer.put(
+                *TEST_KEY,
+                lsn,
+                Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
+            )?;
             writer.advance_last_record_lsn(lsn);
             lsn += 0x10;
         }
         tline.checkpoint(CheckpointConfig::Forced)?;
         {
             let writer = tline.writer();
-            writer.put(*TEST_KEY, lsn, Value::Image(TEST_IMG(&format!("foo at {}", lsn))))?;
+            writer.put(
+                *TEST_KEY,
+                lsn,
+                Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
+            )?;
             writer.advance_last_record_lsn(lsn);
             lsn += 0x10;
-            writer.put(*TEST_KEY, lsn, Value::Image(TEST_IMG(&format!("foo at {}", lsn))))?;
+            writer.put(
+                *TEST_KEY,
+                lsn,
+                Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
+            )?;
             writer.advance_last_record_lsn(lsn);
         }
         tline.checkpoint(CheckpointConfig::Forced)
@@ -668,10 +681,10 @@
             Err(err) => {
                 assert!(err.to_string().contains("invalid branch start lsn"));
                 assert!(err
-                        .source()
-                        .unwrap()
-                        .to_string()
-                        .contains("we might've already garbage collected needed data"))
+                    .source()
+                    .unwrap()
+                    .to_string()
+                    .contains("we might've already garbage collected needed data"))
             }
         }
 
@@ -689,10 +702,10 @@
             Err(err) => {
                 assert!(&err.to_string().contains("invalid branch start lsn"));
                 assert!(&err
-                        .source()
-                        .unwrap()
-                        .to_string()
-                        .contains("is earlier than latest GC horizon"));
+                    .source()
+                    .unwrap()
+                    .to_string()
+                    .contains("is earlier than latest GC horizon"));
             }
         }
 
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index 92133a3fd7..0051bf5361 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -996,9 +996,7 @@ mod tests {
     static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
 
-    fn init_walingest_test<'a, R: Repository>(
-        tline: &'a DatadirTimeline<R>,
-    ) -> Result<WalIngest<'a, R>> {
+    fn init_walingest_test<R: Repository>(tline: &DatadirTimeline<R>) -> Result<WalIngest<R>> {
         let mut writer = tline.begin_record(Lsn(0x10));
         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
         writer.finish()?;