From 938100058dc25deb0ac680878b70654a25d2a0c8 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Sat, 11 Jun 2022 15:55:52 -0400 Subject: [PATCH] Don't read all into memory --- pageserver/src/import_datadir.rs | 55 ++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index a6b856c7aa..15b10e3e0b 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -432,7 +432,7 @@ fn import_wal( pub fn import_basebackup_from_tar( tline: &mut DatadirTimeline, - mut reader: Reader, + reader: Reader, base_lsn: Lsn, ) -> Result<()> { info!("importing base at {}", base_lsn); @@ -443,15 +443,15 @@ pub fn import_basebackup_from_tar( for base_tar_entry in tar::Archive::new(reader).entries()? { let mut entry = base_tar_entry.unwrap(); let header = entry.header(); + let len = header.entry_size()? as usize; let file_path = header.path().unwrap().into_owned(); match header.entry_type() { tar::EntryType::Regular => { - let mut buffer = Vec::new(); - // TODO stream it instead - entry.read_to_end(&mut buffer).unwrap(); + // let mut buffer = Vec::new(); + // entry.read_to_end(&mut buffer).unwrap(); - import_file(&mut modification, &file_path.as_ref(), &buffer)?; + import_file(&mut modification, &file_path.as_ref(), entry, len)?; }, tar::EntryType::Directory => { info!("directory {:?}", file_path); @@ -471,7 +471,7 @@ pub fn import_basebackup_from_tar( pub fn import_wal_from_tar( tline: &mut DatadirTimeline, - mut reader: Reader, + reader: Reader, start_lsn: Lsn, end_lsn: Lsn, ) -> Result<()> { @@ -489,19 +489,16 @@ pub fn import_wal_from_tar( let mut pg_wal_entries_iter = pg_wal_tar.entries()?; while last_lsn <= end_lsn { let bytes = { - let mut entry = pg_wal_entries_iter.next().expect("expected more wal")?; + let entry = pg_wal_entries_iter.next().expect("expected more wal")?; let header = entry.header(); let file_path = header.path().unwrap().into_owned(); match header.entry_type() { tar::EntryType::Regular => { - let mut buffer = Vec::new(); - entry.read_to_end(&mut buffer).unwrap(); - // TODO assert filename matches segno info!("processing wal file {:?}", file_path); - buffer + read_all_bytes(entry)? } tar::EntryType::Directory => { info!("directory {:?}", file_path); @@ -546,14 +543,13 @@ pub fn import_wal_from_tar( Ok(()) } -pub fn import_file( +pub fn import_file( modification: &mut DatadirModification, file_path: &Path, - buffer: &[u8], + reader: Reader, + len: usize, ) -> Result> { info!("looking at {:?}", file_path); - let bytes = Bytes::copy_from_slice(&buffer[..]); - let bytes_len = bytes.len(); if file_path.starts_with("global") { let spcnode = pg_constants::GLOBALTABLESPACE_OID; @@ -561,17 +557,20 @@ pub fn import_file( match file_path.file_name().unwrap().to_string_lossy().as_ref() { "pg_control" => { - // Import it as ControlFile - modification.put_control_file(bytes)?; + let bytes = read_all_bytes(reader)?; // Extract the checkpoint record and import it separately. - let pg_control = ControlFileData::decode(&buffer)?; + let pg_control = ControlFileData::decode(&bytes[..])?; let checkpoint_bytes = pg_control.checkPointCopy.encode()?; modification.put_checkpoint(checkpoint_bytes)?; info!("imported control file"); + + // Import it as ControlFile + modification.put_control_file(bytes)?; return Ok(Some(pg_control)); } "pg_filenode.map" => { + let bytes = read_all_bytes(reader)?; modification.put_relmap_file(spcnode, dbnode, bytes)?; info!("imported relmap file") } @@ -579,7 +578,7 @@ pub fn import_file( info!("ignored"); }, _ => { - import_rel(modification, file_path, spcnode, dbnode, bytes.reader(), bytes_len)?; + import_rel(modification, file_path, spcnode, dbnode, reader, len)?; info!("imported rel creation"); } } @@ -596,6 +595,7 @@ pub fn import_file( match file_path.file_name().unwrap().to_string_lossy().as_ref() { "pg_filenode.map" => { + let bytes = read_all_bytes(reader)?; modification.put_relmap_file(spcnode, dbnode, bytes)?; info!("imported relmap file") } @@ -603,29 +603,30 @@ pub fn import_file( info!("ignored"); }, _ => { - import_rel(modification, file_path, spcnode, dbnode, bytes.reader(), bytes_len)?; + import_rel(modification, file_path, spcnode, dbnode, reader, len)?; info!("imported rel creation"); } } } else if file_path.starts_with("pg_xact") { let slru = SlruKind::Clog; - import_slru(modification, slru, file_path, bytes.reader(), bytes_len)?; + import_slru(modification, slru, file_path, reader, len)?; info!("imported clog slru"); } else if file_path.starts_with("pg_multixact/offsets") { let slru = SlruKind::MultiXactOffsets; - import_slru(modification, slru, file_path, bytes.reader(), bytes_len)?; + import_slru(modification, slru, file_path, reader, len)?; info!("imported multixact offsets slru"); } else if file_path.starts_with("pg_multixact/members") { let slru = SlruKind::MultiXactMembers; - import_slru(modification, slru, file_path, bytes.reader(), bytes_len)?; + import_slru(modification, slru, file_path, reader, len)?; info!("imported multixact members slru"); } else if file_path.starts_with("pg_twophase") { let xid = u32::from_str_radix(&file_path.file_name().unwrap().to_string_lossy(), 16)?; - modification.put_twophase_file(xid, Bytes::copy_from_slice(&buffer[..]))?; + let bytes = read_all_bytes(reader)?; + modification.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))?; info!("imported twophase file"); } else if file_path.starts_with("pg_wal") { panic!("found wal file in base section"); @@ -637,3 +638,9 @@ pub fn import_file( Ok(None) } + +fn read_all_bytes(mut reader: Reader) -> Result { + let mut buf: Vec = vec![]; + reader.read_to_end(&mut buf)?; + Ok(Bytes::copy_from_slice(&buf[..])) +}