From 28616b09072b0e2e7787c733677ffde5ee7fcdcb Mon Sep 17 00:00:00 2001
From: Stas Kelvich
Date: Thu, 12 Sep 2024 10:33:14 +0100
Subject: [PATCH] compiles

---
 pageserver/src/pg_import.rs | 260 +++---------------------------------
 1 file changed, 22 insertions(+), 238 deletions(-)

diff --git a/pageserver/src/pg_import.rs b/pageserver/src/pg_import.rs
index 140a7dc423..643bb9978e 100644
--- a/pageserver/src/pg_import.rs
+++ b/pageserver/src/pg_import.rs
@@ -1,4 +1,4 @@
-use std::{path::Path, str::FromStr};
+use std::{fs::metadata, path::Path, str::FromStr};
 
 use anyhow::{bail, ensure, Context};
 use bytes::Bytes;
@@ -84,18 +84,17 @@ impl PgImportEnv {
             .into_iter()
             .filter_map(|entry| {
                 entry.ok().and_then(|path| {
-                    path.file_name().to_string_lossy().parse::<u32>().ok()
+                    path.file_name().to_string_lossy().parse::<u32>().ok()
                 })
             })
             .sorted()
             .for_each(|dboid| {
                 let path = pgdata_path.join("base").join(dboid.to_string());
-                self.import_db(&mut one_big_layer, &path, pg_constants::DEFAULTTABLESPACE_OID);
+                self.import_db(&mut one_big_layer, &path, dboid, pg_constants::DEFAULTTABLESPACE_OID);
             });
 
         // global catalogs now
-        self.import_db(&mut one_big_layer, &pgdata_path.join("global"), postgres_ffi::pg_constants::GLOBALTABLESPACE_OID);
-
+        self.import_db(&mut one_big_layer, &pgdata_path.join("global"), 0, postgres_ffi::pg_constants::GLOBALTABLESPACE_OID);
 
         one_big_layer.finish_layer(&self.ctx).await?;
 
@@ -108,21 +107,31 @@ impl PgImportEnv {
         &mut self,
         layer_writer: &mut ImageLayerWriter,
         path: &Utf8PathBuf,
-        spcnode: u32
+        dboid: u32,
+        spcnode: u32,
     ) -> anyhow::Result<()> {
 
+        // traverse database directory in the same order as our RelKey ordering
         WalkDir::new(path)
             .max_depth(1)
             .into_iter()
             .filter_map(|entry| {
                 entry.ok().and_then(|path| {
                     let relfile = path.file_name().to_string_lossy();
+                    // returns (relnode, forknum, segno)
                     parse_relfilename(&relfile).ok()
                 })
             })
             .sorted()
-            .for_each(|a|{
-                self.import_rel_file();
+            .for_each(|(relnode, forknum, segno)|{
+                let rel_tag = RelTag {
+                    spcnode,
+                    dbnode: dboid,
+                    relnode,
+                    forknum,
+                };
+
+                self.import_rel_file(layer_writer, path, rel_tag, segno).await?;
             });
 
         Ok(())
@@ -132,31 +141,25 @@ impl PgImportEnv {
         &mut self,
         layer_writer: &mut ImageLayerWriter,
         path: &Utf8PathBuf,
-        spcnode: u32
+        rel_tag: RelTag,
+        segno: u32,
     ) -> anyhow::Result<()> {
 
         let mut reader = tokio::fs::File::open(path).await?;
-        let len = metadata(path)?.len();
+        let len = metadata(path)?.len() as usize;
         let mut buf: [u8; 8192] = [0u8; 8192];
 
         ensure!(len % BLCKSZ as usize == 0);
         let nblocks = len / BLCKSZ as usize;
 
-        let rel = RelTag {
-            spcnode: spcoid,
-            dbnode: dboid,
-            relnode,
-            forknum,
-        };
-
         let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
 
         loop {
             let r = reader.read_exact(&mut buf).await;
             match r {
                 Ok(_) => {
-                    let key = rel_block_to_key(rel, blknum);
+                    let key = rel_block_to_key(rel_tag, blknum);
                     layer_writer.put_image(key, Bytes::copy_from_slice(&buf), &self.ctx).await?;
                 }
                 Err(err) => match err.kind() {
                     std::io::ErrorKind::UnexpectedEof => {
@@ -168,7 +171,7 @@ impl PgImportEnv {
                         break;
                     }
                     _ => {
-                        bail!("error reading file {}: {:#}", path.display(), err);
+                        bail!("error reading file {}: {:#}", path, err);
                     }
                 },
             };
@@ -178,225 +181,6 @@ impl PgImportEnv {
         Ok(())
     }
 
-    async fn import_file(
-        // modification: &mut DatadirModification<'_>,
-        &mut self,
-        layer_writer: &mut ImageLayerWriter,
-        file_path: &Path,
-        reader: &mut (impl AsyncRead + Send + Sync + Unpin),
-        len: usize,
-    ) -> anyhow::Result<Option<ControlFileData>> {
-        let file_name = match file_path.file_name() {
-            Some(name) => name.to_string_lossy(),
-            None => return Ok(None),
-        };
-
-        if file_name.starts_with('.') {
-            // tar archives on macOs, created without COPYFILE_DISABLE=1 env var
-            // will contain "fork files", skip them.
-            return Ok(None);
-        }
-
-        if file_path.starts_with("global") {
-            let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
-            let dbnode = 0;
-
-            match file_name.as_ref() {
-                // "pg_control" => {
-                //     let bytes = read_all_bytes(reader).await?;
-
-                //     // Extract the checkpoint record and import it separately.
-                //     let pg_control = ControlFileData::decode(&bytes[..])?;
-                //     let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
-                //     // modification.put_checkpoint(checkpoint_bytes)?;
-                //     debug!("imported control file");
-
-                //     // Import it as ControlFile
-                //     // modification.put_control_file(bytes)?;
-                //     return Ok(Some(pg_control));
-                // }
-                // "pg_filenode.map" => {
-                //     // let bytes = read_all_bytes(reader).await?;
-                //     // modification
-                //     //     .put_relmap_file(spcnode, dbnode, bytes, ctx)
-                //     //     .await?;
-                //     debug!("imported relmap file")
-                // }
-                "PG_VERSION" => {
-                    debug!("ignored PG_VERSION file");
-                }
-                _ => {
-                    self.import_rel(layer_writer, file_path, spcnode, dbnode, reader, len).await?;
-                    debug!("imported rel creation");
-                }
-            }
-        } else if file_path.starts_with("base") {
-            let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
-            let dbnode: u32 = file_path
-                .iter()
-                .nth(1)
-                .expect("invalid file path, expected dbnode")
-                .to_string_lossy()
-                .parse()?;
-
-            match file_name.as_ref() {
-                // "pg_filenode.map" => {
-                //     let bytes = read_all_bytes(reader).await?;
-                //     modification
-                //         .put_relmap_file(spcnode, dbnode, bytes, ctx)
-                //         .await?;
-                //     debug!("imported relmap file")
-                // }
-                "PG_VERSION" => {
-                    debug!("ignored PG_VERSION file");
-                }
-                _ => {
-                    self.import_rel(layer_writer, file_path, spcnode, dbnode, reader, len).await?;
-                    debug!("imported rel creation");
-                }
-            }
-        // } else if file_path.starts_with("pg_xact") {
-        //     let slru = SlruKind::Clog;
-
-        //     import_slru(modification, slru, file_path, reader, len, ctx).await?;
-        //     debug!("imported clog slru");
-        // } else if file_path.starts_with("pg_multixact/offsets") {
-        //     let slru = SlruKind::MultiXactOffsets;
-
-        //     import_slru(modification, slru, file_path, reader, len, ctx).await?;
-        //     debug!("imported multixact offsets slru");
-        // } else if file_path.starts_with("pg_multixact/members") {
-        //     let slru = SlruKind::MultiXactMembers;
-
-        //     import_slru(modification, slru, file_path, reader, len, ctx).await?;
-        //     debug!("imported multixact members slru");
-        // } else if file_path.starts_with("pg_twophase") {
-        //     let xid = u32::from_str_radix(file_name.as_ref(), 16)?;
-
-        //     let bytes = read_all_bytes(reader).await?;
-        //     modification
-        //         .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
-        //         .await?;
-        //     debug!("imported twophase file");
-        } else if file_path.starts_with("pg_wal") {
-            debug!("found wal file in base section. ignore it");
-        // } else if file_path.starts_with("zenith.signal") {
-        //     // Parse zenith signal file to set correct previous LSN
-        //     let bytes = read_all_bytes(reader).await?;
-        //     // zenith.signal format is "PREV LSN: prev_lsn"
-        //     // TODO write serialization and deserialization in the same place.
-        //     let zenith_signal = std::str::from_utf8(&bytes)?.trim();
-        //     let prev_lsn = match zenith_signal {
-        //         "PREV LSN: none" => Lsn(0),
-        //         "PREV LSN: invalid" => Lsn(0),
-        //         other => {
-        //             let split = other.split(':').collect::<Vec<&str>>();
-        //             split[1]
-        //                 .trim()
-        //                 .parse::<Lsn>()
-        //                 .context("can't parse zenith.signal")?
-        //         }
-        //     };
-
-        //     // zenith.signal is not necessarily the last file, that we handle
-        //     // but it is ok to call `finish_write()`, because final `modification.commit()`
-        //     // will update lsn once more to the final one.
-        //     let writer = modification.tline.writer().await;
-        //     writer.finish_write(prev_lsn);
-
-        //     debug!("imported zenith signal {}", prev_lsn);
-        } else if file_path.starts_with("pg_tblspc") {
-            // TODO Backups exported from neon won't have pg_tblspc, but we will need
-            // this to import arbitrary postgres databases.
-            bail!("Importing pg_tblspc is not implemented");
-        } else {
-            debug!(
-                "ignoring unrecognized file \"{}\" in tar archive",
-                file_path.display()
-            );
-        }
-
-        Ok(None)
-    }
-
-
-    // subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
-    async fn import_rel(
-        // modification: &mut DatadirModification<'_>,
-        &self,
-        layer_writer: &mut ImageLayerWriter,
-        path: &Path,
-        spcoid: Oid,
-        dboid: Oid,
-        reader: &mut (impl AsyncRead + Unpin),
-        len: usize,
-    ) -> anyhow::Result<()> {
-        // Does it look like a relation file?
-        trace!("importing rel file {}", path.display());
-
-        let filename = &path
-            .file_name()
-            .expect("missing rel filename")
-            .to_string_lossy();
-        let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
-            warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
-            e
-        })?;
-
-        let mut buf: [u8; 8192] = [0u8; 8192];
-
-        ensure!(len % BLCKSZ as usize == 0);
-        let nblocks = len / BLCKSZ as usize;
-
-        let rel = RelTag {
-            spcnode: spcoid,
-            dbnode: dboid,
-            relnode,
-            forknum,
-        };
-
-        let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
-
-
-        loop {
-            let r = reader.read_exact(&mut buf).await;
-            match r {
-                Ok(_) => {
-                    let key = rel_block_to_key(rel, blknum);
-                    layer_writer.put_image(key, Bytes::copy_from_slice(&buf), &self.ctx).await?;
-
-                    // if modification.tline.get_shard_identity().is_key_local(&key) {
-                    //     modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
-                    // }
-                }
-
-                Err(err) => match err.kind() {
-                    std::io::ErrorKind::UnexpectedEof => {
-                        // reached EOF. That's expected.
-                        let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
-                        ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
-                        break;
-                    }
-                    _ => {
-                        bail!("error reading file {}: {:#}", path.display(), err);
-                    }
-                },
-            };
-            blknum += 1;
-        }
-
-        // img_layer_writer.finish_layer(ctx).await?;
-
-        // // Update relation size
-        // //
-        // // If we process rel segments out of order,
-        // // put_rel_extend will skip the update.
-        // modification.put_rel_extend(rel, blknum, ctx).await?;
-
-        Ok(())
-    }
-
-
 }
 
 async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> anyhow::Result<Bytes> {
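
Review note (not part of the patch): the `.await?` inside the `for_each` closure in `import_db` will not compile, since the closure is synchronous and neither `.await` nor `?` is available there. Likewise, the per-database `import_db(..)` calls in the caller are never awaited, so the futures they return are silently dropped. Collecting the sorted entries and looping inside the `async fn` itself avoids both problems. A minimal, self-contained sketch of that shape, where `import_one` is a hypothetical stand-in for `import_rel_file` and the tuples stand in for `parse_relfilename` output:

    use anyhow::Result;

    // Hypothetical stand-in for `import_rel_file`.
    async fn import_one(relnode: u32, forknum: u8, segno: u32) -> Result<()> {
        println!("importing rel {relnode}, fork {forknum}, segment {segno}");
        Ok(())
    }

    async fn import_all(mut rel_files: Vec<(u32, u8, u32)>) -> Result<()> {
        // Tuples sort lexicographically, i.e. by (relnode, forknum, segno),
        // matching the itertools `sorted()` order used in the patch.
        rel_files.sort();
        for (relnode, forknum, segno) in rel_files {
            // Inside the async fn proper, both `.await` and `?` work.
            import_one(relnode, forknum, segno).await?;
        }
        Ok(())
    }

    #[tokio::main]
    async fn main() -> Result<()> {
        import_all(vec![(16384, 0, 1), (16384, 0, 0)]).await
    }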
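
For reference, the starting block number in `import_rel_file`, `segno * (1024 * 1024 * 1024 / BLCKSZ as u32)`, reflects PostgreSQL's segmented relation layout: each segment file holds at most 1 GiB, so with the default 8192-byte block size a segment spans 131072 blocks and segment N begins at block N * 131072. A quick sanity check of the arithmetic:

    const BLCKSZ: u32 = 8192; // default PostgreSQL block size

    /// First block number covered by 1 GiB relation segment `segno`.
    fn first_blknum(segno: u32) -> u32 {
        segno * (1024 * 1024 * 1024 / BLCKSZ)
    }

    fn main() {
        assert_eq!(first_blknum(0), 0);
        assert_eq!(first_blknum(1), 131_072); // 2^30 / 2^13 = 2^17
        assert_eq!(first_blknum(3), 393_216);
    }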
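
One subtlety in the page-reading loop: `read_exact` reports a clean end-of-file at a page boundary as `ErrorKind::UnexpectedEof`, exactly as it reports a torn final page, which is why the removed `import_rel` cross-checked the block count against `nblocks` in its EOF arm. A standalone sketch of the same loop shape over any async reader (the names are mine, not the patch's), assuming only tokio and anyhow:

    use tokio::io::{AsyncRead, AsyncReadExt};

    /// Calls `on_page` for each full 8 KiB page and returns how many were read.
    async fn for_each_page<R: AsyncRead + Unpin>(
        mut reader: R,
        mut on_page: impl FnMut(u32, &[u8; 8192]),
    ) -> anyhow::Result<u32> {
        let mut buf = [0u8; 8192];
        let mut blknum: u32 = 0;
        loop {
            match reader.read_exact(&mut buf).await {
                Ok(_) => {
                    on_page(blknum, &buf);
                    blknum += 1;
                }
                // EOF at a page boundary (and a torn final page) both surface
                // as UnexpectedEof; callers should cross-check the returned
                // count against the expected file length.
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
                Err(e) => anyhow::bail!("read error: {e:#}"),
            }
        }
        Ok(blknum)
    }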