diff --git a/libs/pageserver_api/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs index 401887d362..c55b9e9484 100644 --- a/libs/pageserver_api/src/keyspace.rs +++ b/libs/pageserver_api/src/keyspace.rs @@ -48,7 +48,7 @@ pub struct ShardedRange<'a> { // Calculate the size of a range within the blocks of the same relation, or spanning only the // top page in the previous relation's space. -fn contiguous_range_len(range: &Range<Key>) -> u32 { +pub fn contiguous_range_len(range: &Range<Key>) -> u32 { debug_assert!(is_contiguous_range(range)); if range.start.field6 == 0xffffffff { range.end.field6 + 1 @@ -67,7 +67,7 @@ fn contiguous_range_len(range: &Range<Key>) -> u32 { /// This matters, because: /// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse. /// - Within such ranges, we may calculate distances using simple subtraction of field6. -fn is_contiguous_range(range: &Range<Key>) -> bool { +pub fn is_contiguous_range(range: &Range<Key>) -> bool { range.start.field1 == range.end.field1 && range.start.field2 == range.end.field2 && range.start.field3 == range.end.field3 diff --git a/pageserver/src/pg_import.rs b/pageserver/src/pg_import.rs index afb918c15a..ea5a4cde30 100644 --- a/pageserver/src/pg_import.rs +++ b/pageserver/src/pg_import.rs @@ -1,4 +1,4 @@ -use std::{fs::metadata, time::Instant}; +use std::fs::metadata; use anyhow::{bail, ensure, Context}; use bytes::Bytes; @@ -26,12 +26,14 @@ use crate::tenant::remote_timeline_client; use crate::tenant::remote_timeline_client::LayerFileMetadata; use pageserver_api::shard::ShardIndex; use pageserver_api::key::Key; +use pageserver_api::keyspace::{is_contiguous_range, contiguous_range_len}; +use pageserver_api::keyspace::singleton_range; use pageserver_api::reltag::SlruKind; use pageserver_api::key::{slru_block_to_key, slru_dir_to_key, slru_segment_size_to_key, TWOPHASEDIR_KEY, CONTROLFILE_KEY, CHECKPOINT_KEY}; -use pageserver_api::key::rel_key_range; use utils::bin_ser::BeSer; use std::collections::HashSet; 
+use std::ops::Range; pub struct PgImportEnv { ctx: RequestContext, @@ -41,6 +43,8 @@ pub struct PgImportEnv { pgdata_lsn: Lsn, + tasks: Vec<AnyImportTask>, + layers: Vec<PersistentLayerDesc>, } @@ -69,6 +73,7 @@ impl PgImportEnv { tsi, pgdata_lsn: Lsn(0), // Will be filled in later, when the control file is imported + tasks: Vec::new(), layers: Vec::new(), }) } @@ -90,48 +95,54 @@ impl PgImportEnv { // Import dbdir (00:00:00 keyspace) // This is just constructed here, but will be written to the image layer in the first call to import_db() - let mut dbdir_buf = Some(Bytes::from(DbDirectory::ser(&DbDirectory { + let dbdir_buf = Bytes::from(DbDirectory::ser(&DbDirectory { dbdirs: datadir.dbs.iter().map(|db| ((db.spcnode, db.dboid), true)).collect(), - })?)); - // Import databases (00:spcnode:dbnode keyspace for each db) - let mut start_key = Key::MIN; - for db in datadir.dbs { - start_key = self.import_db(start_key, &db, dbdir_buf.take()).await?; - } + })?); + self.tasks.push(ImportSingleKeyTask::new(DBDIR_KEY, dbdir_buf).into()); - let mut tail_layer = ImageLayerWriter::new( - &self.conf, - self.tli, - self.tsi, - &(start_key..Key::NON_L0_MAX), - pgdata_lsn, - &self.ctx, - ).await?; + // Import databases (00:spcnode:dbnode keyspace for each db) + for db in datadir.dbs { + self.import_db(&db).await?; + } // Import SLRUs // pg_xact (01:00 keyspace) - self.import_slru(&mut tail_layer, SlruKind::Clog, &pgdata_path.join("pg_xact")).await?; + self.import_slru(SlruKind::Clog, &pgdata_path.join("pg_xact")).await?; // pg_multixact/members (01:01 keyspace) - self.import_slru(&mut tail_layer, SlruKind::MultiXactMembers, &pgdata_path.join("pg_multixact/members")).await?; + self.import_slru(SlruKind::MultiXactMembers, &pgdata_path.join("pg_multixact/members")).await?; // pg_multixact/offsets (01:02 keyspace) - self.import_slru(&mut tail_layer, SlruKind::MultiXactOffsets, &pgdata_path.join("pg_multixact/offsets")).await?; + self.import_slru(SlruKind::MultiXactOffsets, 
&pgdata_path.join("pg_multixact/offsets")).await?; // Import pg_twophase. // TODO: as empty let twophasedir_buf = TwoPhaseDirectory::ser( &TwoPhaseDirectory { xids: HashSet::new() } )?; - tail_layer.put_image(TWOPHASEDIR_KEY, Bytes::from(twophasedir_buf), &self.ctx).await?; + self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(TWOPHASEDIR_KEY, Bytes::from(twophasedir_buf)))); // Controlfile, checkpoint - tail_layer.put_image(CONTROLFILE_KEY, Bytes::from(controlfile_buf), &self.ctx).await?; + self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(CONTROLFILE_KEY, Bytes::from(controlfile_buf)))); let checkpoint_buf = control_file.checkPointCopy.encode()?; - tail_layer.put_image(CHECKPOINT_KEY, checkpoint_buf, &self.ctx).await?; + self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(CHECKPOINT_KEY, checkpoint_buf))); - let layerdesc = tail_layer.finish_raw(&self.ctx).await?; - self.layers.push(layerdesc); + + // Execution time + let mut last_end_key = Key::MIN; + let mut current_chunk = Vec::new(); + let mut current_chunk_size: usize = 0; + for task in std::mem::take(&mut self.tasks).into_iter() { + if current_chunk_size + task.total_size() > 1024*1024*1024 { + let key_range = last_end_key..task.key_range().start; + self.flush_chunk(key_range.clone(), std::mem::take(&mut current_chunk)).await?; + last_end_key = key_range.end; + current_chunk_size = 0; + } + current_chunk_size += task.total_size(); + current_chunk.push(task); + } + self.flush_chunk(last_end_key..Key::NON_L0_MAX, current_chunk).await?; // should we anything about the wal? 
@@ -143,146 +154,45 @@ impl PgImportEnv { async fn import_db( &mut self, - start_key: Key, db: &PgDataDirDb, - mut dbdir_buf: Option<Bytes>, - ) -> anyhow::Result<Key> { + ) -> anyhow::Result<()> { debug!( "Importing database (path={}, tablespace={}, dboid={})", db.path, db.spcnode, db.dboid ); + // Import relmap (00:spcnode:dbnode:00:*:00) + let relmap_key = relmap_file_key(db.spcnode, db.dboid); + debug!("Constructing relmap entry, key {relmap_key}"); + let mut relmap_file = tokio::fs::File::open(&db.path.join("pg_filenode.map")).await?; + let relmap_buf = read_all_bytes(&mut relmap_file).await?; + self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(relmap_key, relmap_buf))); + + // Import reldir (00:spcnode:dbnode:00:*:01) + let reldir_key = rel_dir_to_key(db.spcnode, db.dboid); + debug!("Constructing reldirs entry, key {reldir_key}"); + let reldir_buf = RelDirectory::ser(&RelDirectory { + rels: db.files.iter().map(|f| (f.rel_tag.relnode, f.rel_tag.forknum)).collect(), + })?; + self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(reldir_key, Bytes::from(reldir_buf)))); + // Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last // segment in a given relation (00:spcnode:dbnode:reloid:fork:ff) - - let mut chunks: Vec<Vec<&PgDataDirDbFile>> = Vec::new(); - let mut accumulated_bytes: usize = 0; - let mut current_chunk: Vec<&PgDataDirDbFile> = Vec::new(); - for file in &db.files { - let real_size = if let Some(nblocks) = file.nblocks { - nblocks * 8192 - } else { - 1024*1024*1024 - }; - if accumulated_bytes + real_size >= 512*1024*1024 { - chunks.push(current_chunk); - current_chunk = Vec::new(); - accumulated_bytes = 0; + let len = metadata(&file.path)?.len() as usize; + ensure!(len % 8192 == 0); + let start_blk: u32 = file.segno * (1024 * 1024 * 1024 / 8192); + let start_key = rel_block_to_key(file.rel_tag, start_blk); + let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32); + 
self.tasks.push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(start_key..end_key, &file.path))); + + // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff) + if let Some(nblocks) = file.nblocks { + let size_key = rel_size_to_key(file.rel_tag); + //debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}"); + let buf = nblocks.to_le_bytes(); + self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(size_key, Bytes::from(buf.to_vec())))); } - current_chunk.push(file); - accumulated_bytes += real_size; - }; - if !current_chunk.is_empty() { - chunks.push(current_chunk); - } - - let mut last_key_end: Key = start_key; - let mut first = true; - for chunk in chunks { - let key_start = last_key_end; - let last_file = chunk.last().unwrap(); - let key_end = if last_file.nblocks.is_some() { - rel_key_range(last_file.rel_tag).end - } else { - let end_blknum = (last_file.segno + 1) * (1024 * 1024 * 1024 / 8192); - rel_block_to_key(last_file.rel_tag, end_blknum) - }; - let mut layer_writer = ImageLayerWriter::new( - &self.conf, - self.tli, - self.tsi, - &(key_start..key_end), - self.pgdata_lsn, - &self.ctx, - ).await?; - - if first { - if let Some(dbdir_buf) = dbdir_buf.take() { - layer_writer.put_image(DBDIR_KEY, dbdir_buf, &self.ctx).await?; - } - - // Import relmap (00:spcnode:dbnode:00:*:00) - let relmap_key = relmap_file_key(db.spcnode, db.dboid); - debug!("Constructing relmap entry, key {relmap_key}"); - let mut relmap_file = tokio::fs::File::open(&db.path.join("pg_filenode.map")).await?; - let relmap_buf = read_all_bytes(&mut relmap_file).await?; - layer_writer.put_image(relmap_key, relmap_buf, &self.ctx).await?; - - // Import reldir (00:spcnode:dbnode:00:*:01) - let reldir_key = rel_dir_to_key(db.spcnode, db.dboid); - debug!("Constructing reldirs entry, key {reldir_key}"); - let reldir_buf = RelDirectory::ser(&RelDirectory { - rels: db.files.iter().map(|f| (f.rel_tag.relnode, 
f.rel_tag.forknum)).collect(), - })?; - layer_writer.put_image(reldir_key, reldir_buf.into(), &self.ctx).await?; - - first = false; - } - - for file in chunk { - self.import_rel_file(&mut layer_writer, &file).await?; - } - last_key_end = key_end; - - let layerdesc = layer_writer.finish_raw(&self.ctx).await?; - self.layers.push(layerdesc); - } - - Ok(last_key_end) - } - - async fn import_rel_file( - &mut self, - layer_writer: &mut ImageLayerWriter, - segment: &PgDataDirDbFile, - ) -> anyhow::Result<()> { - let (path, rel_tag, segno) = (&segment.path, segment.rel_tag, segment.segno); - - debug!("Importing relation file (path={path}, rel_tag={rel_tag}, segno={segno})"); - let start = Instant::now(); - - let mut reader = tokio::fs::File::open(&path).await?; - let len = metadata(&path)?.len() as usize; - - let mut buf: [u8; 8192] = [0u8; 8192]; - - ensure!(len % BLCKSZ as usize == 0); - let nblocks = len / BLCKSZ as usize; - - let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32); - - loop { - let r = reader.read_exact(&mut buf).await; - match r { - Ok(_) => { - let key = rel_block_to_key(rel_tag.clone(), blknum); - layer_writer.put_image(key, Bytes::copy_from_slice(&buf), &self.ctx).await?; - } - - Err(err) => match err.kind() { - std::io::ErrorKind::UnexpectedEof => { - // reached EOF. That's expected. 
- let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32); - ensure!(relative_blknum == nblocks as u32, "unexpected EOF"); - break; - } - _ => { - bail!("error reading file {}: {:#}", path, err); - } - }, - }; - blknum += 1; - } - - debug!("Importing relation file (path={path}, rel_tag={rel_tag}, segno={segno}): done in {:.6} s", start.elapsed().as_secs_f64()); - - // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff) - if let Some(nblocks) = segment.nblocks { - let size_key = rel_size_to_key(rel_tag); - debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}"); - let buf = nblocks.to_le_bytes(); - layer_writer.put_image(size_key, Bytes::from(buf.to_vec()), &self.ctx).await?; } Ok(()) @@ -290,7 +200,6 @@ impl PgImportEnv { async fn import_slru( &mut self, - layer_writer: &mut ImageLayerWriter, kind: SlruKind, path: &Utf8PathBuf, ) -> anyhow::Result<()> { @@ -312,39 +221,22 @@ impl PgImportEnv { segments: segnos, }; let slrudir_buf = SlruSegmentDirectory::ser(&slrudir)?; - layer_writer.put_image(slrudir_key, slrudir_buf.into(), &self.ctx).await?; + self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(slrudir_key, Bytes::from(slrudir_buf)))); for (segpath, segno) in segments { // SlruSegBlocks for each segment let p = path.join(Utf8PathBuf::from(segpath)); - let mut reader = tokio::fs::File::open(&p).await - .context(format!("opening {}", &p))?; - - let mut rpageno = 0; - loop { - let mut buf: Vec<u8> = Vec::new(); - buf.resize(8192, 0); - let r = reader.read_exact(&mut buf).await; - match r { - Ok(_) => {}, - Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => { - // reached EOF. 
That's expected - break; - } - Err(err) => { - bail!("error reading file {}: {:#}", &p, err); - } - }; - let slruseg_key = slru_block_to_key(kind, segno, rpageno); - layer_writer.put_image(slruseg_key, Bytes::from(buf), &self.ctx).await?; - rpageno += 1; - } - let npages: u32 = rpageno; + let file_size = std::fs::metadata(&p)?.len(); + ensure!(file_size % 8192 == 0); + let nblocks = u32::try_from(file_size / 8192)?; + let start_key = slru_block_to_key(kind, segno, 0); + let end_key = slru_block_to_key(kind, segno, nblocks); + self.tasks.push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(start_key..end_key, &p))); // Followed by SlruSegSize let segsize_key = slru_segment_size_to_key(kind, segno); - let segsize_buf = npages.to_le_bytes(); - layer_writer.put_image(segsize_key, Bytes::copy_from_slice(&segsize_buf), &self.ctx).await?; + let segsize_buf = nblocks.to_le_bytes(); + self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(segsize_key, Bytes::copy_from_slice(&segsize_buf)))); } Ok(()) } @@ -392,6 +284,26 @@ impl PgImportEnv { Ok(()) } + + + async fn flush_chunk(&mut self, key_range: Range<Key>, chunk: Vec<AnyImportTask>) -> anyhow::Result<()> { + let mut layer = ImageLayerWriter::new( + &self.conf, + self.tli, + self.tsi, + &key_range, + self.pgdata_lsn, + &self.ctx, + ).await?; + + for task in chunk { + task.doit(&mut layer, &self.ctx).await?; + } + + let layerdesc = layer.finish_raw(&self.ctx).await?; + self.layers.push(layerdesc); + Ok(()) + } } // @@ -529,3 +441,155 @@ async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> anyhow::Result<Bytes> reader.read_to_end(&mut buf).await?; Ok(Bytes::from(buf)) } + +trait ImportTask { + fn key_range(&self) -> Range<Key>; + + fn total_size(&self) -> usize { + if is_contiguous_range(&self.key_range()) { + contiguous_range_len(&self.key_range()) as usize * 8192 + } else { + u32::MAX as usize + } + } + + async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()>; +} + +struct 
ImportSingleKeyTask { + key: Key, + buf: Bytes, +} + +impl ImportSingleKeyTask { + fn new(key: Key, buf: Bytes) -> Self { + ImportSingleKeyTask { key, buf } + } +} + +impl ImportTask for ImportSingleKeyTask { + fn key_range(&self) -> Range<Key> { + singleton_range(self.key) + } + + async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> { + layer_writer.put_image(self.key, self.buf, ctx).await?; + Ok(()) + } +} + +struct ImportRelBlocksTask { + key_range: Range<Key>, + path: Utf8PathBuf, +} + +impl ImportRelBlocksTask { + fn new(key_range: Range<Key>, path: &Utf8Path) -> Self { + ImportRelBlocksTask { + key_range, + path: path.into() + } + } +} + +impl ImportTask for ImportRelBlocksTask { + fn key_range(&self) -> Range<Key> { + self.key_range.clone() + } + + async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> { + debug!("Importing relation file {}", self.path); + let mut reader = tokio::fs::File::open(&self.path).await?; + let mut buf: [u8; 8192] = [0u8; 8192]; + + let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?; + let (_rel_tag, end_blk) = self.key_range.end.to_rel_block()?; + let mut blknum = start_blk; + while blknum < end_blk { + reader.read_exact(&mut buf).await?; + let key = rel_block_to_key(rel_tag.clone(), blknum); + layer_writer.put_image(key, Bytes::copy_from_slice(&buf), ctx).await?; + blknum += 1; + } + Ok(()) + } +} + +struct ImportSlruBlocksTask { + key_range: Range<Key>, + path: Utf8PathBuf, +} + +impl ImportSlruBlocksTask { + fn new(key_range: Range<Key>, path: &Utf8Path) -> Self { + ImportSlruBlocksTask { + key_range, + path: path.into() + } + } +} + +impl ImportTask for ImportSlruBlocksTask { + fn key_range(&self) -> Range<Key> { + self.key_range.clone() + } + + async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> { + debug!("Importing SLRU segment file {}", self.path); + let mut reader = 
tokio::fs::File::open(&self.path).await + .context(format!("opening {}", &self.path))?; + let mut buf: [u8; 8192] = [0u8; 8192]; + + let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?; + let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?; + let mut blknum = start_blk; + while blknum < end_blk { + reader.read_exact(&mut buf).await?; + let key = slru_block_to_key(kind, segno, blknum); + layer_writer.put_image(key, Bytes::copy_from_slice(&buf), ctx).await?; + blknum += 1; + } + Ok(()) + } +} + +enum AnyImportTask { + SingleKey(ImportSingleKeyTask), + RelBlocks(ImportRelBlocksTask), + SlruBlocks(ImportSlruBlocksTask), +} + +impl ImportTask for AnyImportTask { + fn key_range(&self) -> Range<Key> { + match self { + Self::SingleKey(t) => t.key_range(), + Self::RelBlocks(t) => t.key_range(), + Self::SlruBlocks(t) => t.key_range() + } + } + async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> { + match self { + Self::SingleKey(t) => t.doit(layer_writer, ctx).await, + Self::RelBlocks(t) => t.doit(layer_writer, ctx).await, + Self::SlruBlocks(t) => t.doit(layer_writer, ctx).await, + } + } +} + +impl From<ImportSingleKeyTask> for AnyImportTask { + fn from(t: ImportSingleKeyTask) -> Self { + Self::SingleKey(t) + } +} + +impl From<ImportRelBlocksTask> for AnyImportTask { + fn from(t: ImportRelBlocksTask) -> Self { + Self::RelBlocks(t) + } +} + +impl From<ImportSlruBlocksTask> for AnyImportTask { + fn from(t: ImportSlruBlocksTask) -> Self { + Self::SlruBlocks(t) + } +}