Write out multiple image layers

This commit is contained in:
Heikki Linnakangas
2024-09-13 01:47:10 +03:00
parent 9351ba26ff
commit 67d1606f82
2 changed files with 109 additions and 56 deletions

View File

@@ -28,6 +28,7 @@ use pageserver_api::shard::ShardIndex;
use pageserver_api::key::Key;
use pageserver_api::reltag::SlruKind;
use pageserver_api::key::{slru_block_to_key, slru_dir_to_key, slru_segment_size_to_key, TWOPHASEDIR_KEY, CONTROLFILE_KEY, CHECKPOINT_KEY};
use pageserver_api::key::rel_key_range;
use utils::bin_ser::BeSer;
use std::collections::HashSet;
@@ -39,6 +40,8 @@ pub struct PgImportEnv {
tsi: TenantShardId,
pgdata_lsn: Lsn,
layers: Vec<PersistentLayerDesc>,
}
impl PgImportEnv {
@@ -65,6 +68,8 @@ impl PgImportEnv {
tli: timeline_id,
tsi,
pgdata_lsn: Lsn(0), // Will be filled in later, when the control file is imported
layers: Vec::new(),
})
}
@@ -83,102 +88,142 @@ impl PgImportEnv {
let datadir = PgDataDir::new(pgdata_path);
let range = Key::MIN..Key::NON_L0_MAX;
let mut one_big_layer = ImageLayerWriter::new(
// Import dbdir (00:00:00 keyspace)
// This is just constructed here, but will be written to the image layer in the first call to import_db()
let mut dbdir_buf = Some(Bytes::from(DbDirectory::ser(&DbDirectory {
dbdirs: datadir.dbs.iter().map(|db| ((db.spcnode, db.dboid), true)).collect(),
})?));
// Import databases (00:spcnode:dbnode keyspace for each db)
let mut start_key = Key::MIN;
for db in datadir.dbs {
start_key = self.import_db(start_key, &db, dbdir_buf.take()).await?;
}
let mut tail_layer = ImageLayerWriter::new(
&self.conf,
self.tli,
self.tsi,
&range,
&(start_key..Key::NON_L0_MAX),
pgdata_lsn,
&self.ctx,
).await?;
// Import dbdir (00:00:00 keyspace)
self.import_dbdir(&mut one_big_layer, &datadir).await?;
// Import databases (00:spcnode:dbnode keyspace for each db)
for db in datadir.dbs {
self.import_db(&mut one_big_layer, &db).await?;
}
// Import SLRUs
// pg_xact (01:00 keyspace)
self.import_slru(&mut one_big_layer, SlruKind::Clog, &pgdata_path.join("pg_xact")).await?;
self.import_slru(&mut tail_layer, SlruKind::Clog, &pgdata_path.join("pg_xact")).await?;
// pg_multixact/members (01:01 keyspace)
self.import_slru(&mut one_big_layer, SlruKind::MultiXactMembers, &pgdata_path.join("pg_multixact/members")).await?;
self.import_slru(&mut tail_layer, SlruKind::MultiXactMembers, &pgdata_path.join("pg_multixact/members")).await?;
// pg_multixact/offsets (01:02 keyspace)
self.import_slru(&mut one_big_layer, SlruKind::MultiXactOffsets, &pgdata_path.join("pg_multixact/offsets")).await?;
self.import_slru(&mut tail_layer, SlruKind::MultiXactOffsets, &pgdata_path.join("pg_multixact/offsets")).await?;
// Import pg_twophase.
// TODO: as empty
let twophasedir_buf = TwoPhaseDirectory::ser(
&TwoPhaseDirectory { xids: HashSet::new() }
)?;
one_big_layer.put_image(TWOPHASEDIR_KEY, Bytes::from(twophasedir_buf), &self.ctx).await?;
tail_layer.put_image(TWOPHASEDIR_KEY, Bytes::from(twophasedir_buf), &self.ctx).await?;
// Controlfile, checkpoint
one_big_layer.put_image(CONTROLFILE_KEY, Bytes::from(controlfile_buf), &self.ctx).await?;
tail_layer.put_image(CONTROLFILE_KEY, Bytes::from(controlfile_buf), &self.ctx).await?;
let checkpoint_buf = control_file.checkPointCopy.encode()?;
one_big_layer.put_image(CHECKPOINT_KEY, checkpoint_buf, &self.ctx).await?;
tail_layer.put_image(CHECKPOINT_KEY, checkpoint_buf, &self.ctx).await?;
let layerdesc = one_big_layer.finish_raw(&self.ctx).await?;
let layerdesc = tail_layer.finish_raw(&self.ctx).await?;
self.layers.push(layerdesc);
// should we anything about the wal?
// Create index_part.json file
self.create_index_part(&[layerdesc], &control_file).await?;
self.create_index_part(&control_file).await?;
Ok(())
}
// DbDir: (spcnode, dbnode) -> bool (do relmapper and PG_VERSION files exist)
async fn import_dbdir(
&mut self,
layer_writer: &mut ImageLayerWriter,
datadir: &PgDataDir,
) -> anyhow::Result<()> {
debug!("Constructing dbdir entry, key {DBDIR_KEY}");
let dbdir_buf = DbDirectory::ser(&DbDirectory {
dbdirs: datadir.dbs.iter().map(|db| ((db.spcnode, db.dboid), true)).collect(),
})?;
layer_writer.put_image(DBDIR_KEY, dbdir_buf.into(), &self.ctx).await?;
Ok(())
}
async fn import_db(
&mut self,
layer_writer: &mut ImageLayerWriter,
start_key: Key,
db: &PgDataDirDb,
) -> anyhow::Result<()> {
mut dbdir_buf: Option<Bytes>,
) -> anyhow::Result<Key> {
debug!(
"Importing database (path={}, tablespace={}, dboid={})",
db.path, db.spcnode, db.dboid
);
// Import relmap (00:spcnode:dbnode:00:*:00)
let relmap_key = relmap_file_key(db.spcnode, db.dboid);
debug!("Constructing relmap entry, key {relmap_key}");
let mut relmap_file = tokio::fs::File::open(&db.path.join("pg_filenode.map")).await?;
let relmap_buf = read_all_bytes(&mut relmap_file).await?;
layer_writer.put_image(relmap_key, relmap_buf, &self.ctx).await?;
// Import reldir (00:spcnode:dbnode:00:*:01)
let reldir_key = rel_dir_to_key(db.spcnode, db.dboid);
debug!("Constructing reldirs entry, key {reldir_key}");
let reldir_buf = RelDirectory::ser(&RelDirectory {
rels: db.files.iter().map(|f| (f.rel_tag.relnode, f.rel_tag.forknum)).collect(),
})?;
layer_writer.put_image(reldir_key, reldir_buf.into(), &self.ctx).await?;
// Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last
// segment in a given relation (00:spcnode:dbnode:reloid:fork:ff)
for file in &db.files {
self.import_rel_file(layer_writer, &file).await?;
};
Ok(())
let mut chunks: Vec<Vec<&PgDataDirDbFile>> = Vec::new();
let mut accumulated_bytes: usize = 0;
let mut current_chunk: Vec<&PgDataDirDbFile> = Vec::new();
for file in &db.files {
let real_size = if let Some(nblocks) = file.nblocks {
nblocks * 8192
} else {
1024*1024*1024
};
if accumulated_bytes + real_size >= 512*1024*1024 {
chunks.push(current_chunk);
current_chunk = Vec::new();
accumulated_bytes = 0;
}
current_chunk.push(file);
accumulated_bytes += real_size;
};
if !current_chunk.is_empty() {
chunks.push(current_chunk);
}
let mut last_key_end: Key = start_key;
let mut first = true;
for chunk in chunks {
let key_start = last_key_end;
let key_end = rel_key_range(chunk.last().unwrap().rel_tag).end;
let mut layer_writer = ImageLayerWriter::new(
&self.conf,
self.tli,
self.tsi,
&(key_start..key_end),
self.pgdata_lsn,
&self.ctx,
).await?;
if first {
if let Some(dbdir_buf) = dbdir_buf.take() {
layer_writer.put_image(DBDIR_KEY, dbdir_buf, &self.ctx).await?;
}
// Import relmap (00:spcnode:dbnode:00:*:00)
let relmap_key = relmap_file_key(db.spcnode, db.dboid);
debug!("Constructing relmap entry, key {relmap_key}");
let mut relmap_file = tokio::fs::File::open(&db.path.join("pg_filenode.map")).await?;
let relmap_buf = read_all_bytes(&mut relmap_file).await?;
layer_writer.put_image(relmap_key, relmap_buf, &self.ctx).await?;
// Import reldir (00:spcnode:dbnode:00:*:01)
let reldir_key = rel_dir_to_key(db.spcnode, db.dboid);
debug!("Constructing reldirs entry, key {reldir_key}");
let reldir_buf = RelDirectory::ser(&RelDirectory {
rels: db.files.iter().map(|f| (f.rel_tag.relnode, f.rel_tag.forknum)).collect(),
})?;
layer_writer.put_image(reldir_key, reldir_buf.into(), &self.ctx).await?;
first = false;
}
for file in chunk {
self.import_rel_file(&mut layer_writer, &file).await?;
}
last_key_end = key_end;
let layerdesc = layer_writer.finish_raw(&self.ctx).await?;
self.layers.push(layerdesc);
}
Ok(last_key_end)
}
async fn import_rel_file(
@@ -298,7 +343,7 @@ impl PgImportEnv {
Ok(())
}
async fn create_index_part(&mut self, layers: &[PersistentLayerDesc], control_file: &ControlFileData) -> anyhow::Result<()> {
async fn create_index_part(&mut self, control_file: &ControlFileData) -> anyhow::Result<()> {
let dstdir = &self.conf.workdir;
let pg_version = match control_file.catalog_version_no {
@@ -325,7 +370,7 @@ impl PgImportEnv {
let generation = Generation::none();
let mut index_part = IndexPart::empty(metadata);
for l in layers {
for l in self.layers.iter() {
let name = l.layer_name();
let metadata = LayerFileMetadata::new(l.file_size, generation, ShardIndex::unsharded());
if let Some(_) = index_part.layer_metadata.insert(name.clone(), metadata) {

View File

@@ -71,3 +71,11 @@ def test_pg_import(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
cur = conn.cursor()
assert endpoint.safe_psql("select count(*) from t") == [(300000,)]
# test writing after the import
endpoint.safe_psql("insert into t select g from generate_series(1, 1000) g")
assert endpoint.safe_psql("select count(*) from t") == [(301000,)]
endpoint.stop()
endpoint.start()
assert endpoint.safe_psql("select count(*) from t") == [(301000,)]