Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-04 12:02:55 +00:00)
Get rid of snapshot directory + related code cleanup and refactoring.
- Add new subdir postgres_ffi/samples/ for config file samples.
- Don't copy WAL to the new branch on zenith init or zenith branch.
- Import_timeline_wal on zenith init.
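In outline, zenith init now bootstraps the repository directly from a temporary initdb run instead of keeping a snapshot directory around. A condensed sketch of the new flow, pieced together from the create_repo()/bootstrap_timeline() hunks below (not verbatim: logging, error context and the surrounding create_repo() plumbing are omitted):

    // Sketch only; see the actual diff below for the full version.
    fn bootstrap_timeline(conf: &'static PageServerConf, tenantid: ZTenantId, tli: ZTimelineId, repo: &dyn Repository) -> Result<()> {
        // Run initdb into a throwaway "tmp" dir under the tenant, only to obtain bootstrap data.
        let pgdata_path = conf.tenant_path(&tenantid).join("tmp");
        run_initdb(conf, &pgdata_path)?;

        // Take the checkpoint LSN from the freshly written pg_control.
        let lsn = get_lsn_from_controlfile(&pgdata_path)?;

        // Load the data files and the initial WAL straight into the repository.
        let timeline = repo.create_empty_timeline(tli, lsn)?;
        restore_local_repo::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?;
        restore_local_repo::import_timeline_wal(&pgdata_path.join("pg_wal"), &*timeline, timeline.get_last_record_lsn())?;

        // No snapshot directory is kept: the temporary cluster is simply removed.
        fs::remove_dir_all(pgdata_path)?;
        Ok(())
    }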
@@ -20,6 +20,7 @@ use crate::local_env::LocalEnv;
 use pageserver::{ZTenantId, ZTimelineId};
 
 use crate::storage::PageServerNode;
+use postgres_ffi::pg_constants;
 
 //
 // ComputeControlPlane
@@ -278,10 +279,19 @@ impl PostgresNode {
 },
 )?;
 
-// FIXME: The compute node should be able to stream the WAL it needs from the WAL safekeepers or archive.
-// But that's not implemented yet. For now, 'pg_wal' is included in the base backup tarball that
-// we receive from the Page Server, so we don't need to create the empty 'pg_wal' directory here.
-//fs::create_dir_all(pgdata.join("pg_wal"))?;
+// Create pgdata subdirs structure
+for dir in pg_constants::PGDATA_SUBDIRS.iter() {
+let path = pgdata.as_path().join(*dir);
+fs::create_dir_all(path.clone())?;
+
+fs::set_permissions(path, fs::Permissions::from_mode(0o700)).with_context(|| {
+format!(
+"could not set permissions in data directory {}",
+pgdata.display()
+)
+})?;
+}
 
 let mut copyreader = client
 .copy_out(sql.as_str())
@@ -322,7 +332,7 @@ impl PostgresNode {
 // Never clean up old WAL. TODO: We should use a replication
 // slot or something proper, to prevent the compute node
 // from removing WAL that hasn't been streamed to the safekeepr or
-// page server yet. But this will do for now.
+// page server yet. (gh issue #349)
 self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n")?;
 
 // Connect it to the page server.
@@ -4,23 +4,22 @@
 //! TODO: this module has nothing to do with PostgreSQL pg_basebackup.
 //! It could use a better name.
 //!
-//! Stateless Postgres compute node is launched by sending tarball which contains non-relational data (multixacts, clog, filenodemaps, twophase files)
-//! and generate pg_control and dummy segment of WAL. This module is responsible for creation of such tarball from snapshot directory and
-//! data stored in object storage.
+//! Stateless Postgres compute node is launched by sending a tarball
+//! which contains non-relational data (multixacts, clog, filenodemaps, twophase files),
+//! generated pg_control and dummy segment of WAL.
+//! This module is responsible for creation of such tarball
+//! from data stored in object storage.
 //!
-use crate::{PageServerConf, ZTenantId, ZTimelineId};
 use bytes::{BufMut, BytesMut};
 use log::*;
+use std::io;
 use std::io::Write;
-use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::SystemTime;
-use tar::{Builder, Header};
-use walkdir::WalkDir;
+use tar::{Builder, EntryType, Header};
 
-use crate::object_key::*;
+use crate::object_key::{DatabaseTag, ObjectTag};
 use crate::repository::Timeline;
-use postgres_ffi::relfile_utils::*;
 use postgres_ffi::xlog_utils::*;
 use postgres_ffi::*;
 use zenith_utils::lsn::Lsn;
@@ -33,7 +32,6 @@ pub struct Basebackup<'a> {
 timeline: &'a Arc<dyn Timeline>,
 lsn: Lsn,
 prev_record_lsn: Lsn,
-snappath: PathBuf,
 slru_buf: [u8; pg_constants::SLRU_SEG_SIZE],
 slru_segno: u32,
 slru_path: &'static str,
@@ -41,23 +39,16 @@ pub struct Basebackup<'a> {
 
 impl<'a> Basebackup<'a> {
 pub fn new(
-conf: &PageServerConf,
 write: &'a mut dyn Write,
-tenantid: ZTenantId,
-timelineid: ZTimelineId,
 timeline: &'a Arc<dyn Timeline>,
 lsn: Lsn,
 prev_record_lsn: Lsn,
-snapshot_lsn: Lsn,
 ) -> Basebackup<'a> {
 Basebackup {
 ar: Builder::new(write),
 timeline,
 lsn,
 prev_record_lsn,
-snappath: conf
-.snapshots_path(&timelineid, &tenantid)
-.join(format!("{:016X}", snapshot_lsn.0)),
 slru_path: "",
 slru_segno: u32::MAX,
 slru_buf: [0u8; pg_constants::SLRU_SEG_SIZE],
@@ -65,47 +56,22 @@ impl<'a> Basebackup<'a> {
 }
 
 pub fn send_tarball(&mut self) -> anyhow::Result<()> {
-debug!("sending tarball of snapshot in {}", self.snappath.display());
-for entry in WalkDir::new(&self.snappath) {
-let entry = entry?;
-let fullpath = entry.path();
-let relpath = entry.path().strip_prefix(&self.snappath).unwrap();
-
-if relpath.to_str().unwrap() == "" {
-continue;
-}
-
-if entry.file_type().is_dir() {
-trace!(
-"sending dir {} as {}",
-fullpath.display(),
-relpath.display()
-);
-self.ar.append_dir(relpath, fullpath)?;
-} else if entry.file_type().is_symlink() {
-error!("ignoring symlink in snapshot dir");
-} else if entry.file_type().is_file() {
-if !is_rel_file_path(relpath.to_str().unwrap()) {
-if entry.file_name() != "pg_filenode.map" // this files will be generated from object storage
-&& !relpath.starts_with("pg_xact/")
-&& !relpath.starts_with("pg_multixact/")
-{
-trace!("sending {}", relpath.display());
-self.ar.append_path_with_name(fullpath, relpath)?;
-}
-} else {
-// relation pages are loaded on demand and should not be included in tarball
-trace!("not sending {}", relpath.display());
-}
+// Send empty config files.
+for filepath in pg_constants::PGDATA_SPECIAL_FILES.iter() {
+if *filepath == "pg_hba.conf" {
+let data = pg_constants::PG_HBA.as_bytes();
+let header = new_tar_header(&filepath, data.len() as u64)?;
+self.ar.append(&header, &data[..])?;
 } else {
-error!("unknown file type: {}", fullpath.display());
+let header = new_tar_header(&filepath, 0)?;
+self.ar.append(&header, &mut io::empty())?;
 }
 }
 
-// Generate non-relational files.
-// Iteration is sorted order: all objects of the same time are grouped and traversed
+// Gather non-relational files from object storage pages.
+// Iteration is sorted order: all objects of the same type are grouped and traversed
 // in key ascending order. For example all pg_xact records precede pg_multixact records and are sorted by block number.
-// It allows to easily construct SLRU segments (32 blocks).
+// It allows to easily construct SLRU segments.
 for obj in self.timeline.list_nonrels(self.lsn)? {
 match obj {
 ObjectTag::Clog(slru) => self.add_slru_segment("pg_xact", &obj, slru.blknum)?,
@@ -120,7 +86,10 @@ impl<'a> Basebackup<'a> {
 _ => {}
 }
 }
-self.finish_slru_segment()?; // write last non-completed SLRU segment (if any)
+
+// write last non-completed SLRU segment (if any)
+self.finish_slru_segment()?;
+// Generate pg_control and bootstrap WAL segment.
 self.add_pgcontrol_file()?;
 self.ar.finish()?;
 debug!("all tarred up!");
@@ -129,19 +98,18 @@ impl<'a> Basebackup<'a> {
 
 //
 // Generate SLRU segment files from repository. Path identifies SLRU kind (pg_xact, pg_multixact/members, ...).
-// Intially pass an empty string.
 //
 fn add_slru_segment(
 &mut self,
 path: &'static str,
 tag: &ObjectTag,
-page: u32,
+blknum: u32,
 ) -> anyhow::Result<()> {
 let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
 // Zero length image indicates truncated segment: just skip it
 if !img.is_empty() {
 assert!(img.len() == pg_constants::BLCKSZ as usize);
-let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT;
+let segno = blknum / pg_constants::SLRU_PAGES_PER_SEGMENT;
 if self.slru_path != "" && (self.slru_segno != segno || self.slru_path != path) {
 // Switch to new segment: save old one
 let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno);
@@ -151,7 +119,7 @@ impl<'a> Basebackup<'a> {
 }
 self.slru_segno = segno;
 self.slru_path = path;
-let offs_start = (page % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize
+let offs_start = (blknum % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize
 * pg_constants::BLCKSZ as usize;
 let offs_end = offs_start + pg_constants::BLCKSZ as usize;
 self.slru_buf[offs_start..offs_end].copy_from_slice(&img);
@@ -175,18 +143,36 @@ impl<'a> Basebackup<'a> {
 
 //
 // Extract pg_filenode.map files from repository
+// Along with them also send PG_VERSION for each database.
 //
 fn add_relmap_file(&mut self, tag: &ObjectTag, db: &DatabaseTag) -> anyhow::Result<()> {
+trace!("add_relmap_file {:?}", db);
 let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
-info!("add_relmap_file {:?}", db);
 let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID {
+let dst_path = "PG_VERSION";
+let version_bytes = pg_constants::PG_MAJORVERSION.as_bytes();
+let header = new_tar_header(&dst_path, version_bytes.len() as u64)?;
+self.ar.append(&header, &version_bytes[..])?;
+
+let dst_path = format!("global/PG_VERSION");
+let header = new_tar_header(&dst_path, version_bytes.len() as u64)?;
+self.ar.append(&header, &version_bytes[..])?;
+
 String::from("global/pg_filenode.map") // filenode map for global tablespace
 } else {
 // User defined tablespaces are not supported
 assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID);
-let src_path = self.snappath.join("base/1/PG_VERSION");
+// Append dir path for each database
+let path = format!("base/{}", db.dbnode);
+let header = new_tar_header_dir(&path)?;
+self.ar.append(&header, &mut io::empty())?;
+
 let dst_path = format!("base/{}/PG_VERSION", db.dbnode);
-self.ar.append_path_with_name(&src_path, &dst_path)?;
+let version_bytes = pg_constants::PG_MAJORVERSION.as_bytes();
+let header = new_tar_header(&dst_path, version_bytes.len() as u64)?;
+self.ar.append(&header, &version_bytes[..])?;
+
 format!("base/{}/pg_filenode.map", db.dbnode)
 };
 assert!(img.len() == 512);
@@ -216,7 +202,8 @@ impl<'a> Basebackup<'a> {
 }
 
 //
-// Add generated pg_control file
+// Add generated pg_control file and bootstrap WAL segment.
+// Also send zenith.signal file with extra bootstrap data.
 //
 fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
 let checkpoint_bytes = self
@@ -238,12 +225,13 @@ impl<'a> Basebackup<'a> {
 checkpoint.redo = self.lsn.0 + self.lsn.calc_padding(8u32);
 
 //reset some fields we don't want to preserve
+//TODO Check this.
+//We may need to determine the value from twophase data.
 checkpoint.oldestActiveXid = 0;
 
 //save new values in pg_control
 pg_control.checkPoint = checkpoint_lsn;
 pg_control.checkPointCopy = checkpoint;
-info!("pg_control.state = {}", pg_control.state);
 pg_control.state = pg_constants::DB_SHUTDOWNED;
 
 // add zenith.signal file
@@ -271,60 +259,6 @@ impl<'a> Basebackup<'a> {
 }
 }
 
-///
-/// Parse a path, relative to the root of PostgreSQL data directory, as
-/// a PostgreSQL relation data file.
-///
-fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> {
-/*
-* Relation data files can be in one of the following directories:
-*
-* global/
-* shared relations
-*
-* base/<db oid>/
-* regular relations, default tablespace
-*
-* pg_tblspc/<tblspc oid>/<tblspc version>/
-* within a non-default tablespace (the name of the directory
-* depends on version)
-*
-* And the relation data files themselves have a filename like:
-*
-* <oid>.<segment number>
-*/
-if let Some(fname) = path.strip_prefix("global/") {
-let (_relnode, _forknum, _segno) = parse_relfilename(fname)?;
-
-Ok(())
-} else if let Some(dbpath) = path.strip_prefix("base/") {
-let mut s = dbpath.split('/');
-let dbnode_str = s.next().ok_or(FilePathError::InvalidFileName)?;
-let _dbnode = dbnode_str.parse::<u32>()?;
-let fname = s.next().ok_or(FilePathError::InvalidFileName)?;
-if s.next().is_some() {
-return Err(FilePathError::InvalidFileName);
-};
-
-let (_relnode, _forknum, _segno) = parse_relfilename(fname)?;
-
-Ok(())
-} else if path.strip_prefix("pg_tblspc/").is_some() {
-// TODO
-error!("tablespaces not implemented yet");
-Err(FilePathError::InvalidFileName)
-} else {
-Err(FilePathError::InvalidFileName)
-}
-}
-
-//
-// Check if it is relational file
-//
-fn is_rel_file_path(path: &str) -> bool {
-parse_rel_file_path(path).is_ok()
-}
-
 //
 // Create new tarball entry header
 //
@@ -343,3 +277,20 @@ fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
 header.set_cksum();
 Ok(header)
 }
+
+fn new_tar_header_dir(path: &str) -> anyhow::Result<Header> {
+let mut header = Header::new_gnu();
+header.set_size(0);
+header.set_path(path)?;
+header.set_mode(0o755); // -rw-------
+header.set_entry_type(EntryType::dir());
+header.set_mtime(
+// use currenttime as last modified time
+SystemTime::now()
+.duration_since(SystemTime::UNIX_EPOCH)
+.unwrap()
+.as_secs(),
+);
+header.set_cksum();
+Ok(header)
+}
@@ -5,21 +5,19 @@
 //
 
 use anyhow::{bail, ensure, Context, Result};
-use fs::File;
-use postgres_ffi::{pg_constants, xlog_utils, ControlFileData};
+use postgres_ffi::ControlFileData;
 use serde::{Deserialize, Serialize};
 use std::env;
-use std::io::Read;
-use std::sync::Arc;
 use std::{
 fs,
-path::{Path, PathBuf},
+path::Path,
 process::{Command, Stdio},
 str::FromStr,
+sync::Arc,
 };
-use zenith_utils::lsn::Lsn;
 
 use log::*;
+use zenith_utils::lsn::Lsn;
 
 use crate::logger;
 use crate::object_repository::ObjectRepository;
@@ -65,6 +63,7 @@ pub fn init_pageserver(
 .with_context(|| "failed to create repo")?;
 }
 fs::create_dir_all(conf.tenants_path())?;
+
 println!("pageserver init succeeded");
 Ok(())
 }
@@ -92,43 +91,7 @@ pub fn create_repo(
 
 info!("created directory structure in {}", repo_dir.display());
 
-// Run initdb
-//
-// We create the cluster temporarily in a "tmp" directory inside the repository,
-// and move it to the right location from there.
-let tmppath = conf.tenant_path(&tenantid).join("tmp");
-
-info!("running initdb... ");
-
-let initdb_path = conf.pg_bin_dir().join("initdb");
-let initdb_otput = Command::new(initdb_path)
-.args(&["-D", tmppath.to_str().unwrap()])
-.args(&["-U", &conf.superuser])
-.arg("--no-instructions")
-.env_clear()
-.env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
-.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
-.stdout(Stdio::null())
-.output()
-.with_context(|| "failed to execute initdb")?;
-if !initdb_otput.status.success() {
-anyhow::bail!(
-"initdb failed: '{}'",
-String::from_utf8_lossy(&initdb_otput.stderr)
-);
-}
-info!("initdb succeeded");
-
-// Read control file to extract the LSN and system id
-let controlfile_path = tmppath.join("global").join("pg_control");
-let controlfile = ControlFileData::decode(&fs::read(controlfile_path)?)?;
-// let systemid = controlfile.system_identifier;
-let lsn = controlfile.checkPoint;
-let lsnstr = format!("{:016X}", lsn);
-
-// Bootstrap the repository by loading the newly-initdb'd cluster into 'main' branch.
 let tli = create_timeline(conf, None, &tenantid)?;
-let timelinedir = conf.timeline_path(&tli, &tenantid);
 
 // We don't use page_cache here, because we don't want to spawn the WAL redo thread during
 // repository initialization.
@@ -147,38 +110,93 @@ pub fn create_repo(
 wal_redo_manager,
 tenantid,
 );
-let timeline = repo.create_empty_timeline(tli, Lsn(lsn))?;
-restore_local_repo::import_timeline_from_postgres_datadir(&tmppath, &*timeline, Lsn(lsn))?;
-
-// Move the initial WAL file
-fs::rename(
-tmppath.join("pg_wal").join("000000010000000000000001"),
-timelinedir
-.join("wal")
-.join("000000010000000000000001.partial"),
-)?;
-info!("created initial timeline {}", tli);
+// Load data into pageserver
+// TODO To implement zenith import we need to
+// move data loading out of create_repo()
+bootstrap_timeline(conf, tenantid, tli, &repo)?;
+
+Ok(repo)
+}
+
+// Returns checkpoint LSN from controlfile
+fn get_lsn_from_controlfile(path: &Path) -> Result<Lsn> {
+// Read control file to extract the LSN
+let controlfile_path = path.join("global").join("pg_control");
+let controlfile = ControlFileData::decode(&fs::read(controlfile_path)?)?;
+let lsn = controlfile.checkPoint;
+
+Ok(Lsn(lsn))
+}
+
+// Create the cluster temporarily in a initdbpath directory inside the repository
+// to get bootstrap data for timeline initialization.
+//
+fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
+info!("running initdb... ");
+
+let initdb_path = conf.pg_bin_dir().join("initdb");
+let initdb_otput = Command::new(initdb_path)
+.args(&["-D", initdbpath.to_str().unwrap()])
+.args(&["-U", &conf.superuser])
+.arg("--no-instructions")
+.env_clear()
+.env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
+.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
+.stdout(Stdio::null())
+.output()
+.with_context(|| "failed to execute initdb")?;
+if !initdb_otput.status.success() {
+anyhow::bail!(
+"initdb failed: '{}'",
+String::from_utf8_lossy(&initdb_otput.stderr)
+);
+}
+info!("initdb succeeded");
+
+Ok(())
+}
+
+//
+// - run initdb to init temporary instance and get bootstrap data
+// - after initialization complete, remove the temp dir.
+//
+fn bootstrap_timeline(
+conf: &'static PageServerConf,
+tenantid: ZTenantId,
+tli: ZTimelineId,
+repo: &dyn Repository,
+) -> Result<()> {
+let initdb_path = conf.tenant_path(&tenantid).join("tmp");
+
+// Init temporarily repo to get bootstrap data
+run_initdb(conf, &initdb_path)?;
+let pgdata_path = initdb_path;
+
+let lsn = get_lsn_from_controlfile(&pgdata_path)?;
+
+info!("bootstrap_timeline {:?} at lsn {}", pgdata_path, lsn);
+
+let timeline = repo.create_empty_timeline(tli, lsn)?;
+restore_local_repo::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?;
+
+let wal_dir = pgdata_path.join("pg_wal");
+restore_local_repo::import_timeline_wal(&wal_dir, &*timeline, timeline.get_last_record_lsn())?;
+
+println!(
+"created initial timeline {} timeline.lsn {}",
+tli,
+timeline.get_last_record_lsn()
+);
+
 let data = tli.to_string();
 fs::write(conf.branch_path("main", &tenantid), data)?;
-info!("created main branch");
+println!("created main branch");
 
-// Remove pg_wal
-fs::remove_dir_all(tmppath.join("pg_wal"))?;
+// Remove temp dir. We don't need it anymore
+fs::remove_dir_all(pgdata_path)?;
 
-// Move the data directory as an initial base backup.
-// FIXME: It would be enough to only copy the non-relation files here, the relation
-// data was already loaded into the repository.
-let target = timelinedir.join("snapshots").join(&lsnstr);
-fs::rename(tmppath, &target)?;
-
-info!(
-"new zenith repository was created in {}",
-repo_dir.display()
-);
-
-Ok(repo)
+Ok(())
 }
 
 pub(crate) fn get_tenants(conf: &PageServerConf) -> Result<Vec<String>> {
@@ -269,26 +287,10 @@ pub(crate) fn create_branch(
 
 // create a new timeline directory for it
 let newtli = create_timeline(conf, Some(startpoint), tenantid)?;
-let newtimelinedir = conf.timeline_path(&newtli, tenantid);
 
 // Let the Repository backend do its initialization
 repo.branch_timeline(startpoint.timelineid, newtli, startpoint.lsn)?;
 
-// Copy the latest snapshot (TODO: before the startpoint) and all WAL
-// TODO: be smarter and avoid the copying...
-let (_maxsnapshot, oldsnapshotdir) =
-find_latest_snapshot(conf, &startpoint.timelineid, tenantid)?;
-let copy_opts = fs_extra::dir::CopyOptions::new();
-fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), &copy_opts)?;
-
-let oldtimelinedir = conf.timeline_path(&startpoint.timelineid, tenantid);
-copy_wal(
-&oldtimelinedir.join("wal"),
-&newtimelinedir.join("wal"),
-startpoint.lsn,
-pg_constants::WAL_SEGMENT_SIZE,
-)?;
-
 // Remember the human-readable branch name for the new timeline.
 // FIXME: there's a race condition, if you create a branch with the same
 // name concurrently.
@@ -389,7 +391,6 @@ fn create_timeline(
 let timelinedir = conf.timeline_path(&timelineid, tenantid);
 
 fs::create_dir(&timelinedir)?;
-fs::create_dir(&timelinedir.join("snapshots"))?;
 fs::create_dir(&timelinedir.join("wal"))?;
 
 if let Some(ancestor) = ancestor {
@@ -399,76 +400,3 @@ fn create_timeline(
 
 Ok(timelineid)
 }
-
-///
-/// Copy all WAL segments from one directory to another, up to given LSN.
-///
-/// If the given LSN is in the middle of a segment, the last segment containing it
-/// is written out as .partial, and padded with zeros.
-///
-fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: usize) -> Result<()> {
-let last_segno = upto.segment_number(wal_seg_size);
-let last_segoff = upto.segment_offset(wal_seg_size);
-
-for entry in fs::read_dir(src_dir).unwrap().flatten() {
-let entry_name = entry.file_name();
-let fname = entry_name.to_str().unwrap();
-
-// Check if the filename looks like an xlog file, or a .partial file.
-if !xlog_utils::IsXLogFileName(fname) && !xlog_utils::IsPartialXLogFileName(fname) {
-continue;
-}
-let (segno, _tli) = xlog_utils::XLogFromFileName(fname, wal_seg_size as usize);
-
-let copylen;
-let mut dst_fname = PathBuf::from(fname);
-if segno > last_segno {
-// future segment, skip
-continue;
-} else if segno < last_segno {
-copylen = wal_seg_size;
-dst_fname.set_extension("");
-} else {
-copylen = last_segoff;
-dst_fname.set_extension("partial");
-}
-
-let src_file = File::open(entry.path())?;
-let mut dst_file = File::create(dst_dir.join(&dst_fname))?;
-std::io::copy(&mut src_file.take(copylen as u64), &mut dst_file)?;
-
-if copylen < wal_seg_size {
-std::io::copy(
-&mut std::io::repeat(0).take((wal_seg_size - copylen) as u64),
-&mut dst_file,
-)?;
-}
-}
-Ok(())
-}
-
-// Find the latest snapshot for a timeline
-fn find_latest_snapshot(
-conf: &PageServerConf,
-timelineid: &ZTimelineId,
-tenantid: &ZTenantId,
-) -> Result<(Lsn, PathBuf)> {
-let snapshotsdir = conf.snapshots_path(timelineid, tenantid);
-let paths = fs::read_dir(&snapshotsdir)?;
-let mut maxsnapshot = Lsn(0);
-let mut snapshotdir: Option<PathBuf> = None;
-for path in paths {
-let path = path?;
-let filename = path.file_name().to_str().unwrap().to_owned();
-if let Ok(lsn) = Lsn::from_hex(&filename) {
-maxsnapshot = std::cmp::max(lsn, maxsnapshot);
-snapshotdir = Some(path.path());
-}
-}
-if maxsnapshot == Lsn(0) {
-// TODO: check ancestor timeline
-anyhow::bail!("no snapshot found in {}", snapshotsdir.display());
-}
-
-Ok((maxsnapshot, snapshotdir.unwrap()))
-}
@@ -78,10 +78,6 @@ impl PageServerConf {
 self.timelines_path(tenantid).join(timelineid.to_string())
 }
 
-fn snapshots_path(&self, timelineid: &ZTimelineId, tenantid: &ZTenantId) -> PathBuf {
-self.timeline_path(timelineid, tenantid).join("snapshots")
-}
-
 fn ancestor_path(&self, timelineid: &ZTimelineId, tenantid: &ZTenantId) -> PathBuf {
 self.timeline_path(timelineid, tenantid).join("ancestor")
 }
@@ -145,7 +145,8 @@ impl Repository for ObjectRepository {
 fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, at_lsn: Lsn) -> Result<()> {
 let src_timeline = self.get_timeline(src)?;
 
-// Write a metadata key, noting the ancestor of th new timeline. There is initially
+trace!("branch_timeline at lsn {}", at_lsn);
+// Write a metadata key, noting the ancestor of the new timeline. There is initially
 // no data in it, but all the read-calls know to look into the ancestor.
 let metadata = MetadataEntry {
 last_valid_lsn: at_lsn,
@@ -32,7 +32,6 @@ use crate::branches;
 use crate::object_key::ObjectTag;
 use crate::page_cache;
 use crate::repository::{BufferTag, Modification, RelTag};
-use crate::restore_local_repo;
 use crate::walreceiver;
 use crate::walredo::PostgresRedoManager;
 use crate::PageServerConf;
@@ -295,23 +294,15 @@ impl PageServerHandler {
 
 /* Send a tarball of the latest snapshot on the timeline */
 
-// find latest snapshot
-let snapshot_lsn =
-restore_local_repo::find_latest_snapshot(&self.conf, &timelineid, &tenantid).unwrap();
-
 let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn());
 
 {
 let mut writer = CopyDataSink { pgb };
 let mut basebackup = basebackup::Basebackup::new(
-self.conf,
 &mut writer,
-tenantid,
-timelineid,
 &timeline,
 req_lsn,
 timeline.get_prev_record_lsn(),
-snapshot_lsn,
 );
 basebackup.send_tarball()?;
 }
@@ -3,7 +3,7 @@
 //! zenith Timeline.
 //!
 use log::*;
-use std::cmp::{max, min};
+use std::cmp::min;
 use std::fs;
 use std::fs::File;
 use std::io::Read;
@@ -17,9 +17,6 @@ use bytes::{Buf, Bytes};
 use crate::object_key::*;
 use crate::repository::*;
 use crate::waldecoder::*;
-use crate::PageServerConf;
-use crate::ZTenantId;
-use crate::ZTimelineId;
 use postgres_ffi::relfile_utils::*;
 use postgres_ffi::xlog_utils::*;
 use postgres_ffi::{pg_constants, CheckPoint, ControlFileData};
@@ -28,36 +25,6 @@ use zenith_utils::lsn::Lsn;
 const MAX_MBR_BLKNO: u32 =
 pg_constants::MAX_MULTIXACT_ID / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
 
-///
-/// Find latest snapshot in a timeline's 'snapshots' directory
-///
-pub fn find_latest_snapshot(
-conf: &PageServerConf,
-timelineid: &ZTimelineId,
-tenantid: &ZTenantId,
-) -> Result<Lsn> {
-let snapshotspath = conf.snapshots_path(timelineid, tenantid);
-let mut last_snapshot_lsn = Lsn(0);
-for direntry in fs::read_dir(&snapshotspath).unwrap() {
-let filename = direntry.unwrap().file_name();
-
-if let Ok(lsn) = Lsn::from_filename(&filename) {
-last_snapshot_lsn = max(lsn, last_snapshot_lsn);
-} else {
-error!("unrecognized file in snapshots directory: {:?}", filename);
-}
-}
-
-if last_snapshot_lsn == Lsn(0) {
-error!(
-"could not find valid snapshot in {}",
-snapshotspath.display()
-);
-// TODO return error?
-}
-Ok(last_snapshot_lsn)
-}
-
 ///
 /// Import all relation data pages from local disk into the repository.
 ///
@@ -108,6 +75,11 @@ pub fn import_timeline_from_postgres_datadir(
 for direntry in fs::read_dir(path.join("base"))? {
 let direntry = direntry?;
 
+//skip all temporary files
+if direntry.file_name().to_str().unwrap() == "pgsql_tmp" {
+continue;
+}
+
 let dboid = direntry.file_name().to_str().unwrap().parse::<u32>()?;
 
 for direntry in fs::read_dir(direntry.path())? {
@@ -296,8 +268,8 @@ fn import_slru_file(
 Ok(())
 }
 
-/// Scan PostgreSQL WAL files in given directory, and load all records >= 'startpoint' into
-/// the repository.
+/// Scan PostgreSQL WAL files in given directory
+/// and load all records >= 'startpoint' into the repository.
 pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: Lsn) -> Result<()> {
 let mut waldecoder = WalStreamDecoder::new(startpoint);
 
@@ -311,6 +283,9 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
 loop {
 // FIXME: assume postgresql tli 1 for now
 let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE);
+let mut buf = Vec::new();
+
+//Read local file
 let mut path = walpath.join(&filename);
 
 // It could be as .partial
@@ -331,12 +306,12 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
 file.seek(SeekFrom::Start(offset as u64))?;
 }
 
-let mut buf = Vec::new();
 let nread = file.read_to_end(&mut buf)?;
 if nread != pg_constants::WAL_SEGMENT_SIZE - offset as usize {
 // Maybe allow this for .partial files?
 error!("read only {} bytes from WAL file", nread);
 }
 
 waldecoder.feed_bytes(&buf);
 
 let mut nrecords = 0;
@@ -358,19 +333,19 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
 nrecords += 1;
 }
 
-info!(
-"imported {} records from WAL file {} up to {}",
-nrecords,
-path.display(),
-last_lsn
-);
+info!("imported {} records up to {}", nrecords, last_lsn);
 
 segno += 1;
 offset = 0;
 }
 
 info!("reached end of WAL at {}", last_lsn);
 let checkpoint_bytes = checkpoint.encode();
 timeline.put_page_image(ObjectTag::Checkpoint, last_lsn, checkpoint_bytes, false)?;
 
+timeline.advance_last_valid_lsn(last_lsn);
+timeline.checkpoint()?;
+
 Ok(())
 }
postgres_ffi/samples/pg_hba.conf (new file)
@@ -0,0 +1,98 @@
+# PostgreSQL Client Authentication Configuration File
+# ===================================================
+#
+# Refer to the "Client Authentication" section in the PostgreSQL
+# documentation for a complete description of this file. A short
+# synopsis follows.
+#
+# This file controls: which hosts are allowed to connect, how clients
+# are authenticated, which PostgreSQL user names they can use, which
+# databases they can access. Records take one of these forms:
+#
+# local DATABASE USER METHOD [OPTIONS]
+# host DATABASE USER ADDRESS METHOD [OPTIONS]
+# hostssl DATABASE USER ADDRESS METHOD [OPTIONS]
+# hostnossl DATABASE USER ADDRESS METHOD [OPTIONS]
+# hostgssenc DATABASE USER ADDRESS METHOD [OPTIONS]
+# hostnogssenc DATABASE USER ADDRESS METHOD [OPTIONS]
+#
+# (The uppercase items must be replaced by actual values.)
+#
+# The first field is the connection type:
+# - "local" is a Unix-domain socket
+# - "host" is a TCP/IP socket (encrypted or not)
+# - "hostssl" is a TCP/IP socket that is SSL-encrypted
+# - "hostnossl" is a TCP/IP socket that is not SSL-encrypted
+# - "hostgssenc" is a TCP/IP socket that is GSSAPI-encrypted
+# - "hostnogssenc" is a TCP/IP socket that is not GSSAPI-encrypted
+#
+# DATABASE can be "all", "sameuser", "samerole", "replication", a
+# database name, or a comma-separated list thereof. The "all"
+# keyword does not match "replication". Access to replication
+# must be enabled in a separate record (see example below).
+#
+# USER can be "all", a user name, a group name prefixed with "+", or a
+# comma-separated list thereof. In both the DATABASE and USER fields
+# you can also write a file name prefixed with "@" to include names
+# from a separate file.
+#
+# ADDRESS specifies the set of hosts the record matches. It can be a
+# host name, or it is made up of an IP address and a CIDR mask that is
+# an integer (between 0 and 32 (IPv4) or 128 (IPv6) inclusive) that
+# specifies the number of significant bits in the mask. A host name
+# that starts with a dot (.) matches a suffix of the actual host name.
+# Alternatively, you can write an IP address and netmask in separate
+# columns to specify the set of hosts. Instead of a CIDR-address, you
+# can write "samehost" to match any of the server's own IP addresses,
+# or "samenet" to match any address in any subnet that the server is
+# directly connected to.
+#
+# METHOD can be "trust", "reject", "md5", "password", "scram-sha-256",
+# "gss", "sspi", "ident", "peer", "pam", "ldap", "radius" or "cert".
+# Note that "password" sends passwords in clear text; "md5" or
+# "scram-sha-256" are preferred since they send encrypted passwords.
+#
+# OPTIONS are a set of options for the authentication in the format
+# NAME=VALUE. The available options depend on the different
+# authentication methods -- refer to the "Client Authentication"
+# section in the documentation for a list of which options are
+# available for which authentication methods.
+#
+# Database and user names containing spaces, commas, quotes and other
+# special characters must be quoted. Quoting one of the keywords
+# "all", "sameuser", "samerole" or "replication" makes the name lose
+# its special character, and just match a database or username with
+# that name.
+#
+# This file is read on server startup and when the server receives a
+# SIGHUP signal. If you edit the file on a running system, you have to
+# SIGHUP the server for the changes to take effect, run "pg_ctl reload",
+# or execute "SELECT pg_reload_conf()".
+#
+# Put your actual configuration here
+# ----------------------------------
+#
+# If you want to allow non-local connections, you need to add more
+# "host" records. In that case you will also need to make PostgreSQL
+# listen on a non-local interface via the listen_addresses
+# configuration parameter, or via the -i or -h command line switches.
+
+# CAUTION: Configuring the system for local "trust" authentication
+# allows any local user to connect as any PostgreSQL user, including
+# the database superuser. If you do not trust all your local users,
+# use another authentication method.


+# TYPE DATABASE USER ADDRESS METHOD
+
+# "local" is for Unix domain socket connections only
+local all all trust
+# IPv4 local connections:
+host all all 127.0.0.1/32 trust
+# IPv6 local connections:
+host all all ::1/128 trust
+# Allow replication connections from localhost, by a user with the
+# replication privilege.
+local replication all trust
+host replication all 127.0.0.1/32 trust
+host replication all ::1/128 trust
@@ -185,3 +185,41 @@ pub const XLOG_BLCKSZ: usize = 8192;
 pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
 pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
 pub const XLP_LONG_HEADER: u16 = 0x0002;
+
+pub const PG_MAJORVERSION: &'static str = "14";
+
+// List of subdirectories inside pgdata.
+// Copied from src/bin/initdb/initdb.c
+pub const PGDATA_SUBDIRS: [&'static str; 22] = [
+"global",
+"pg_wal/archive_status",
+"pg_commit_ts",
+"pg_dynshmem",
+"pg_notify",
+"pg_serial",
+"pg_snapshots",
+"pg_subtrans",
+"pg_twophase",
+"pg_multixact",
+"pg_multixact/members",
+"pg_multixact/offsets",
+"base",
+"base/1",
+"pg_replslot",
+"pg_tblspc",
+"pg_stat",
+"pg_stat_tmp",
+"pg_xact",
+"pg_logical",
+"pg_logical/snapshots",
+"pg_logical/mappings",
+];
+
+pub const PGDATA_SPECIAL_FILES: [&'static str; 4] = [
+"pg_hba.conf",
+"pg_ident.conf",
+"postgresql.conf",
+"postgresql.auto.conf",
+];
+
+pub static PG_HBA: &'static str = include_str!("../samples/pg_hba.conf");
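For reference, the two places in this commit that consume these new constants, condensed from the compute-node and basebackup hunks above (the .with_context(..) error reporting is dropped here for brevity):

    // Compute node: build an empty PGDATA skeleton instead of unpacking a snapshot.
    for dir in pg_constants::PGDATA_SUBDIRS.iter() {
        let path = pgdata.as_path().join(*dir);
        fs::create_dir_all(path.clone())?;
        fs::set_permissions(path, fs::Permissions::from_mode(0o700))?;
    }

    // Basebackup: ship the special config files as tarball entries; pg_hba.conf gets the
    // bundled sample, the others are sent empty, and relation data stays on demand.
    for filepath in pg_constants::PGDATA_SPECIAL_FILES.iter() {
        if *filepath == "pg_hba.conf" {
            let data = pg_constants::PG_HBA.as_bytes();
            let header = new_tar_header(&filepath, data.len() as u64)?;
            self.ar.append(&header, &data[..])?;
        } else {
            let header = new_tar_header(&filepath, 0)?;
            self.ar.append(&header, &mut io::empty())?;
        }
    }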