From 1e6267a35f9246ec497490834a5af1a61648117d Mon Sep 17 00:00:00 2001
From: anastasia
Date: Mon, 19 Jul 2021 13:19:59 +0300
Subject: [PATCH] Get rid of snapshot directory + related code cleanup and
 refactoring.

- Add a new subdir postgres_ffi/samples/ for config file samples.
- Don't copy WAL to the new branch on zenith init or zenith branch.
- Run import_timeline_wal on zenith init.
---
 control_plane/src/compute.rs         |  20 ++-
 pageserver/src/basebackup.rs         | 185 ++++++++------------
 pageserver/src/branches.rs           | 242 ++++++++++-----------------
 pageserver/src/lib.rs                |   4 -
 pageserver/src/object_repository.rs  |   3 +-
 pageserver/src/page_service.rs       |   9 -
 pageserver/src/restore_local_repo.rs |  61 ++-----
 postgres_ffi/samples/pg_hba.conf     |  98 +++++++++++
 postgres_ffi/src/pg_constants.rs     |  38 +++++
 9 files changed, 324 insertions(+), 336 deletions(-)
 create mode 100644 postgres_ffi/samples/pg_hba.conf

diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs
index 5122f205f2..fee3e4b2e3 100644
--- a/control_plane/src/compute.rs
+++ b/control_plane/src/compute.rs
@@ -20,6 +20,7 @@ use crate::local_env::LocalEnv;
 use pageserver::{ZTenantId, ZTimelineId};
 
 use crate::storage::PageServerNode;
+use postgres_ffi::pg_constants;
 
 //
 // ComputeControlPlane
@@ -278,10 +279,19 @@ impl PostgresNode {
             },
         )?;
 
-        // FIXME: The compute node should be able to stream the WAL it needs from the WAL safekeepers or archive.
-        // But that's not implemented yet. For now, 'pg_wal' is included in the base backup tarball that
-        // we receive from the Page Server, so we don't need to create the empty 'pg_wal' directory here.
-        //fs::create_dir_all(pgdata.join("pg_wal"))?;
+        // Create the pgdata subdirectory structure
+        for dir in pg_constants::PGDATA_SUBDIRS.iter() {
+            let path = pgdata.as_path().join(*dir);
+
+            fs::create_dir_all(path.clone())?;
+
+            fs::set_permissions(path, fs::Permissions::from_mode(0o700)).with_context(|| {
+                format!(
+                    "could not set permissions in data directory {}",
+                    pgdata.display()
+                )
+            })?;
+        }
 
         let mut copyreader = client
             .copy_out(sql.as_str())
@@ -322,7 +332,7 @@ impl PostgresNode {
         // Never clean up old WAL. TODO: We should use a replication
         // slot or something proper, to prevent the compute node
        // from removing WAL that hasn't been streamed to the safekeepr or
-        // page server yet. But this will do for now.
+        // page server yet. (gh issue #349)
         self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n")?;
 
         // Connect it to the page server.
diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index 1b6b939db4..d3fdf4ecf3 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -4,23 +4,22 @@
 //! TODO: this module has nothing to do with PostgreSQL pg_basebackup.
 //! It could use a better name.
 //!
-//! Stateless Postgres compute node is launched by sending tarball which contains non-relational data (multixacts, clog, filenodemaps, twophase files)
-//! and generate pg_control and dummy segment of WAL. This module is responsible for creation of such tarball from snapshot directory and
-//! data stored in object storage.
+//! A stateless Postgres compute node is launched by sending it a tarball
+//! that contains non-relational data (multixacts, clog, filenodemaps, twophase files),
+//! a generated pg_control file and a dummy WAL segment.
+//! This module is responsible for creating such a tarball
+//! from data stored in object storage.
 //!
-use crate::{PageServerConf, ZTenantId, ZTimelineId}; use bytes::{BufMut, BytesMut}; use log::*; +use std::io; use std::io::Write; -use std::path::PathBuf; use std::sync::Arc; use std::time::SystemTime; -use tar::{Builder, Header}; -use walkdir::WalkDir; +use tar::{Builder, EntryType, Header}; -use crate::object_key::*; +use crate::object_key::{DatabaseTag, ObjectTag}; use crate::repository::Timeline; -use postgres_ffi::relfile_utils::*; use postgres_ffi::xlog_utils::*; use postgres_ffi::*; use zenith_utils::lsn::Lsn; @@ -33,7 +32,6 @@ pub struct Basebackup<'a> { timeline: &'a Arc, lsn: Lsn, prev_record_lsn: Lsn, - snappath: PathBuf, slru_buf: [u8; pg_constants::SLRU_SEG_SIZE], slru_segno: u32, slru_path: &'static str, @@ -41,23 +39,16 @@ pub struct Basebackup<'a> { impl<'a> Basebackup<'a> { pub fn new( - conf: &PageServerConf, write: &'a mut dyn Write, - tenantid: ZTenantId, - timelineid: ZTimelineId, timeline: &'a Arc, lsn: Lsn, prev_record_lsn: Lsn, - snapshot_lsn: Lsn, ) -> Basebackup<'a> { Basebackup { ar: Builder::new(write), timeline, lsn, prev_record_lsn, - snappath: conf - .snapshots_path(&timelineid, &tenantid) - .join(format!("{:016X}", snapshot_lsn.0)), slru_path: "", slru_segno: u32::MAX, slru_buf: [0u8; pg_constants::SLRU_SEG_SIZE], @@ -65,47 +56,22 @@ impl<'a> Basebackup<'a> { } pub fn send_tarball(&mut self) -> anyhow::Result<()> { - debug!("sending tarball of snapshot in {}", self.snappath.display()); - for entry in WalkDir::new(&self.snappath) { - let entry = entry?; - let fullpath = entry.path(); - let relpath = entry.path().strip_prefix(&self.snappath).unwrap(); - - if relpath.to_str().unwrap() == "" { - continue; - } - - if entry.file_type().is_dir() { - trace!( - "sending dir {} as {}", - fullpath.display(), - relpath.display() - ); - self.ar.append_dir(relpath, fullpath)?; - } else if entry.file_type().is_symlink() { - error!("ignoring symlink in snapshot dir"); - } else if entry.file_type().is_file() { - if !is_rel_file_path(relpath.to_str().unwrap()) { - if entry.file_name() != "pg_filenode.map" // this files will be generated from object storage - && !relpath.starts_with("pg_xact/") - && !relpath.starts_with("pg_multixact/") - { - trace!("sending {}", relpath.display()); - self.ar.append_path_with_name(fullpath, relpath)?; - } - } else { - // relation pages are loaded on demand and should not be included in tarball - trace!("not sending {}", relpath.display()); - } + // Send empty config files. + for filepath in pg_constants::PGDATA_SPECIAL_FILES.iter() { + if *filepath == "pg_hba.conf" { + let data = pg_constants::PG_HBA.as_bytes(); + let header = new_tar_header(&filepath, data.len() as u64)?; + self.ar.append(&header, &data[..])?; } else { - error!("unknown file type: {}", fullpath.display()); + let header = new_tar_header(&filepath, 0)?; + self.ar.append(&header, &mut io::empty())?; } } - // Generate non-relational files. - // Iteration is sorted order: all objects of the same time are grouped and traversed + // Gather non-relational files from object storage pages. + // Iteration is sorted order: all objects of the same type are grouped and traversed // in key ascending order. For example all pg_xact records precede pg_multixact records and are sorted by block number. - // It allows to easily construct SLRU segments (32 blocks). + // It allows to easily construct SLRU segments. for obj in self.timeline.list_nonrels(self.lsn)? 
{ match obj { ObjectTag::Clog(slru) => self.add_slru_segment("pg_xact", &obj, slru.blknum)?, @@ -120,7 +86,10 @@ impl<'a> Basebackup<'a> { _ => {} } } - self.finish_slru_segment()?; // write last non-completed SLRU segment (if any) + + // write last non-completed SLRU segment (if any) + self.finish_slru_segment()?; + // Generate pg_control and bootstrap WAL segment. self.add_pgcontrol_file()?; self.ar.finish()?; debug!("all tarred up!"); @@ -129,19 +98,18 @@ impl<'a> Basebackup<'a> { // // Generate SLRU segment files from repository. Path identifies SLRU kind (pg_xact, pg_multixact/members, ...). - // Intially pass an empty string. // fn add_slru_segment( &mut self, path: &'static str, tag: &ObjectTag, - page: u32, + blknum: u32, ) -> anyhow::Result<()> { let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?; // Zero length image indicates truncated segment: just skip it if !img.is_empty() { assert!(img.len() == pg_constants::BLCKSZ as usize); - let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT; + let segno = blknum / pg_constants::SLRU_PAGES_PER_SEGMENT; if self.slru_path != "" && (self.slru_segno != segno || self.slru_path != path) { // Switch to new segment: save old one let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno); @@ -151,7 +119,7 @@ impl<'a> Basebackup<'a> { } self.slru_segno = segno; self.slru_path = path; - let offs_start = (page % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize + let offs_start = (blknum % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize * pg_constants::BLCKSZ as usize; let offs_end = offs_start + pg_constants::BLCKSZ as usize; self.slru_buf[offs_start..offs_end].copy_from_slice(&img); @@ -175,18 +143,36 @@ impl<'a> Basebackup<'a> { // // Extract pg_filenode.map files from repository + // Along with them also send PG_VERSION for each database. // fn add_relmap_file(&mut self, tag: &ObjectTag, db: &DatabaseTag) -> anyhow::Result<()> { + trace!("add_relmap_file {:?}", db); let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?; - info!("add_relmap_file {:?}", db); let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID { + let dst_path = "PG_VERSION"; + let version_bytes = pg_constants::PG_MAJORVERSION.as_bytes(); + let header = new_tar_header(&dst_path, version_bytes.len() as u64)?; + self.ar.append(&header, &version_bytes[..])?; + + let dst_path = format!("global/PG_VERSION"); + let header = new_tar_header(&dst_path, version_bytes.len() as u64)?; + self.ar.append(&header, &version_bytes[..])?; + String::from("global/pg_filenode.map") // filenode map for global tablespace } else { // User defined tablespaces are not supported assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID); - let src_path = self.snappath.join("base/1/PG_VERSION"); + + // Append dir path for each database + let path = format!("base/{}", db.dbnode); + let header = new_tar_header_dir(&path)?; + self.ar.append(&header, &mut io::empty())?; + let dst_path = format!("base/{}/PG_VERSION", db.dbnode); - self.ar.append_path_with_name(&src_path, &dst_path)?; + let version_bytes = pg_constants::PG_MAJORVERSION.as_bytes(); + let header = new_tar_header(&dst_path, version_bytes.len() as u64)?; + self.ar.append(&header, &version_bytes[..])?; + format!("base/{}/pg_filenode.map", db.dbnode) }; assert!(img.len() == 512); @@ -216,7 +202,8 @@ impl<'a> Basebackup<'a> { } // - // Add generated pg_control file + // Add generated pg_control file and bootstrap WAL segment. + // Also send zenith.signal file with extra bootstrap data. 
// fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> { let checkpoint_bytes = self @@ -238,12 +225,13 @@ impl<'a> Basebackup<'a> { checkpoint.redo = self.lsn.0 + self.lsn.calc_padding(8u32); //reset some fields we don't want to preserve + //TODO Check this. + //We may need to determine the value from twophase data. checkpoint.oldestActiveXid = 0; //save new values in pg_control pg_control.checkPoint = checkpoint_lsn; pg_control.checkPointCopy = checkpoint; - info!("pg_control.state = {}", pg_control.state); pg_control.state = pg_constants::DB_SHUTDOWNED; // add zenith.signal file @@ -271,60 +259,6 @@ impl<'a> Basebackup<'a> { } } -/// -/// Parse a path, relative to the root of PostgreSQL data directory, as -/// a PostgreSQL relation data file. -/// -fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> { - /* - * Relation data files can be in one of the following directories: - * - * global/ - * shared relations - * - * base// - * regular relations, default tablespace - * - * pg_tblspc/// - * within a non-default tablespace (the name of the directory - * depends on version) - * - * And the relation data files themselves have a filename like: - * - * . - */ - if let Some(fname) = path.strip_prefix("global/") { - let (_relnode, _forknum, _segno) = parse_relfilename(fname)?; - - Ok(()) - } else if let Some(dbpath) = path.strip_prefix("base/") { - let mut s = dbpath.split('/'); - let dbnode_str = s.next().ok_or(FilePathError::InvalidFileName)?; - let _dbnode = dbnode_str.parse::()?; - let fname = s.next().ok_or(FilePathError::InvalidFileName)?; - if s.next().is_some() { - return Err(FilePathError::InvalidFileName); - }; - - let (_relnode, _forknum, _segno) = parse_relfilename(fname)?; - - Ok(()) - } else if path.strip_prefix("pg_tblspc/").is_some() { - // TODO - error!("tablespaces not implemented yet"); - Err(FilePathError::InvalidFileName) - } else { - Err(FilePathError::InvalidFileName) - } -} - -// -// Check if it is relational file -// -fn is_rel_file_path(path: &str) -> bool { - parse_rel_file_path(path).is_ok() -} - // // Create new tarball entry header // @@ -343,3 +277,20 @@ fn new_tar_header(path: &str, size: u64) -> anyhow::Result
{
     header.set_cksum();
     Ok(header)
 }
+
+fn new_tar_header_dir(path: &str) -> anyhow::Result<Header> {
+    let mut header = Header::new_gnu();
+    header.set_size(0);
+    header.set_path(path)?;
+    header.set_mode(0o755); // drwxr-xr-x
+    header.set_entry_type(EntryType::dir());
+    header.set_mtime(
+        // use current time as the last modified time
+        SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap()
+            .as_secs(),
+    );
+    header.set_cksum();
+    Ok(header)
+}
diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs
index 0a2218ae35..802f942636 100644
--- a/pageserver/src/branches.rs
+++ b/pageserver/src/branches.rs
@@ -5,21 +5,19 @@
 //
 use anyhow::{bail, ensure, Context, Result};
-use fs::File;
-use postgres_ffi::{pg_constants, xlog_utils, ControlFileData};
+use postgres_ffi::ControlFileData;
 use serde::{Deserialize, Serialize};
 use std::env;
-use std::io::Read;
-use std::sync::Arc;
 use std::{
     fs,
-    path::{Path, PathBuf},
+    path::Path,
     process::{Command, Stdio},
     str::FromStr,
+    sync::Arc,
 };
-use zenith_utils::lsn::Lsn;
 
 use log::*;
+use zenith_utils::lsn::Lsn;
 
 use crate::logger;
 use crate::object_repository::ObjectRepository;
@@ -65,6 +63,7 @@ pub fn init_pageserver(
             .with_context(|| "failed to create repo")?;
     }
     fs::create_dir_all(conf.tenants_path())?;
+    println!("pageserver init succeeded");
 
     Ok(())
 }
@@ -92,43 +91,7 @@ pub fn create_repo(
 
     info!("created directory structure in {}", repo_dir.display());
 
-    // Run initdb
-    //
-    // We create the cluster temporarily in a "tmp" directory inside the repository,
-    // and move it to the right location from there.
-    let tmppath = conf.tenant_path(&tenantid).join("tmp");
-
-    info!("running initdb... ");
-
-    let initdb_path = conf.pg_bin_dir().join("initdb");
-    let initdb_otput = Command::new(initdb_path)
-        .args(&["-D", tmppath.to_str().unwrap()])
-        .args(&["-U", &conf.superuser])
-        .arg("--no-instructions")
-        .env_clear()
-        .env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
-        .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
-        .stdout(Stdio::null())
-        .output()
-        .with_context(|| "failed to execute initdb")?;
-    if !initdb_otput.status.success() {
-        anyhow::bail!(
-            "initdb failed: '{}'",
-            String::from_utf8_lossy(&initdb_otput.stderr)
-        );
-    }
-    info!("initdb succeeded");
-
-    // Read control file to extract the LSN and system id
-    let controlfile_path = tmppath.join("global").join("pg_control");
-    let controlfile = ControlFileData::decode(&fs::read(controlfile_path)?)?;
-    // let systemid = controlfile.system_identifier;
-    let lsn = controlfile.checkPoint;
-    let lsnstr = format!("{:016X}", lsn);
-
-    // Bootstrap the repository by loading the newly-initdb'd cluster into 'main' branch.
     let tli = create_timeline(conf, None, &tenantid)?;
-    let timelinedir = conf.timeline_path(&tli, &tenantid);
 
     // We don't use page_cache here, because we don't want to spawn the WAL redo thread during
     // repository initialization.
@@ -147,38 +110,93 @@ pub fn create_repo(
         wal_redo_manager,
         tenantid,
     );
-    let timeline = repo.create_empty_timeline(tli, Lsn(lsn))?;
-    restore_local_repo::import_timeline_from_postgres_datadir(&tmppath, &*timeline, Lsn(lsn))?;
+    // Load data into pageserver
+    // TODO: To implement zenith import we need to
+    // move data loading out of create_repo()
+    bootstrap_timeline(conf, tenantid, tli, &repo)?;
 
-    // Move the initial WAL file
-    fs::rename(
-        tmppath.join("pg_wal").join("000000010000000000000001"),
-        timelinedir
-            .join("wal")
-            .join("000000010000000000000001.partial"),
-    )?;
-    info!("created initial timeline {}", tli);
+    Ok(repo)
+}
+
+// Returns the checkpoint LSN from the control file
+fn get_lsn_from_controlfile(path: &Path) -> Result<Lsn> {
+    // Read control file to extract the LSN
+    let controlfile_path = path.join("global").join("pg_control");
+    let controlfile = ControlFileData::decode(&fs::read(controlfile_path)?)?;
+    let lsn = controlfile.checkPoint;
+
+    Ok(Lsn(lsn))
+}
+
+// Create a temporary cluster in the 'initdbpath' directory inside the repository
+// to get bootstrap data for timeline initialization.
+//
+fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
+    info!("running initdb... ");
+
+    let initdb_path = conf.pg_bin_dir().join("initdb");
+    let initdb_output = Command::new(initdb_path)
+        .args(&["-D", initdbpath.to_str().unwrap()])
+        .args(&["-U", &conf.superuser])
+        .arg("--no-instructions")
+        .env_clear()
+        .env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
+        .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
+        .stdout(Stdio::null())
+        .output()
+        .with_context(|| "failed to execute initdb")?;
+    if !initdb_output.status.success() {
+        anyhow::bail!(
+            "initdb failed: '{}'",
+            String::from_utf8_lossy(&initdb_output.stderr)
+        );
+    }
+    info!("initdb succeeded");
+
+    Ok(())
+}
+
+//
+// - run initdb to initialize a temporary instance and get bootstrap data
+// - after initialization is complete, remove the temp dir.
+//
+fn bootstrap_timeline(
+    conf: &'static PageServerConf,
+    tenantid: ZTenantId,
+    tli: ZTimelineId,
+    repo: &dyn Repository,
+) -> Result<()> {
+    let initdb_path = conf.tenant_path(&tenantid).join("tmp");
+
+    // Run initdb in a temporary directory to get bootstrap data
+    run_initdb(conf, &initdb_path)?;
+    let pgdata_path = initdb_path;
+
+    let lsn = get_lsn_from_controlfile(&pgdata_path)?;
+
+    info!("bootstrap_timeline {:?} at lsn {}", pgdata_path, lsn);
+
+    let timeline = repo.create_empty_timeline(tli, lsn)?;
+    restore_local_repo::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?;
+
+    let wal_dir = pgdata_path.join("pg_wal");
+    restore_local_repo::import_timeline_wal(&wal_dir, &*timeline, timeline.get_last_record_lsn())?;
+
+    println!(
+        "created initial timeline {} at lsn {}",
+        tli,
+        timeline.get_last_record_lsn()
+    );
 
     let data = tli.to_string();
     fs::write(conf.branch_path("main", &tenantid), data)?;
-    info!("created main branch");
+    println!("created main branch");
 
-    // Remove pg_wal
-    fs::remove_dir_all(tmppath.join("pg_wal"))?;
+    // Remove the temp dir; we don't need it anymore
+    fs::remove_dir_all(pgdata_path)?;
 
-    // Move the data directory as an initial base backup.
-    // FIXME: It would be enough to only copy the non-relation files here, the relation
-    // data was already loaded into the repository.
- let target = timelinedir.join("snapshots").join(&lsnstr); - fs::rename(tmppath, &target)?; - - info!( - "new zenith repository was created in {}", - repo_dir.display() - ); - - Ok(repo) + Ok(()) } pub(crate) fn get_tenants(conf: &PageServerConf) -> Result> { @@ -269,26 +287,10 @@ pub(crate) fn create_branch( // create a new timeline directory for it let newtli = create_timeline(conf, Some(startpoint), tenantid)?; - let newtimelinedir = conf.timeline_path(&newtli, tenantid); // Let the Repository backend do its initialization repo.branch_timeline(startpoint.timelineid, newtli, startpoint.lsn)?; - // Copy the latest snapshot (TODO: before the startpoint) and all WAL - // TODO: be smarter and avoid the copying... - let (_maxsnapshot, oldsnapshotdir) = - find_latest_snapshot(conf, &startpoint.timelineid, tenantid)?; - let copy_opts = fs_extra::dir::CopyOptions::new(); - fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), ©_opts)?; - - let oldtimelinedir = conf.timeline_path(&startpoint.timelineid, tenantid); - copy_wal( - &oldtimelinedir.join("wal"), - &newtimelinedir.join("wal"), - startpoint.lsn, - pg_constants::WAL_SEGMENT_SIZE, - )?; - // Remember the human-readable branch name for the new timeline. // FIXME: there's a race condition, if you create a branch with the same // name concurrently. @@ -389,7 +391,6 @@ fn create_timeline( let timelinedir = conf.timeline_path(&timelineid, tenantid); fs::create_dir(&timelinedir)?; - fs::create_dir(&timelinedir.join("snapshots"))?; fs::create_dir(&timelinedir.join("wal"))?; if let Some(ancestor) = ancestor { @@ -399,76 +400,3 @@ fn create_timeline( Ok(timelineid) } - -/// -/// Copy all WAL segments from one directory to another, up to given LSN. -/// -/// If the given LSN is in the middle of a segment, the last segment containing it -/// is written out as .partial, and padded with zeros. -/// -fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: usize) -> Result<()> { - let last_segno = upto.segment_number(wal_seg_size); - let last_segoff = upto.segment_offset(wal_seg_size); - - for entry in fs::read_dir(src_dir).unwrap().flatten() { - let entry_name = entry.file_name(); - let fname = entry_name.to_str().unwrap(); - - // Check if the filename looks like an xlog file, or a .partial file. 
- if !xlog_utils::IsXLogFileName(fname) && !xlog_utils::IsPartialXLogFileName(fname) { - continue; - } - let (segno, _tli) = xlog_utils::XLogFromFileName(fname, wal_seg_size as usize); - - let copylen; - let mut dst_fname = PathBuf::from(fname); - if segno > last_segno { - // future segment, skip - continue; - } else if segno < last_segno { - copylen = wal_seg_size; - dst_fname.set_extension(""); - } else { - copylen = last_segoff; - dst_fname.set_extension("partial"); - } - - let src_file = File::open(entry.path())?; - let mut dst_file = File::create(dst_dir.join(&dst_fname))?; - std::io::copy(&mut src_file.take(copylen as u64), &mut dst_file)?; - - if copylen < wal_seg_size { - std::io::copy( - &mut std::io::repeat(0).take((wal_seg_size - copylen) as u64), - &mut dst_file, - )?; - } - } - Ok(()) -} - -// Find the latest snapshot for a timeline -fn find_latest_snapshot( - conf: &PageServerConf, - timelineid: &ZTimelineId, - tenantid: &ZTenantId, -) -> Result<(Lsn, PathBuf)> { - let snapshotsdir = conf.snapshots_path(timelineid, tenantid); - let paths = fs::read_dir(&snapshotsdir)?; - let mut maxsnapshot = Lsn(0); - let mut snapshotdir: Option = None; - for path in paths { - let path = path?; - let filename = path.file_name().to_str().unwrap().to_owned(); - if let Ok(lsn) = Lsn::from_hex(&filename) { - maxsnapshot = std::cmp::max(lsn, maxsnapshot); - snapshotdir = Some(path.path()); - } - } - if maxsnapshot == Lsn(0) { - // TODO: check ancestor timeline - anyhow::bail!("no snapshot found in {}", snapshotsdir.display()); - } - - Ok((maxsnapshot, snapshotdir.unwrap())) -} diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 4db9e74a53..4bdc1b4e3d 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -78,10 +78,6 @@ impl PageServerConf { self.timelines_path(tenantid).join(timelineid.to_string()) } - fn snapshots_path(&self, timelineid: &ZTimelineId, tenantid: &ZTenantId) -> PathBuf { - self.timeline_path(timelineid, tenantid).join("snapshots") - } - fn ancestor_path(&self, timelineid: &ZTimelineId, tenantid: &ZTenantId) -> PathBuf { self.timeline_path(timelineid, tenantid).join("ancestor") } diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index fbc607f24b..d043157007 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -145,7 +145,8 @@ impl Repository for ObjectRepository { fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, at_lsn: Lsn) -> Result<()> { let src_timeline = self.get_timeline(src)?; - // Write a metadata key, noting the ancestor of th new timeline. There is initially + trace!("branch_timeline at lsn {}", at_lsn); + // Write a metadata key, noting the ancestor of the new timeline. There is initially // no data in it, but all the read-calls know to look into the ancestor. 
let metadata = MetadataEntry { last_valid_lsn: at_lsn, diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 01c9c8e4b1..434cfffbcd 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -32,7 +32,6 @@ use crate::branches; use crate::object_key::ObjectTag; use crate::page_cache; use crate::repository::{BufferTag, Modification, RelTag}; -use crate::restore_local_repo; use crate::walreceiver; use crate::walredo::PostgresRedoManager; use crate::PageServerConf; @@ -295,23 +294,15 @@ impl PageServerHandler { /* Send a tarball of the latest snapshot on the timeline */ - // find latest snapshot - let snapshot_lsn = - restore_local_repo::find_latest_snapshot(&self.conf, &timelineid, &tenantid).unwrap(); - let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn()); { let mut writer = CopyDataSink { pgb }; let mut basebackup = basebackup::Basebackup::new( - self.conf, &mut writer, - tenantid, - timelineid, &timeline, req_lsn, timeline.get_prev_record_lsn(), - snapshot_lsn, ); basebackup.send_tarball()?; } diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 61e839d5db..cf50af23d4 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -3,7 +3,7 @@ //! zenith Timeline. //! use log::*; -use std::cmp::{max, min}; +use std::cmp::min; use std::fs; use std::fs::File; use std::io::Read; @@ -17,9 +17,6 @@ use bytes::{Buf, Bytes}; use crate::object_key::*; use crate::repository::*; use crate::waldecoder::*; -use crate::PageServerConf; -use crate::ZTenantId; -use crate::ZTimelineId; use postgres_ffi::relfile_utils::*; use postgres_ffi::xlog_utils::*; use postgres_ffi::{pg_constants, CheckPoint, ControlFileData}; @@ -28,36 +25,6 @@ use zenith_utils::lsn::Lsn; const MAX_MBR_BLKNO: u32 = pg_constants::MAX_MULTIXACT_ID / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; -/// -/// Find latest snapshot in a timeline's 'snapshots' directory -/// -pub fn find_latest_snapshot( - conf: &PageServerConf, - timelineid: &ZTimelineId, - tenantid: &ZTenantId, -) -> Result { - let snapshotspath = conf.snapshots_path(timelineid, tenantid); - let mut last_snapshot_lsn = Lsn(0); - for direntry in fs::read_dir(&snapshotspath).unwrap() { - let filename = direntry.unwrap().file_name(); - - if let Ok(lsn) = Lsn::from_filename(&filename) { - last_snapshot_lsn = max(lsn, last_snapshot_lsn); - } else { - error!("unrecognized file in snapshots directory: {:?}", filename); - } - } - - if last_snapshot_lsn == Lsn(0) { - error!( - "could not find valid snapshot in {}", - snapshotspath.display() - ); - // TODO return error? - } - Ok(last_snapshot_lsn) -} - /// /// Import all relation data pages from local disk into the repository. /// @@ -108,6 +75,11 @@ pub fn import_timeline_from_postgres_datadir( for direntry in fs::read_dir(path.join("base"))? { let direntry = direntry?; + //skip all temporary files + if direntry.file_name().to_str().unwrap() == "pgsql_tmp" { + continue; + } + let dboid = direntry.file_name().to_str().unwrap().parse::()?; for direntry in fs::read_dir(direntry.path())? { @@ -296,8 +268,8 @@ fn import_slru_file( Ok(()) } -/// Scan PostgreSQL WAL files in given directory, and load all records >= 'startpoint' into -/// the repository. +/// Scan PostgreSQL WAL files in given directory +/// and load all records >= 'startpoint' into the repository. 
pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: Lsn) -> Result<()> { let mut waldecoder = WalStreamDecoder::new(startpoint); @@ -311,6 +283,9 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: loop { // FIXME: assume postgresql tli 1 for now let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE); + let mut buf = Vec::new(); + + //Read local file let mut path = walpath.join(&filename); // It could be as .partial @@ -331,12 +306,12 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: file.seek(SeekFrom::Start(offset as u64))?; } - let mut buf = Vec::new(); let nread = file.read_to_end(&mut buf)?; if nread != pg_constants::WAL_SEGMENT_SIZE - offset as usize { // Maybe allow this for .partial files? error!("read only {} bytes from WAL file", nread); } + waldecoder.feed_bytes(&buf); let mut nrecords = 0; @@ -358,19 +333,19 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: nrecords += 1; } - info!( - "imported {} records from WAL file {} up to {}", - nrecords, - path.display(), - last_lsn - ); + info!("imported {} records up to {}", nrecords, last_lsn); segno += 1; offset = 0; } + info!("reached end of WAL at {}", last_lsn); let checkpoint_bytes = checkpoint.encode(); timeline.put_page_image(ObjectTag::Checkpoint, last_lsn, checkpoint_bytes, false)?; + + timeline.advance_last_valid_lsn(last_lsn); + timeline.checkpoint()?; + Ok(()) } diff --git a/postgres_ffi/samples/pg_hba.conf b/postgres_ffi/samples/pg_hba.conf new file mode 100644 index 0000000000..0079ef3214 --- /dev/null +++ b/postgres_ffi/samples/pg_hba.conf @@ -0,0 +1,98 @@ +# PostgreSQL Client Authentication Configuration File +# =================================================== +# +# Refer to the "Client Authentication" section in the PostgreSQL +# documentation for a complete description of this file. A short +# synopsis follows. +# +# This file controls: which hosts are allowed to connect, how clients +# are authenticated, which PostgreSQL user names they can use, which +# databases they can access. Records take one of these forms: +# +# local DATABASE USER METHOD [OPTIONS] +# host DATABASE USER ADDRESS METHOD [OPTIONS] +# hostssl DATABASE USER ADDRESS METHOD [OPTIONS] +# hostnossl DATABASE USER ADDRESS METHOD [OPTIONS] +# hostgssenc DATABASE USER ADDRESS METHOD [OPTIONS] +# hostnogssenc DATABASE USER ADDRESS METHOD [OPTIONS] +# +# (The uppercase items must be replaced by actual values.) +# +# The first field is the connection type: +# - "local" is a Unix-domain socket +# - "host" is a TCP/IP socket (encrypted or not) +# - "hostssl" is a TCP/IP socket that is SSL-encrypted +# - "hostnossl" is a TCP/IP socket that is not SSL-encrypted +# - "hostgssenc" is a TCP/IP socket that is GSSAPI-encrypted +# - "hostnogssenc" is a TCP/IP socket that is not GSSAPI-encrypted +# +# DATABASE can be "all", "sameuser", "samerole", "replication", a +# database name, or a comma-separated list thereof. The "all" +# keyword does not match "replication". Access to replication +# must be enabled in a separate record (see example below). +# +# USER can be "all", a user name, a group name prefixed with "+", or a +# comma-separated list thereof. In both the DATABASE and USER fields +# you can also write a file name prefixed with "@" to include names +# from a separate file. +# +# ADDRESS specifies the set of hosts the record matches. 
It can be a +# host name, or it is made up of an IP address and a CIDR mask that is +# an integer (between 0 and 32 (IPv4) or 128 (IPv6) inclusive) that +# specifies the number of significant bits in the mask. A host name +# that starts with a dot (.) matches a suffix of the actual host name. +# Alternatively, you can write an IP address and netmask in separate +# columns to specify the set of hosts. Instead of a CIDR-address, you +# can write "samehost" to match any of the server's own IP addresses, +# or "samenet" to match any address in any subnet that the server is +# directly connected to. +# +# METHOD can be "trust", "reject", "md5", "password", "scram-sha-256", +# "gss", "sspi", "ident", "peer", "pam", "ldap", "radius" or "cert". +# Note that "password" sends passwords in clear text; "md5" or +# "scram-sha-256" are preferred since they send encrypted passwords. +# +# OPTIONS are a set of options for the authentication in the format +# NAME=VALUE. The available options depend on the different +# authentication methods -- refer to the "Client Authentication" +# section in the documentation for a list of which options are +# available for which authentication methods. +# +# Database and user names containing spaces, commas, quotes and other +# special characters must be quoted. Quoting one of the keywords +# "all", "sameuser", "samerole" or "replication" makes the name lose +# its special character, and just match a database or username with +# that name. +# +# This file is read on server startup and when the server receives a +# SIGHUP signal. If you edit the file on a running system, you have to +# SIGHUP the server for the changes to take effect, run "pg_ctl reload", +# or execute "SELECT pg_reload_conf()". +# +# Put your actual configuration here +# ---------------------------------- +# +# If you want to allow non-local connections, you need to add more +# "host" records. In that case you will also need to make PostgreSQL +# listen on a non-local interface via the listen_addresses +# configuration parameter, or via the -i or -h command line switches. + +# CAUTION: Configuring the system for local "trust" authentication +# allows any local user to connect as any PostgreSQL user, including +# the database superuser. If you do not trust all your local users, +# use another authentication method. + + +# TYPE DATABASE USER ADDRESS METHOD + +# "local" is for Unix domain socket connections only +local all all trust +# IPv4 local connections: +host all all 127.0.0.1/32 trust +# IPv6 local connections: +host all all ::1/128 trust +# Allow replication connections from localhost, by a user with the +# replication privilege. +local replication all trust +host replication all 127.0.0.1/32 trust +host replication all ::1/128 trust diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index 337d94a2a4..8069095855 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -185,3 +185,41 @@ pub const XLOG_BLCKSZ: usize = 8192; pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00; pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10; pub const XLP_LONG_HEADER: u16 = 0x0002; + +pub const PG_MAJORVERSION: &'static str = "14"; + +// List of subdirectories inside pgdata. 
+// Copied from src/bin/initdb/initdb.c +pub const PGDATA_SUBDIRS: [&'static str; 22] = [ + "global", + "pg_wal/archive_status", + "pg_commit_ts", + "pg_dynshmem", + "pg_notify", + "pg_serial", + "pg_snapshots", + "pg_subtrans", + "pg_twophase", + "pg_multixact", + "pg_multixact/members", + "pg_multixact/offsets", + "base", + "base/1", + "pg_replslot", + "pg_tblspc", + "pg_stat", + "pg_stat_tmp", + "pg_xact", + "pg_logical", + "pg_logical/snapshots", + "pg_logical/mappings", +]; + +pub const PGDATA_SPECIAL_FILES: [&'static str; 4] = [ + "pg_hba.conf", + "pg_ident.conf", + "postgresql.conf", + "postgresql.auto.conf", +]; + +pub static PG_HBA: &'static str = include_str!("../samples/pg_hba.conf");
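The two new arrays above are what both the compute node and the basebackup code iterate over to lay out an empty data directory. A minimal sketch of that usage, not part of the patch: the helper name create_pgdata_skeleton is hypothetical, and fs::Permissions::from_mode assumes a Unix target.

use std::fs;
use std::io;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;

use postgres_ffi::pg_constants;

fn create_pgdata_skeleton(pgdata: &Path) -> io::Result<()> {
    // Create every subdirectory initdb would normally create, with 0700 permissions,
    // as control_plane/src/compute.rs does above.
    for dir in pg_constants::PGDATA_SUBDIRS.iter() {
        let path = pgdata.join(*dir);
        fs::create_dir_all(&path)?;
        fs::set_permissions(&path, fs::Permissions::from_mode(0o700))?;
    }

    // Create the config files that the basebackup tarball also carries:
    // pg_hba.conf gets the bundled sample, the rest start out empty.
    for file in pg_constants::PGDATA_SPECIAL_FILES.iter() {
        if *file == "pg_hba.conf" {
            fs::write(pgdata.join(*file), pg_constants::PG_HBA)?;
        } else {
            fs::write(pgdata.join(*file), b"")?;
        }
    }
    Ok(())
}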