commit 28616b0907
parent 98d128d993
Author: Stas Kelvich
Date:   2024-09-12 10:33:14 +01:00


@@ -1,4 +1,4 @@
-use std::{path::Path, str::FromStr};
+use std::{fs::metadata, path::Path, str::FromStr};
 
 use anyhow::{bail, ensure, Context};
 use bytes::Bytes;
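Review note: `std::fs::metadata` is a blocking call, and it is used below from async code (`import_rel_file`). A minimal async alternative, assuming the crate already depends on tokio and camino as the surrounding code suggests (the helper name `file_len` is hypothetical):

async fn file_len(path: &camino::Utf8Path) -> anyhow::Result<usize> {
    // tokio::fs::metadata is the non-blocking counterpart of std::fs::metadata
    let meta = tokio::fs::metadata(path).await?;
    Ok(meta.len() as usize)
}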
@@ -84,18 +84,17 @@ impl PgImportEnv {
             .into_iter()
             .filter_map(|entry| {
                 entry.ok().and_then(|path| {
-                    path.file_name().to_string_lossy().parse::<i32>().ok()
+                    path.file_name().to_string_lossy().parse::<u32>().ok()
                 })
             })
             .sorted()
             .for_each(|dboid| {
                 let path = pgdata_path.join("base").join(dboid.to_string());
-                self.import_db(&mut one_big_layer, &path, pg_constants::DEFAULTTABLESPACE_OID);
+                self.import_db(&mut one_big_layer, &path, dboid, pg_constants::DEFAULTTABLESPACE_OID);
             });
 
         // global catalogs now
-        self.import_db(&mut one_big_layer, &pgdata_path.join("global"), postgres_ffi::pg_constants::GLOBALTABLESPACE_OID);
+        self.import_db(&mut one_big_layer, &pgdata_path.join("global"), 0, postgres_ffi::pg_constants::GLOBALTABLESPACE_OID);
 
         one_big_layer.finish_layer(&self.ctx).await?;
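Review note: the `parse::<i32>()` → `parse::<u32>()` change is correct, since Postgres OIDs are unsigned 32-bit; with `i32`, database OIDs above `i32::MAX` would have been silently dropped by the `filter_map`. However, `import_db` is invoked inside a synchronous `for_each` closure, where an async method can be neither awaited nor have its error propagated with `?`. A compiling sketch of the same traversal, assuming `import_db` is async and returns `anyhow::Result<()>` as its signature below suggests:

let dboids: Vec<u32> = WalkDir::new(pgdata_path.join("base"))
    .max_depth(1)
    .into_iter()
    .filter_map(|entry| {
        // directory names under base/ are database OIDs; skip everything else
        entry
            .ok()
            .and_then(|e| e.file_name().to_string_lossy().parse::<u32>().ok())
    })
    .sorted() // itertools: yields OIDs in ascending order
    .collect();
for dboid in dboids {
    let path = pgdata_path.join("base").join(dboid.to_string());
    self.import_db(&mut one_big_layer, &path, dboid, pg_constants::DEFAULTTABLESPACE_OID)
        .await?;
}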
@@ -108,21 +107,31 @@ impl PgImportEnv {
         &mut self,
         layer_writer: &mut ImageLayerWriter,
         path: &Utf8PathBuf,
-        spcnode: u32
+        dboid: u32,
+        spcnode: u32,
     ) -> anyhow::Result<()> {
 
         // traverse database directory in the same order as our RelKey ordering
         WalkDir::new(path)
             .max_depth(1)
             .into_iter()
             .filter_map(|entry| {
                 entry.ok().and_then(|path| {
                     let relfile = path.file_name().to_string_lossy();
                     // returns (relnode, forknum, segno)
                     parse_relfilename(&relfile).ok()
                 })
             })
             .sorted()
-            .for_each(|a|{
-                self.import_rel_file();
+            .for_each(|(relnode, forknum, segno)|{
+                let rel_tag = RelTag {
+                    spcnode,
+                    dbnode: dboid,
+                    relnode,
+                    forknum,
+                };
+                self.import_rel_file(layer_writer, path, rel_tag, segno).await?;
             });
 
         Ok(())
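Review note: this hunk has the same issue in a stronger form: `.await?` inside the `for_each` closure will not compile, because the closure is not async and returns `()`, so `?` has nothing to propagate into. `parse_relfilename` turns file names like `16384`, `16384_fsm`, or `16384.2` into `(relnode, forknum, segno)` triples, and sorting the triples yields the RelKey order the comment above asks for. A sketch of a compiling variant, under the same assumptions as above:

let rel_files: Vec<_> = WalkDir::new(path)
    .max_depth(1)
    .into_iter()
    .filter_map(|entry| {
        entry.ok().and_then(|e| {
            let relfile = e.file_name().to_string_lossy();
            // Ok((relnode, forknum, segno)); non-relation files fail to parse and are skipped
            parse_relfilename(&relfile).ok()
        })
    })
    .sorted()
    .collect();
for (relnode, forknum, segno) in rel_files {
    let rel_tag = RelTag {
        spcnode,
        dbnode: dboid,
        relnode,
        forknum,
    };
    self.import_rel_file(layer_writer, path, rel_tag, segno).await?;
}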
@@ -132,31 +141,25 @@ impl PgImportEnv {
         &mut self,
         layer_writer: &mut ImageLayerWriter,
         path: &Utf8PathBuf,
-        spcnode: u32
+        rel_tag: RelTag,
+        segno: u32,
     ) -> anyhow::Result<()> {
         let mut reader = tokio::fs::File::open(path).await?;
-        let len = metadata(path)?.len();
+        let len = metadata(path)?.len() as usize;
         let mut buf: [u8; 8192] = [0u8; 8192];
 
         ensure!(len % BLCKSZ as usize == 0);
         let nblocks = len / BLCKSZ as usize;
 
-        let rel = RelTag {
-            spcnode: spcoid,
-            dbnode: dboid,
-            relnode,
-            forknum,
-        };
-
         let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
         loop {
             let r = reader.read_exact(&mut buf).await;
             match r {
                 Ok(_) => {
-                    let key = rel_block_to_key(rel, blknum);
+                    let key = rel_block_to_key(rel_tag, blknum);
                     layer_writer.put_image(key, Bytes::copy_from_slice(&buf), &self.ctx).await?;
                 }
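The `blknum` initialization above encodes Postgres's file layout: relations are split into 1 GiB segment files, so segment `segno` starts at absolute block number `segno * (1 GiB / BLCKSZ)`. A self-contained check of the arithmetic, assuming the stock 8 KiB block size (constant names here are illustrative, not from the codebase):

const BLCKSZ: u32 = 8192;
const SEGMENT_BYTES: u32 = 1024 * 1024 * 1024; // 1 GiB per segment file

fn first_block_of_segment(segno: u32) -> u32 {
    // 1 GiB / 8 KiB = 131072 blocks per segment
    segno * (SEGMENT_BYTES / BLCKSZ)
}

fn main() {
    assert_eq!(first_block_of_segment(0), 0);
    assert_eq!(first_block_of_segment(2), 262144); // 2 * 131072
}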
@@ -168,7 +171,7 @@ impl PgImportEnv {
                         break;
                     }
                     _ => {
-                        bail!("error reading file {}: {:#}", path.display(), err);
+                        bail!("error reading file {}: {:#}", path, err);
                     }
                 },
             };
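Dropping `.display()` is right here: `path` is a camino `Utf8PathBuf`, which implements `Display` directly, whereas the adapter is only needed for `std::path::Path`, whose contents may not be valid UTF-8. For example:

use camino::Utf8PathBuf;

fn main() {
    let p = Utf8PathBuf::from("base/1/1259");
    // Utf8PathBuf formats with {} directly; a std::path::PathBuf would need .display()
    println!("error reading file {p}");
}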
@@ -178,225 +181,6 @@ impl PgImportEnv {
         Ok(())
     }
 
-    async fn import_file(
-        // modification: &mut DatadirModification<'_>,
-        &mut self,
-        layer_writer: &mut ImageLayerWriter,
-        file_path: &Path,
-        reader: &mut (impl AsyncRead + Send + Sync + Unpin),
-        len: usize,
-    ) -> anyhow::Result<Option<ControlFileData>> {
-        let file_name = match file_path.file_name() {
-            Some(name) => name.to_string_lossy(),
-            None => return Ok(None),
-        };
-
-        if file_name.starts_with('.') {
-            // tar archives on macOs, created without COPYFILE_DISABLE=1 env var
-            // will contain "fork files", skip them.
-            return Ok(None);
-        }
-
-        if file_path.starts_with("global") {
-            let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
-            let dbnode = 0;
-
-            match file_name.as_ref() {
-                // "pg_control" => {
-                //     let bytes = read_all_bytes(reader).await?;
-                //     // Extract the checkpoint record and import it separately.
-                //     let pg_control = ControlFileData::decode(&bytes[..])?;
-                //     let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
-                //     // modification.put_checkpoint(checkpoint_bytes)?;
-                //     debug!("imported control file");
-
-                //     // Import it as ControlFile
-                //     // modification.put_control_file(bytes)?;
-                //     return Ok(Some(pg_control));
-                // }
-                // "pg_filenode.map" => {
-                //     // let bytes = read_all_bytes(reader).await?;
-                //     // modification
-                //     //     .put_relmap_file(spcnode, dbnode, bytes, ctx)
-                //     //     .await?;
-                //     debug!("imported relmap file")
-                // }
-                "PG_VERSION" => {
-                    debug!("ignored PG_VERSION file");
-                }
-                _ => {
-                    self.import_rel(layer_writer, file_path, spcnode, dbnode, reader, len).await?;
-                    debug!("imported rel creation");
-                }
-            }
-        } else if file_path.starts_with("base") {
-            let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
-            let dbnode: u32 = file_path
-                .iter()
-                .nth(1)
-                .expect("invalid file path, expected dbnode")
-                .to_string_lossy()
-                .parse()?;
-
-            match file_name.as_ref() {
-                // "pg_filenode.map" => {
-                //     let bytes = read_all_bytes(reader).await?;
-                //     modification
-                //         .put_relmap_file(spcnode, dbnode, bytes, ctx)
-                //         .await?;
-                //     debug!("imported relmap file")
-                // }
-                "PG_VERSION" => {
-                    debug!("ignored PG_VERSION file");
-                }
-                _ => {
-                    self.import_rel(layer_writer, file_path, spcnode, dbnode, reader, len).await?;
-                    debug!("imported rel creation");
-                }
-            }
-        // } else if file_path.starts_with("pg_xact") {
-        //     let slru = SlruKind::Clog;
-        //     import_slru(modification, slru, file_path, reader, len, ctx).await?;
-        //     debug!("imported clog slru");
-        // } else if file_path.starts_with("pg_multixact/offsets") {
-        //     let slru = SlruKind::MultiXactOffsets;
-        //     import_slru(modification, slru, file_path, reader, len, ctx).await?;
-        //     debug!("imported multixact offsets slru");
-        // } else if file_path.starts_with("pg_multixact/members") {
-        //     let slru = SlruKind::MultiXactMembers;
-        //     import_slru(modification, slru, file_path, reader, len, ctx).await?;
-        //     debug!("imported multixact members slru");
-        // } else if file_path.starts_with("pg_twophase") {
-        //     let xid = u32::from_str_radix(file_name.as_ref(), 16)?;
-
-        //     let bytes = read_all_bytes(reader).await?;
-        //     modification
-        //         .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
-        //         .await?;
-        //     debug!("imported twophase file");
-        } else if file_path.starts_with("pg_wal") {
-            debug!("found wal file in base section. ignore it");
-        // } else if file_path.starts_with("zenith.signal") {
-        //     // Parse zenith signal file to set correct previous LSN
-        //     let bytes = read_all_bytes(reader).await?;
-
-        //     // zenith.signal format is "PREV LSN: prev_lsn"
-        //     // TODO write serialization and deserialization in the same place.
-        //     let zenith_signal = std::str::from_utf8(&bytes)?.trim();
-        //     let prev_lsn = match zenith_signal {
-        //         "PREV LSN: none" => Lsn(0),
-        //         "PREV LSN: invalid" => Lsn(0),
-        //         other => {
-        //             let split = other.split(':').collect::<Vec<_>>();
-        //             split[1]
-        //                 .trim()
-        //                 .parse::<Lsn>()
-        //                 .context("can't parse zenith.signal")?
-        //         }
-        //     };
-
-        //     // zenith.signal is not necessarily the last file, that we handle
-        //     // but it is ok to call `finish_write()`, because final `modification.commit()`
-        //     // will update lsn once more to the final one.
-        //     let writer = modification.tline.writer().await;
-        //     writer.finish_write(prev_lsn);
-        //     debug!("imported zenith signal {}", prev_lsn);
-        } else if file_path.starts_with("pg_tblspc") {
-            // TODO Backups exported from neon won't have pg_tblspc, but we will need
-            // this to import arbitrary postgres databases.
-            bail!("Importing pg_tblspc is not implemented");
-        } else {
-            debug!(
-                "ignoring unrecognized file \"{}\" in tar archive",
-                file_path.display()
-            );
-        }
-
-        Ok(None)
-    }
-
-    // subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
-    async fn import_rel(
-        // modification: &mut DatadirModification<'_>,
-        &self,
-        layer_writer: &mut ImageLayerWriter,
-        path: &Path,
-        spcoid: Oid,
-        dboid: Oid,
-        reader: &mut (impl AsyncRead + Unpin),
-        len: usize,
-    ) -> anyhow::Result<()> {
-        // Does it look like a relation file?
-        trace!("importing rel file {}", path.display());
-
-        let filename = &path
-            .file_name()
-            .expect("missing rel filename")
-            .to_string_lossy();
-        let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
-            warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
-            e
-        })?;
-
-        let mut buf: [u8; 8192] = [0u8; 8192];
-
-        ensure!(len % BLCKSZ as usize == 0);
-        let nblocks = len / BLCKSZ as usize;
-
-        let rel = RelTag {
-            spcnode: spcoid,
-            dbnode: dboid,
-            relnode,
-            forknum,
-        };
-
-        let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
-        loop {
-            let r = reader.read_exact(&mut buf).await;
-            match r {
-                Ok(_) => {
-                    let key = rel_block_to_key(rel, blknum);
-                    layer_writer.put_image(key, Bytes::copy_from_slice(&buf), &self.ctx).await?;
-                    // if modification.tline.get_shard_identity().is_key_local(&key) {
-                    //     modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
-                    // }
-                }
-                Err(err) => match err.kind() {
-                    std::io::ErrorKind::UnexpectedEof => {
-                        // reached EOF. That's expected.
-                        let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
-                        ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
-                        break;
-                    }
-                    _ => {
-                        bail!("error reading file {}: {:#}", path.display(), err);
-                    }
-                },
-            };
-
-            blknum += 1;
-        }
-
-        // img_layer_writer.finish_layer(ctx).await?;
-        // // Update relation size
-        // //
-        // // If we process rel segments out of order,
-        // // put_rel_extend will skip the update.
-        // modification.put_rel_extend(rel, blknum, ctx).await?;
-
-        Ok(())
-    }
 }
 
 async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> anyhow::Result<Bytes> {