diff --git a/pageserver/src/pg_import.rs b/pageserver/src/pg_import.rs index 4d6ef9a527..d4c69877c6 100644 --- a/pageserver/src/pg_import.rs +++ b/pageserver/src/pg_import.rs @@ -28,6 +28,7 @@ use pageserver_api::shard::ShardIndex; use pageserver_api::key::Key; use pageserver_api::reltag::SlruKind; use pageserver_api::key::{slru_block_to_key, slru_dir_to_key, slru_segment_size_to_key, TWOPHASEDIR_KEY, CONTROLFILE_KEY, CHECKPOINT_KEY}; +use pageserver_api::key::rel_key_range; use utils::bin_ser::BeSer; use std::collections::HashSet; @@ -39,6 +40,8 @@ pub struct PgImportEnv { tsi: TenantShardId, pgdata_lsn: Lsn, + + layers: Vec, } impl PgImportEnv { @@ -65,6 +68,8 @@ impl PgImportEnv { tli: timeline_id, tsi, pgdata_lsn: Lsn(0), // Will be filled in later, when the control file is imported + + layers: Vec::new(), }) } @@ -83,102 +88,142 @@ impl PgImportEnv { let datadir = PgDataDir::new(pgdata_path); - let range = Key::MIN..Key::NON_L0_MAX; - let mut one_big_layer = ImageLayerWriter::new( + // Import dbdir (00:00:00 keyspace) + // This is just constructed here, but will be written to the image layer in the first call to import_db() + let mut dbdir_buf = Some(Bytes::from(DbDirectory::ser(&DbDirectory { + dbdirs: datadir.dbs.iter().map(|db| ((db.spcnode, db.dboid), true)).collect(), + })?)); + // Import databases (00:spcnode:dbnode keyspace for each db) + let mut start_key = Key::MIN; + for db in datadir.dbs { + start_key = self.import_db(start_key, &db, dbdir_buf.take()).await?; + } + + let mut tail_layer = ImageLayerWriter::new( &self.conf, self.tli, self.tsi, - &range, + &(start_key..Key::NON_L0_MAX), pgdata_lsn, &self.ctx, ).await?; - // Import dbdir (00:00:00 keyspace) - self.import_dbdir(&mut one_big_layer, &datadir).await?; - - // Import databases (00:spcnode:dbnode keyspace for each db) - for db in datadir.dbs { - self.import_db(&mut one_big_layer, &db).await?; - } - // Import SLRUs // pg_xact (01:00 keyspace) - self.import_slru(&mut one_big_layer, SlruKind::Clog, &pgdata_path.join("pg_xact")).await?; + self.import_slru(&mut tail_layer, SlruKind::Clog, &pgdata_path.join("pg_xact")).await?; // pg_multixact/members (01:01 keyspace) - self.import_slru(&mut one_big_layer, SlruKind::MultiXactMembers, &pgdata_path.join("pg_multixact/members")).await?; + self.import_slru(&mut tail_layer, SlruKind::MultiXactMembers, &pgdata_path.join("pg_multixact/members")).await?; // pg_multixact/offsets (01:02 keyspace) - self.import_slru(&mut one_big_layer, SlruKind::MultiXactOffsets, &pgdata_path.join("pg_multixact/offsets")).await?; + self.import_slru(&mut tail_layer, SlruKind::MultiXactOffsets, &pgdata_path.join("pg_multixact/offsets")).await?; // Import pg_twophase. // TODO: as empty let twophasedir_buf = TwoPhaseDirectory::ser( &TwoPhaseDirectory { xids: HashSet::new() } )?; - one_big_layer.put_image(TWOPHASEDIR_KEY, Bytes::from(twophasedir_buf), &self.ctx).await?; + tail_layer.put_image(TWOPHASEDIR_KEY, Bytes::from(twophasedir_buf), &self.ctx).await?; // Controlfile, checkpoint - one_big_layer.put_image(CONTROLFILE_KEY, Bytes::from(controlfile_buf), &self.ctx).await?; + tail_layer.put_image(CONTROLFILE_KEY, Bytes::from(controlfile_buf), &self.ctx).await?; let checkpoint_buf = control_file.checkPointCopy.encode()?; - one_big_layer.put_image(CHECKPOINT_KEY, checkpoint_buf, &self.ctx).await?; + tail_layer.put_image(CHECKPOINT_KEY, checkpoint_buf, &self.ctx).await?; - let layerdesc = one_big_layer.finish_raw(&self.ctx).await?; + let layerdesc = tail_layer.finish_raw(&self.ctx).await?; + self.layers.push(layerdesc); // should we anything about the wal? // Create index_part.json file - self.create_index_part(&[layerdesc], &control_file).await?; + self.create_index_part(&control_file).await?; Ok(()) } - // DbDir: (spcnode, dbnode) -> bool (do relmapper and PG_VERSION files exist) - async fn import_dbdir( - &mut self, - layer_writer: &mut ImageLayerWriter, - datadir: &PgDataDir, - ) -> anyhow::Result<()> { - debug!("Constructing dbdir entry, key {DBDIR_KEY}"); - let dbdir_buf = DbDirectory::ser(&DbDirectory { - dbdirs: datadir.dbs.iter().map(|db| ((db.spcnode, db.dboid), true)).collect(), - })?; - layer_writer.put_image(DBDIR_KEY, dbdir_buf.into(), &self.ctx).await?; - Ok(()) - } - async fn import_db( &mut self, - layer_writer: &mut ImageLayerWriter, + start_key: Key, db: &PgDataDirDb, - ) -> anyhow::Result<()> { + mut dbdir_buf: Option, + ) -> anyhow::Result { debug!( "Importing database (path={}, tablespace={}, dboid={})", db.path, db.spcnode, db.dboid ); - // Import relmap (00:spcnode:dbnode:00:*:00) - let relmap_key = relmap_file_key(db.spcnode, db.dboid); - debug!("Constructing relmap entry, key {relmap_key}"); - let mut relmap_file = tokio::fs::File::open(&db.path.join("pg_filenode.map")).await?; - let relmap_buf = read_all_bytes(&mut relmap_file).await?; - layer_writer.put_image(relmap_key, relmap_buf, &self.ctx).await?; - - // Import reldir (00:spcnode:dbnode:00:*:01) - let reldir_key = rel_dir_to_key(db.spcnode, db.dboid); - debug!("Constructing reldirs entry, key {reldir_key}"); - let reldir_buf = RelDirectory::ser(&RelDirectory { - rels: db.files.iter().map(|f| (f.rel_tag.relnode, f.rel_tag.forknum)).collect(), - })?; - layer_writer.put_image(reldir_key, reldir_buf.into(), &self.ctx).await?; - // Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last // segment in a given relation (00:spcnode:dbnode:reloid:fork:ff) - for file in &db.files { - self.import_rel_file(layer_writer, &file).await?; - }; - Ok(()) + let mut chunks: Vec> = Vec::new(); + let mut accumulated_bytes: usize = 0; + let mut current_chunk: Vec<&PgDataDirDbFile> = Vec::new(); + + for file in &db.files { + let real_size = if let Some(nblocks) = file.nblocks { + nblocks * 8192 + } else { + 1024*1024*1024 + }; + if accumulated_bytes + real_size >= 512*1024*1024 { + chunks.push(current_chunk); + current_chunk = Vec::new(); + accumulated_bytes = 0; + } + current_chunk.push(file); + accumulated_bytes += real_size; + }; + if !current_chunk.is_empty() { + chunks.push(current_chunk); + } + + let mut last_key_end: Key = start_key; + let mut first = true; + for chunk in chunks { + let key_start = last_key_end; + let key_end = rel_key_range(chunk.last().unwrap().rel_tag).end; + let mut layer_writer = ImageLayerWriter::new( + &self.conf, + self.tli, + self.tsi, + &(key_start..key_end), + self.pgdata_lsn, + &self.ctx, + ).await?; + + if first { + if let Some(dbdir_buf) = dbdir_buf.take() { + layer_writer.put_image(DBDIR_KEY, dbdir_buf, &self.ctx).await?; + } + + // Import relmap (00:spcnode:dbnode:00:*:00) + let relmap_key = relmap_file_key(db.spcnode, db.dboid); + debug!("Constructing relmap entry, key {relmap_key}"); + let mut relmap_file = tokio::fs::File::open(&db.path.join("pg_filenode.map")).await?; + let relmap_buf = read_all_bytes(&mut relmap_file).await?; + layer_writer.put_image(relmap_key, relmap_buf, &self.ctx).await?; + + // Import reldir (00:spcnode:dbnode:00:*:01) + let reldir_key = rel_dir_to_key(db.spcnode, db.dboid); + debug!("Constructing reldirs entry, key {reldir_key}"); + let reldir_buf = RelDirectory::ser(&RelDirectory { + rels: db.files.iter().map(|f| (f.rel_tag.relnode, f.rel_tag.forknum)).collect(), + })?; + layer_writer.put_image(reldir_key, reldir_buf.into(), &self.ctx).await?; + + first = false; + } + + for file in chunk { + self.import_rel_file(&mut layer_writer, &file).await?; + } + last_key_end = key_end; + + let layerdesc = layer_writer.finish_raw(&self.ctx).await?; + self.layers.push(layerdesc); + } + + Ok(last_key_end) } async fn import_rel_file( @@ -298,7 +343,7 @@ impl PgImportEnv { Ok(()) } - async fn create_index_part(&mut self, layers: &[PersistentLayerDesc], control_file: &ControlFileData) -> anyhow::Result<()> { + async fn create_index_part(&mut self, control_file: &ControlFileData) -> anyhow::Result<()> { let dstdir = &self.conf.workdir; let pg_version = match control_file.catalog_version_no { @@ -325,7 +370,7 @@ impl PgImportEnv { let generation = Generation::none(); let mut index_part = IndexPart::empty(metadata); - for l in layers { + for l in self.layers.iter() { let name = l.layer_name(); let metadata = LayerFileMetadata::new(l.file_size, generation, ShardIndex::unsharded()); if let Some(_) = index_part.layer_metadata.insert(name.clone(), metadata) { diff --git a/test_runner/regress/test_pg_import.py b/test_runner/regress/test_pg_import.py index 10248f77e4..0ae6e3f4d0 100644 --- a/test_runner/regress/test_pg_import.py +++ b/test_runner/regress/test_pg_import.py @@ -71,3 +71,11 @@ def test_pg_import(test_output_dir, pg_bin, vanilla_pg, neon_env_builder): cur = conn.cursor() assert endpoint.safe_psql("select count(*) from t") == [(300000,)] + + # test writing after the import + endpoint.safe_psql("insert into t select g from generate_series(1, 1000) g") + assert endpoint.safe_psql("select count(*) from t") == [(301000,)] + + endpoint.stop() + endpoint.start() + assert endpoint.safe_psql("select count(*) from t") == [(301000,)]