diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs
index b834daae10..cf1abf741f 100644
--- a/pageserver/src/layered_repository/delta_layer.rs
+++ b/pageserver/src/layered_repository/delta_layer.rs
@@ -381,14 +381,15 @@ impl DeltaLayer {
         }
     }
 
-    /// Create a new delta file, using the given btreemaps containing the page versions and
-    /// relsizes.
+    /// Create a new delta file, using the given page versions and relsizes.
+    /// The page versions are passed as an iterator; the iterator must return
+    /// the page versions in blknum+lsn order.
     ///
    /// This is used to write the in-memory layer to disk. The in-memory layer uses the same
     /// data structure with two btreemaps as we do, so passing the btreemaps is currently
     /// expedient.
     #[allow(clippy::too_many_arguments)]
-    pub fn create(
+    pub fn create<'a>(
         conf: &'static PageServerConf,
         timelineid: ZTimelineId,
         tenantid: ZTenantId,
@@ -397,7 +398,7 @@ impl DeltaLayer {
         end_lsn: Lsn,
         dropped: bool,
         predecessor: Option<Arc<dyn Layer>>,
-        page_versions: BTreeMap<(u32, Lsn), PageVersion>,
+        page_versions: impl Iterator<Item = (&'a (u32, Lsn), &'a PageVersion)>,
         relsizes: BTreeMap<Lsn, u32>,
     ) -> Result<DeltaLayer> {
         let delta_layer = DeltaLayer {
@@ -433,19 +434,21 @@ impl DeltaLayer {
         for (key, page_version) in page_versions {
             let page_image_range = page_version
                 .page_image
-                .map(|page_image| page_version_writer.write_blob(page_image.as_ref()))
+                .as_ref()
+                .map(|page_image| page_version_writer.write_blob(page_image))
                 .transpose()?;
 
             let record_range = page_version
                 .record
+                .as_ref()
                 .map(|record| {
-                    let buf = WALRecord::ser(&record)?;
+                    let buf = WALRecord::ser(record)?;
                     page_version_writer.write_blob(&buf)
                 })
                 .transpose()?;
 
             let old = inner.page_version_metas.insert(
-                key,
+                *key,
                 PageVersionMeta {
                     page_image_range,
                     record_range,
diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs
index 2499c9619b..c0fbad7969 100644
--- a/pageserver/src/layered_repository/inmemory_layer.rs
+++ b/pageserver/src/layered_repository/inmemory_layer.rs
@@ -667,6 +667,15 @@ impl InMemoryLayer {
             self.get_end_lsn()
         );
 
+        // Grab the lock in read-mode. We hold it over the I/O, but because this
+        // layer is not writeable anymore, no one should be trying to acquire the
+        // write lock on it, so we shouldn't block anyone. There's one exception
+        // though: another thread might have grabbed a reference to this layer
+        // in `get_layer_for_write` just before the checkpointer called
+        // `freeze` and then `write_to_disk` on it. When the thread gets the
+        // lock, it will see that it's not writeable anymore and retry, but it
+        // would have to wait until we release it. That race condition is very
+        // rare though, so we just accept the potential latency hit for now.
         let inner = self.inner.read().unwrap();
 
         assert!(!inner.writeable);
@@ -682,7 +691,7 @@ impl InMemoryLayer {
             drop_lsn,
             true,
             predecessor,
-            inner.page_versions.clone(),
+            inner.page_versions.iter(),
             inner.segsizes.clone(),
         )?;
         trace!(
@@ -702,15 +711,11 @@ impl InMemoryLayer {
                 before_segsizes.insert(*lsn, *size);
             }
         }
+        let mut before_page_versions = inner.page_versions.iter().filter(|tup| {
+            let ((_blknum, lsn), _pv) = tup;
 
-        let mut before_page_versions = BTreeMap::new();
-        for ((blknum, lsn), pv) in inner.page_versions.iter() {
-            if *lsn < end_lsn {
-                before_page_versions.insert((*blknum, *lsn), pv.clone());
-            }
-        }
-
-        drop(inner);
+            *lsn < end_lsn
+        });
 
         let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
 
@@ -736,9 +741,11 @@ impl InMemoryLayer {
                 end_lsn
             );
         } else {
-            assert!(before_page_versions.is_empty());
+            assert!(before_page_versions.next().is_none());
         }
 
+        drop(inner);
+
         // Write a new base image layer at the cutoff point
         let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
         frozen_layers.push(Arc::new(image_layer));
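
For reference, here is a minimal standalone sketch of the borrow-and-filter pattern the `inmemory_layer.rs` hunk switches to. `Lsn` and `PageVersion` are reduced to hypothetical stand-ins so it compiles on its own; the point is that `BTreeMap::iter()` yields entries in key order, which is how the iterator satisfies the blknum+lsn ordering that `DeltaLayer::create` now documents, while lazy filtering avoids cloning every `PageVersion` into a second map:

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for the pageserver types, just to make the
// sketch self-contained; the real Lsn and PageVersion are richer.
type Lsn = u64;
#[derive(Debug)]
struct PageVersion(&'static str);

fn main() {
    let mut page_versions: BTreeMap<(u32, Lsn), PageVersion> = BTreeMap::new();
    page_versions.insert((1, 10), PageVersion("blk1@10"));
    page_versions.insert((1, 20), PageVersion("blk1@20"));
    page_versions.insert((2, 15), PageVersion("blk2@15"));

    let end_lsn: Lsn = 18;

    // Borrow the map and filter lazily, as in the new freeze() code:
    // no second BTreeMap, no PageVersion clones. The borrow is also why
    // the patch moves drop(inner) below the last use of the iterator.
    let before_page_versions = page_versions
        .iter()
        .filter(|((_blknum, lsn), _pv)| *lsn < end_lsn);

    // BTreeMap iteration is key-ordered, so this prints in (blknum, lsn)
    // order: blk1@10, then blk2@15.
    for ((blknum, lsn), pv) in before_page_versions {
        println!("blk {blknum} @ lsn {lsn}: {pv:?}");
    }
}
```

The trade-off, spelled out in the new comment in `freeze`, is that the read lock on `inner` is now held across the disk write instead of being dropped right after a clone; the rare writer that raced past `get_layer_for_write` may wait on it, which the patch accepts for now.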