diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 1a9aa78d8c..f8a41e5b2b 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -57,6 +57,7 @@ pub fn import_timeline_from_postgres_datadir( if let Some(control_file) = import_file(&mut modification, relative_path, file, len)? { pg_control = Some(control_file); } + modification.flush()?; } } @@ -317,6 +318,7 @@ pub fn import_basebackup_from_tar( // We found the pg_control file. pg_control = Some(res); } + modification.flush()?; } tar::EntryType::Directory => { debug!("directory {:?}", file_path); diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 13f200ec92..4cbd769661 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -33,7 +33,7 @@ use std::time::{Duration, Instant, SystemTime}; use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME}; use crate::config::PageServerConf; -use crate::keyspace::KeySpace; +use crate::keyspace::{KeyPartitioning, KeySpace}; use crate::storage_sync::index::RemoteIndex; use crate::tenant_config::{TenantConf, TenantConfOpt}; @@ -1690,8 +1690,24 @@ impl LayeredTimeline { /// Flush one frozen in-memory layer to disk, as a new delta layer. fn flush_frozen_layer(&self, frozen_layer: Arc) -> Result<()> { - let new_delta = frozen_layer.write_to_disk()?; - let new_delta_path = new_delta.path(); + let layer_paths_to_upload; + + // As a special case, when we have just imported an image into the repository, + // instead of writing out a L0 delta layer, we directly write out image layer + // files instead. This is possible as long as *all* the data imported into the + // repository have the same LSN. + let lsn_range = frozen_layer.get_lsn_range(); + if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) { + let pgdir = tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)?; + let (partitioning, _lsn) = + pgdir.repartition(self.initdb_lsn, self.get_compaction_target_size())?; + layer_paths_to_upload = + self.create_image_layers(&partitioning, self.initdb_lsn, true)?; + } else { + // normal case, write out a L0 delta layer file. + let delta_path = self.create_delta_layer(&frozen_layer)?; + layer_paths_to_upload = HashSet::from([delta_path]); + } // Sync the new layer to disk. // @@ -1709,7 +1725,8 @@ impl LayeredTimeline { fail_point!("flush-frozen"); - // Finally, replace the frozen in-memory layer with the new on-disk layer + // The new on-disk layers are now in the layer map. We can remove the + // in-memory layer from the map now. { let mut layers = self.layers.write().unwrap(); let l = layers.frozen_layers.pop_front(); @@ -1719,19 +1736,27 @@ impl LayeredTimeline { // layer to disk at the same time, that would not work. assert!(Arc::ptr_eq(&l.unwrap(), &frozen_layer)); - // Add the new delta layer to the LayerMap - layers.insert_historic(Arc::new(new_delta)); - // release lock on 'layers' } + fail_point!("checkpoint-after-sync"); + // Update the metadata file, with new 'disk_consistent_lsn' // // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing // *all* the layers, to avoid fsyncing the file multiple times. - let disk_consistent_lsn = Lsn(frozen_layer.get_lsn_range().end.0 - 1); - fail_point!("checkpoint-after-sync"); + let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1); + self.update_disk_consistent_lsn(disk_consistent_lsn, layer_paths_to_upload)?; + Ok(()) + } + + /// Update metadata file + fn update_disk_consistent_lsn( + &self, + disk_consistent_lsn: Lsn, + layer_paths_to_upload: HashSet, + ) -> Result<()> { // If we were able to advance 'disk_consistent_lsn', save it the metadata file. // After crash, we will restart WAL streaming and processing from that point. let old_disk_consistent_lsn = self.disk_consistent_lsn.load(); @@ -1781,14 +1806,11 @@ impl LayeredTimeline { false, )?; - NUM_PERSISTENT_FILES_CREATED.inc_by(1); - PERSISTENT_BYTES_WRITTEN.inc_by(new_delta_path.metadata()?.len()); - if self.upload_layers.load(atomic::Ordering::Relaxed) { storage_sync::schedule_layer_upload( self.tenant_id, self.timeline_id, - HashSet::from([new_delta_path]), + layer_paths_to_upload, Some(metadata), ); } @@ -1800,6 +1822,37 @@ impl LayeredTimeline { Ok(()) } + // Write out the given frozen in-memory layer as a new L0 delta file + fn create_delta_layer(&self, frozen_layer: &InMemoryLayer) -> Result { + // Write it out + let new_delta = frozen_layer.write_to_disk()?; + let new_delta_path = new_delta.path(); + + // Sync it to disk. + // + // We must also fsync the timeline dir to ensure the directory entries for + // new layer files are durable + // + // TODO: If we're running inside 'flush_frozen_layers' and there are multiple + // files to flush, it might be better to first write them all, and then fsync + // them all in parallel. + par_fsync::par_fsync(&[ + new_delta_path.clone(), + self.conf.timeline_path(&self.timeline_id, &self.tenant_id), + ])?; + + // Add it to the layer map + { + let mut layers = self.layers.write().unwrap(); + layers.insert_historic(Arc::new(new_delta)); + } + + NUM_PERSISTENT_FILES_CREATED.inc_by(1); + PERSISTENT_BYTES_WRITTEN.inc_by(new_delta_path.metadata()?.len()); + + Ok(new_delta_path) + } + pub fn compact(&self) -> Result<()> { // // High level strategy for compaction / image creation: @@ -1843,29 +1896,23 @@ impl LayeredTimeline { if let Ok(pgdir) = tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id) { + // 2. Create new image layers for partitions that have been modified + // "enough". let (partitioning, lsn) = pgdir.repartition( self.get_last_record_lsn(), self.get_compaction_target_size(), )?; - let timer = self.create_images_time_histo.start_timer(); - // 2. Create new image layers for partitions that have been modified - // "enough". - let mut layer_paths_to_upload = HashSet::with_capacity(partitioning.parts.len()); - for part in partitioning.parts.iter() { - if self.time_for_new_image_layer(part, lsn)? { - let new_path = self.create_image_layer(part, lsn)?; - layer_paths_to_upload.insert(new_path); - } - } - if self.upload_layers.load(atomic::Ordering::Relaxed) { + let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?; + if !layer_paths_to_upload.is_empty() + && self.upload_layers.load(atomic::Ordering::Relaxed) + { storage_sync::schedule_layer_upload( self.tenant_id, self.timeline_id, - layer_paths_to_upload, + HashSet::from_iter(layer_paths_to_upload), None, ); } - timer.stop_and_record(); // 3. Compact let timer = self.compact_time_histo.start_timer(); @@ -1906,21 +1953,40 @@ impl LayeredTimeline { Ok(false) } - fn create_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result { - let img_range = - partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end; - let mut image_layer_writer = - ImageLayerWriter::new(self.conf, self.timeline_id, self.tenant_id, &img_range, lsn)?; + fn create_image_layers( + &self, + partitioning: &KeyPartitioning, + lsn: Lsn, + force: bool, + ) -> Result> { + let timer = self.create_images_time_histo.start_timer(); + let mut image_layers: Vec = Vec::new(); + let mut layer_paths_to_upload = HashSet::new(); + for partition in partitioning.parts.iter() { + if force || self.time_for_new_image_layer(partition, lsn)? { + let img_range = + partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end; + let mut image_layer_writer = ImageLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_id, + &img_range, + lsn, + )?; - for range in &partition.ranges { - let mut key = range.start; - while key < range.end { - let img = self.get(key, lsn)?; - image_layer_writer.put_image(key, &img)?; - key = key.next(); + for range in &partition.ranges { + let mut key = range.start; + while key < range.end { + let img = self.get(key, lsn)?; + image_layer_writer.put_image(key, &img)?; + key = key.next(); + } + } + let image_layer = image_layer_writer.finish()?; + layer_paths_to_upload.insert(image_layer.path()); + image_layers.push(image_layer); } } - let image_layer = image_layer_writer.finish()?; // Sync the new layer to disk before adding it to the layer map, to make sure // we don't garbage collect something based on the new layer, before it has @@ -1931,19 +1997,18 @@ impl LayeredTimeline { // // Compaction creates multiple image layers. It would be better to create them all // and fsync them all in parallel. - par_fsync::par_fsync(&[ - image_layer.path(), - self.conf.timeline_path(&self.timeline_id, &self.tenant_id), - ])?; - - // FIXME: Do we need to do something to upload it to remote storage here? + let mut all_paths = Vec::from_iter(layer_paths_to_upload.clone()); + all_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id)); + par_fsync::par_fsync(&all_paths)?; let mut layers = self.layers.write().unwrap(); - let new_path = image_layer.path(); - layers.insert_historic(Arc::new(image_layer)); + for l in image_layers { + layers.insert_historic(Arc::new(l)); + } drop(layers); + timer.stop_and_record(); - Ok(new_path) + Ok(layer_paths_to_upload) } /// diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index c47d7cb1ad..2cf0c343b9 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -884,6 +884,57 @@ impl<'a, R: Repository> DatadirModification<'a, R> { Ok(()) } + /// + /// Flush changes accumulated so far to the underlying repository. + /// + /// Usually, changes made in DatadirModification are atomic, but this allows + /// you to flush them to the underlying repository before the final `commit`. + /// That allows to free up the memory used to hold the pending changes. + /// + /// Currently only used during bulk import of a data directory. In that + /// context, breaking the atomicity is OK. If the import is interrupted, the + /// whole import fails and the timeline will be deleted anyway. + /// (Or to be precise, it will be left behind for debugging purposes and + /// ignored, see https://github.com/neondatabase/neon/pull/1809) + /// + /// Note: A consequence of flushing the pending operations is that they + /// won't be visible to subsequent operations until `commit`. The function + /// retains all the metadata, but data pages are flushed. That's again OK + /// for bulk import, where you are just loading data pages and won't try to + /// modify the same pages twice. + pub fn flush(&mut self) -> Result<()> { + // Unless we have accumulated a decent amount of changes, it's not worth it + // to scan through the pending_updates list. + let pending_nblocks = self.pending_nblocks; + if pending_nblocks < 10000 { + return Ok(()); + } + + let writer = self.tline.tline.writer(); + + // Flush relation and SLRU data blocks, keep metadata. + let mut result: Result<()> = Ok(()); + self.pending_updates.retain(|&key, value| { + if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) { + result = writer.put(key, self.lsn, value); + false + } else { + true + } + }); + result?; + + if pending_nblocks != 0 { + self.tline.current_logical_size.fetch_add( + pending_nblocks * pg_constants::BLCKSZ as isize, + Ordering::SeqCst, + ); + self.pending_nblocks = 0; + } + + Ok(()) + } + /// /// Finish this atomic update, writing all the updated keys to the /// underlying timeline. @@ -1299,6 +1350,10 @@ pub fn key_to_rel_block(key: Key) -> Result<(RelTag, BlockNumber)> { }) } +fn is_rel_block_key(key: Key) -> bool { + key.field1 == 0x00 && key.field4 != 0 +} + pub fn key_to_slru_block(key: Key) -> Result<(SlruKind, u32, BlockNumber)> { Ok(match key.field1 { 0x01 => { @@ -1317,6 +1372,12 @@ pub fn key_to_slru_block(key: Key) -> Result<(SlruKind, u32, BlockNumber)> { }) } +fn is_slru_block_key(key: Key) -> bool { + key.field1 == 0x01 // SLRU-related + && key.field3 == 0x00000001 // but not SlruDir + && key.field6 != 0xffffffff // and not SlruSegSize +} + // //-- Tests that should work the same with any Repository/Timeline implementation. //