Don't read all into memory

This commit is contained in:
Bojan Serafimov
2022-06-11 15:55:52 -04:00
parent 978ef167e0
commit 938100058d

View File

@@ -432,7 +432,7 @@ fn import_wal<R: Repository>(
pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
tline: &mut DatadirTimeline<R>,
mut reader: Reader,
reader: Reader,
base_lsn: Lsn,
) -> Result<()> {
info!("importing base at {}", base_lsn);
@@ -443,15 +443,15 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
for base_tar_entry in tar::Archive::new(reader).entries()? {
let mut entry = base_tar_entry.unwrap();
let header = entry.header();
let len = header.entry_size()? as usize;
let file_path = header.path().unwrap().into_owned();
match header.entry_type() {
tar::EntryType::Regular => {
let mut buffer = Vec::new();
// TODO stream it instead
entry.read_to_end(&mut buffer).unwrap();
// let mut buffer = Vec::new();
// entry.read_to_end(&mut buffer).unwrap();
import_file(&mut modification, &file_path.as_ref(), &buffer)?;
import_file(&mut modification, &file_path.as_ref(), entry, len)?;
},
tar::EntryType::Directory => {
info!("directory {:?}", file_path);
@@ -471,7 +471,7 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
pub fn import_wal_from_tar<R: Repository, Reader: Read>(
tline: &mut DatadirTimeline<R>,
mut reader: Reader,
reader: Reader,
start_lsn: Lsn,
end_lsn: Lsn,
) -> Result<()> {
@@ -489,19 +489,16 @@ pub fn import_wal_from_tar<R: Repository, Reader: Read>(
let mut pg_wal_entries_iter = pg_wal_tar.entries()?;
while last_lsn <= end_lsn {
let bytes = {
let mut entry = pg_wal_entries_iter.next().expect("expected more wal")?;
let entry = pg_wal_entries_iter.next().expect("expected more wal")?;
let header = entry.header();
let file_path = header.path().unwrap().into_owned();
match header.entry_type() {
tar::EntryType::Regular => {
let mut buffer = Vec::new();
entry.read_to_end(&mut buffer).unwrap();
// TODO assert filename matches segno
info!("processing wal file {:?}", file_path);
buffer
read_all_bytes(entry)?
}
tar::EntryType::Directory => {
info!("directory {:?}", file_path);
@@ -546,14 +543,13 @@ pub fn import_wal_from_tar<R: Repository, Reader: Read>(
Ok(())
}
pub fn import_file<R: Repository>(
pub fn import_file<R: Repository, Reader: Read>(
modification: &mut DatadirModification<R>,
file_path: &Path,
buffer: &[u8],
reader: Reader,
len: usize,
) -> Result<Option<ControlFileData>> {
info!("looking at {:?}", file_path);
let bytes = Bytes::copy_from_slice(&buffer[..]);
let bytes_len = bytes.len();
if file_path.starts_with("global") {
let spcnode = pg_constants::GLOBALTABLESPACE_OID;
@@ -561,17 +557,20 @@ pub fn import_file<R: Repository>(
match file_path.file_name().unwrap().to_string_lossy().as_ref() {
"pg_control" => {
// Import it as ControlFile
modification.put_control_file(bytes)?;
let bytes = read_all_bytes(reader)?;
// Extract the checkpoint record and import it separately.
let pg_control = ControlFileData::decode(&buffer)?;
let pg_control = ControlFileData::decode(&bytes[..])?;
let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
modification.put_checkpoint(checkpoint_bytes)?;
info!("imported control file");
// Import it as ControlFile
modification.put_control_file(bytes)?;
return Ok(Some(pg_control));
}
"pg_filenode.map" => {
let bytes = read_all_bytes(reader)?;
modification.put_relmap_file(spcnode, dbnode, bytes)?;
info!("imported relmap file")
}
@@ -579,7 +578,7 @@ pub fn import_file<R: Repository>(
info!("ignored");
},
_ => {
import_rel(modification, file_path, spcnode, dbnode, bytes.reader(), bytes_len)?;
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
info!("imported rel creation");
}
}
@@ -596,6 +595,7 @@ pub fn import_file<R: Repository>(
match file_path.file_name().unwrap().to_string_lossy().as_ref() {
"pg_filenode.map" => {
let bytes = read_all_bytes(reader)?;
modification.put_relmap_file(spcnode, dbnode, bytes)?;
info!("imported relmap file")
}
@@ -603,29 +603,30 @@ pub fn import_file<R: Repository>(
info!("ignored");
},
_ => {
import_rel(modification, file_path, spcnode, dbnode, bytes.reader(), bytes_len)?;
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
info!("imported rel creation");
}
}
} else if file_path.starts_with("pg_xact") {
let slru = SlruKind::Clog;
import_slru(modification, slru, file_path, bytes.reader(), bytes_len)?;
import_slru(modification, slru, file_path, reader, len)?;
info!("imported clog slru");
} else if file_path.starts_with("pg_multixact/offsets") {
let slru = SlruKind::MultiXactOffsets;
import_slru(modification, slru, file_path, bytes.reader(), bytes_len)?;
import_slru(modification, slru, file_path, reader, len)?;
info!("imported multixact offsets slru");
} else if file_path.starts_with("pg_multixact/members") {
let slru = SlruKind::MultiXactMembers;
import_slru(modification, slru, file_path, bytes.reader(), bytes_len)?;
import_slru(modification, slru, file_path, reader, len)?;
info!("imported multixact members slru");
} else if file_path.starts_with("pg_twophase") {
let xid = u32::from_str_radix(&file_path.file_name().unwrap().to_string_lossy(), 16)?;
modification.put_twophase_file(xid, Bytes::copy_from_slice(&buffer[..]))?;
let bytes = read_all_bytes(reader)?;
modification.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))?;
info!("imported twophase file");
} else if file_path.starts_with("pg_wal") {
panic!("found wal file in base section");
@@ -637,3 +638,9 @@ pub fn import_file<R: Repository>(
Ok(None)
}
/// Drains `reader` until end-of-stream and returns everything it yielded as one `Bytes` buffer.
///
/// The reader is taken by value and consumed.
///
/// # Errors
/// Returns the error produced by `read_to_end` (converted by `?`) if reading fails.
fn read_all_bytes<Reader: Read>(mut reader: Reader) -> Result<Bytes> {
    let mut buf: Vec<u8> = Vec::new();
    reader.read_to_end(&mut buf)?;
    // Hand the Vec's allocation over to `Bytes` rather than copying the data
    // into a second, freshly allocated buffer.
    Ok(Bytes::from(buf))
}