Compare commits

..

1 Commits

Author SHA1 Message Date
Bojan Serafimov
13cddbb10d WIP 2022-06-15 18:19:21 -04:00
20 changed files with 425 additions and 1425 deletions

1
Cargo.lock generated
View File

@@ -1842,7 +1842,6 @@ dependencies = [
"tracing",
"url",
"utils",
"walkdir",
"workspace_hack",
]

View File

@@ -1,6 +1,5 @@
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, Write};
use std::io::Write;
use std::net::TcpStream;
use std::num::NonZeroU64;
use std::path::PathBuf;
@@ -528,54 +527,4 @@ impl PageServerNode {
Ok(timeline_info_response)
}
/// Import a basebackup prepared using either:
/// a) `pg_basebackup -F tar`, or
/// b) The `fullbackup` pageserver endpoint
///
/// # Arguments
/// * `tenant_id` - tenant to import into. Created if not exists
/// * `timeline_id` - id to assign to imported timeline
/// * `base` - (start lsn of basebackup, path to `base.tar` file)
/// * `pg_wal` - if there's any wal to import: (end lsn, path to `pg_wal.tar`)
pub fn timeline_import(
&self,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
base: (Lsn, PathBuf),
pg_wal: Option<(Lsn, PathBuf)>,
) -> anyhow::Result<()> {
let mut client = self.pg_connection_config.connect(NoTls).unwrap();
// Init base reader
let (start_lsn, base_tarfile_path) = base;
let base_tarfile = File::open(base_tarfile_path)?;
let mut base_reader = BufReader::new(base_tarfile);
// Init wal reader if necessary
let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal {
let wal_tarfile = File::open(wal_tarfile_path)?;
let wal_reader = BufReader::new(wal_tarfile);
(end_lsn, Some(wal_reader))
} else {
(start_lsn, None)
};
// Import base
let import_cmd =
format!("import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
let mut writer = client.copy_in(&import_cmd)?;
io::copy(&mut base_reader, &mut writer)?;
writer.finish()?;
// Import wal if necessary
if let Some(mut wal_reader) = wal_reader {
let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
let mut writer = client.copy_in(&import_cmd)?;
io::copy(&mut wal_reader, &mut writer)?;
writer.finish()?;
}
Ok(())
}
}

View File

@@ -14,7 +14,7 @@ use safekeeper::defaults::{
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};
use std::collections::{BTreeSet, HashMap};
use std::path::{Path, PathBuf};
use std::path::Path;
use std::process::exit;
use std::str::FromStr;
use utils::{
@@ -159,20 +159,6 @@ fn main() -> Result<()> {
.about("Create a new blank timeline")
.arg(tenant_id_arg.clone())
.arg(branch_name_arg.clone()))
.subcommand(App::new("import")
.about("Import timeline from basebackup directory")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone())
.arg(Arg::new("node-name").long("node-name").takes_value(true)
.help("Name to assign to the imported timeline"))
.arg(Arg::new("base-tarfile").long("base-tarfile").takes_value(true)
.help("Basebackup tarfile to import"))
.arg(Arg::new("base-lsn").long("base-lsn").takes_value(true)
.help("Lsn the basebackup starts at"))
.arg(Arg::new("wal-tarfile").long("wal-tarfile").takes_value(true)
.help("Wal to add after base"))
.arg(Arg::new("end-lsn").long("end-lsn").takes_value(true)
.help("Lsn the basebackup ends at")))
).subcommand(
App::new("tenant")
.setting(AppSettings::ArgRequiredElseHelp)
@@ -627,43 +613,6 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
timeline.timeline_id, last_record_lsn, tenant_id,
);
}
Some(("import", import_match)) => {
let tenant_id = get_tenant_id(import_match, env)?;
let timeline_id = parse_timeline_id(import_match)?.expect("No timeline id provided");
let name = import_match
.value_of("node-name")
.ok_or_else(|| anyhow!("No node name provided"))?;
// Parse base inputs
let base_tarfile = import_match
.value_of("base-tarfile")
.map(|s| PathBuf::from_str(s).unwrap())
.ok_or_else(|| anyhow!("No base-tarfile provided"))?;
let base_lsn = Lsn::from_str(
import_match
.value_of("base-lsn")
.ok_or_else(|| anyhow!("No base-lsn provided"))?,
)?;
let base = (base_lsn, base_tarfile);
// Parse pg_wal inputs
let wal_tarfile = import_match
.value_of("wal-tarfile")
.map(|s| PathBuf::from_str(s).unwrap());
let end_lsn = import_match
.value_of("end-lsn")
.map(|s| Lsn::from_str(s).unwrap());
// TODO validate both or none are provided
let pg_wal = end_lsn.zip(wal_tarfile);
let mut cplane = ComputeControlPlane::load(env.clone())?;
println!("Importing timeline into pageserver ...");
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal)?;
println!("Creating node for imported timeline ...");
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
cplane.new_node(tenant_id, name, timeline_id, None, None)?;
println!("Done");
}
Some(("branch", branch_match)) => {
let tenant_id = get_tenant_id(branch_match, env)?;
let new_branch_name = branch_match

View File

@@ -61,7 +61,6 @@ utils = { path = "../libs/utils" }
remote_storage = { path = "../libs/remote_storage" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
close_fds = "0.3.2"
walkdir = "2.3.2"
[dev-dependencies]
hex-literal = "0.3"

View File

@@ -13,7 +13,6 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{BufMut, BytesMut};
use fail::fail_point;
use itertools::Itertools;
use std::fmt::Write as FmtWrite;
use std::io;
use std::io::Write;
@@ -22,7 +21,7 @@ use std::time::SystemTime;
use tar::{Builder, EntryType, Header};
use tracing::*;
use crate::reltag::{RelTag, SlruKind};
use crate::reltag::SlruKind;
use crate::repository::Timeline;
use crate::DatadirTimelineImpl;
use postgres_ffi::xlog_utils::*;
@@ -40,12 +39,11 @@ where
timeline: &'a Arc<DatadirTimelineImpl>,
pub lsn: Lsn,
prev_record_lsn: Lsn,
full_backup: bool,
finished: bool,
}
// Create basebackup with non-rel data in it.
// Only include relational data if 'full_backup' is true.
// Create basebackup with non-rel data in it. Omit relational data.
//
// Currently we use empty lsn in two cases:
// * During the basebackup right after timeline creation
@@ -60,7 +58,6 @@ where
write: W,
timeline: &'a Arc<DatadirTimelineImpl>,
req_lsn: Option<Lsn>,
full_backup: bool,
) -> Result<Basebackup<'a, W>> {
// Compute postgres doesn't have any previous WAL files, but the first
// record that it's going to write needs to include the LSN of the
@@ -97,8 +94,8 @@ where
};
info!(
"taking basebackup lsn={}, prev_lsn={} (full_backup={})",
backup_lsn, backup_prev, full_backup
"taking basebackup lsn={}, prev_lsn={}",
backup_lsn, backup_prev
);
Ok(Basebackup {
@@ -106,14 +103,11 @@ where
timeline,
lsn: backup_lsn,
prev_record_lsn: backup_prev,
full_backup,
finished: false,
})
}
pub fn send_tarball(mut self) -> anyhow::Result<()> {
// TODO include checksum
// Create pgdata subdirs structure
for dir in pg_constants::PGDATA_SUBDIRS.iter() {
let header = new_tar_header_dir(*dir)?;
@@ -146,13 +140,6 @@ where
// Create tablespace directories
for ((spcnode, dbnode), has_relmap_file) in self.timeline.list_dbdirs(self.lsn)? {
self.add_dbdir(spcnode, dbnode, has_relmap_file)?;
// Gather and send relational files in each database if full backup is requested.
if self.full_backup {
for rel in self.timeline.list_rels(spcnode, dbnode, self.lsn)? {
self.add_rel(rel)?;
}
}
}
for xid in self.timeline.list_twophase_files(self.lsn)? {
self.add_twophase_file(xid)?;
@@ -170,38 +157,6 @@ where
Ok(())
}
fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
let nblocks = self.timeline.get_rel_size(tag, self.lsn)?;
// Function that adds relation segment data to archive
let mut add_file = |segment_index, data: &Vec<u8>| -> anyhow::Result<()> {
let file_name = tag.to_segfile_name(segment_index as u32);
let header = new_tar_header(&file_name, data.len() as u64)?;
self.ar.append(&header, data.as_slice())?;
Ok(())
};
// If the relation is empty, create an empty file
if nblocks == 0 {
add_file(0, &vec![])?;
return Ok(());
}
// Add a file for each chunk of blocks (aka segment)
let chunks = (0..nblocks).chunks(pg_constants::RELSEG_SIZE as usize);
for (seg, blocks) in chunks.into_iter().enumerate() {
let mut segment_data: Vec<u8> = vec![];
for blknum in blocks {
let img = self.timeline.get_rel_page_at_lsn(tag, blknum, self.lsn)?;
segment_data.extend_from_slice(&img[..]);
}
add_file(seg, &segment_data)?;
}
Ok(())
}
//
// Generate SLRU segment files from repository.
//

View File

@@ -2,6 +2,7 @@
//! Import data and WAL from a PostgreSQL data directory and WAL segments into
//! a zenith Timeline.
//!
use std::fs;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
@@ -9,18 +10,16 @@ use std::path::{Path, PathBuf};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use tracing::*;
use walkdir::WalkDir;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::repository::Repository;
use crate::repository::Timeline;
use crate::walingest::WalIngest;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::*;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::Oid;
use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
use postgres_ffi::{Oid, TransactionId};
use utils::lsn::Lsn;
///
@@ -36,29 +35,100 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
) -> Result<()> {
let mut pg_control: Option<ControlFileData> = None;
// TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
// Then fishing out pg_control would be unnecessary
let mut modification = tline.begin_modification(lsn);
modification.init_empty()?;
// Import all but pg_wal
let all_but_wal = WalkDir::new(path)
.into_iter()
.filter_entry(|entry| !entry.path().ends_with("pg_wal"));
for entry in all_but_wal {
let entry = entry.unwrap();
let metadata = entry.metadata().unwrap();
if metadata.is_file() {
let absolute_path = entry.path();
let relative_path = absolute_path.strip_prefix(path)?;
// Scan 'global'
let mut relfiles: Vec<PathBuf> = Vec::new();
for direntry in fs::read_dir(path.join("global"))? {
let direntry = direntry?;
match direntry.file_name().to_str() {
None => continue,
let file = File::open(absolute_path)?;
let len = metadata.len() as usize;
if let Some(control_file) = import_file(&mut modification, relative_path, file, len)? {
pg_control = Some(control_file);
Some("pg_control") => {
pg_control = Some(import_control_file(&mut modification, &direntry.path())?);
}
Some("pg_filenode.map") => {
import_relmap_file(
&mut modification,
pg_constants::GLOBALTABLESPACE_OID,
0,
&direntry.path(),
)?;
}
// Load any relation files into the page server (but only after the other files)
_ => relfiles.push(direntry.path()),
}
}
for relfile in relfiles {
import_relfile(
&mut modification,
&relfile,
pg_constants::GLOBALTABLESPACE_OID,
0,
)?;
}
// Scan 'base'. It contains database dirs, the database OID is the filename.
// E.g. 'base/12345', where 12345 is the database OID.
for direntry in fs::read_dir(path.join("base"))? {
let direntry = direntry?;
//skip all temporary files
if direntry.file_name().to_string_lossy() == "pgsql_tmp" {
continue;
}
let dboid = direntry.file_name().to_string_lossy().parse::<u32>()?;
let mut relfiles: Vec<PathBuf> = Vec::new();
for direntry in fs::read_dir(direntry.path())? {
let direntry = direntry?;
match direntry.file_name().to_str() {
None => continue,
Some("PG_VERSION") => {
//modification.put_dbdir_creation(pg_constants::DEFAULTTABLESPACE_OID, dboid)?;
}
Some("pg_filenode.map") => import_relmap_file(
&mut modification,
pg_constants::DEFAULTTABLESPACE_OID,
dboid,
&direntry.path(),
)?,
// Load any relation files into the page server
_ => relfiles.push(direntry.path()),
}
}
for relfile in relfiles {
import_relfile(
&mut modification,
&relfile,
pg_constants::DEFAULTTABLESPACE_OID,
dboid,
)?;
}
}
for entry in fs::read_dir(path.join("pg_xact"))? {
let entry = entry?;
import_slru_file(&mut modification, SlruKind::Clog, &entry.path())?;
}
for entry in fs::read_dir(path.join("pg_multixact").join("members"))? {
let entry = entry?;
import_slru_file(&mut modification, SlruKind::MultiXactMembers, &entry.path())?;
}
for entry in fs::read_dir(path.join("pg_multixact").join("offsets"))? {
let entry = entry?;
import_slru_file(&mut modification, SlruKind::MultiXactOffsets, &entry.path())?;
}
for entry in fs::read_dir(path.join("pg_twophase"))? {
let entry = entry?;
let xid = u32::from_str_radix(&entry.path().to_string_lossy(), 16)?;
import_twophase_file(&mut modification, xid, &entry.path())?;
}
// TODO: Scan pg_tblspc
// We're done importing all the data files.
modification.commit()?;
@@ -88,13 +158,11 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
}
// subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
fn import_rel<R: Repository, Reader: Read>(
fn import_relfile<R: Repository>(
modification: &mut DatadirModification<R>,
path: &Path,
spcoid: Oid,
dboid: Oid,
mut reader: Reader,
len: usize,
) -> anyhow::Result<()> {
// Does it look like a relation file?
trace!("importing rel file {}", path.display());
@@ -105,10 +173,16 @@ fn import_rel<R: Repository, Reader: Read>(
e
})?;
let mut file = File::open(path)?;
let mut buf: [u8; 8192] = [0u8; 8192];
ensure!(len % pg_constants::BLCKSZ as usize == 0);
let nblocks = len / pg_constants::BLCKSZ as usize;
let len = file.metadata().unwrap().len();
ensure!(len % pg_constants::BLCKSZ as u64 == 0);
let nblocks = len / pg_constants::BLCKSZ as u64;
if segno != 0 {
todo!();
}
let rel = RelTag {
spcnode: spcoid,
@@ -116,22 +190,11 @@ fn import_rel<R: Repository, Reader: Read>(
relnode,
forknum,
};
modification.put_rel_creation(rel, nblocks as u32)?;
let mut blknum: u32 = segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
// Call put_rel_creation for every segment of the relation,
// because there is no guarantee about the order in which we are processing segments.
// ignore "relation already exists" error
if let Err(e) = modification.put_rel_creation(rel, nblocks as u32) {
if e.to_string().contains("already exists") {
debug!("relation {} already exists. we must be extending it", rel);
} else {
return Err(e);
}
}
loop {
let r = reader.read_exact(&mut buf);
let r = file.read_exact(&mut buf);
match r {
Ok(_) => {
modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
@@ -141,9 +204,7 @@ fn import_rel<R: Repository, Reader: Read>(
Err(err) => match err.kind() {
std::io::ErrorKind::UnexpectedEof => {
// reached EOF. That's expected.
let relative_blknum =
blknum - segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
ensure!(blknum == nblocks as u32, "unexpected EOF");
break;
}
_ => {
@@ -154,39 +215,96 @@ fn import_rel<R: Repository, Reader: Read>(
blknum += 1;
}
// Update relation size
//
// If we process rel segments out of order,
// put_rel_extend will skip the update.
modification.put_rel_extend(rel, blknum)?;
Ok(())
}
/// Import a relmapper (pg_filenode.map) file into the repository
fn import_relmap_file<R: Repository>(
modification: &mut DatadirModification<R>,
spcnode: Oid,
dbnode: Oid,
path: &Path,
) -> Result<()> {
let mut file = File::open(path)?;
let mut buffer = Vec::new();
// read the whole file
file.read_to_end(&mut buffer)?;
trace!("importing relmap file {}", path.display());
modification.put_relmap_file(spcnode, dbnode, Bytes::copy_from_slice(&buffer[..]))?;
Ok(())
}
/// Import a twophase state file (pg_twophase/<xid>) into the repository
fn import_twophase_file<R: Repository>(
modification: &mut DatadirModification<R>,
xid: TransactionId,
path: &Path,
) -> Result<()> {
let mut file = File::open(path)?;
let mut buffer = Vec::new();
// read the whole file
file.read_to_end(&mut buffer)?;
trace!("importing non-rel file {}", path.display());
modification.put_twophase_file(xid, Bytes::copy_from_slice(&buffer[..]))?;
Ok(())
}
///
/// Import pg_control file into the repository.
///
/// The control file is imported as is, but we also extract the checkpoint record
/// from it and store it separated.
fn import_control_file<R: Repository>(
modification: &mut DatadirModification<R>,
path: &Path,
) -> Result<ControlFileData> {
let mut file = File::open(path)?;
let mut buffer = Vec::new();
// read the whole file
file.read_to_end(&mut buffer)?;
trace!("importing control file {}", path.display());
// Import it as ControlFile
modification.put_control_file(Bytes::copy_from_slice(&buffer[..]))?;
// Extract the checkpoint record and import it separately.
let pg_control = ControlFileData::decode(&buffer)?;
let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
modification.put_checkpoint(checkpoint_bytes)?;
Ok(pg_control)
}
///
/// Import an SLRU segment file
///
fn import_slru<R: Repository, Reader: Read>(
fn import_slru_file<R: Repository>(
modification: &mut DatadirModification<R>,
slru: SlruKind,
path: &Path,
mut reader: Reader,
len: usize,
) -> Result<()> {
trace!("importing slru file {}", path.display());
let mut file = File::open(path)?;
let mut buf: [u8; 8192] = [0u8; 8192];
let segno = u32::from_str_radix(&path.file_name().unwrap().to_string_lossy(), 16)?;
ensure!(len % pg_constants::BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
let nblocks = len / pg_constants::BLCKSZ as usize;
let len = file.metadata().unwrap().len();
ensure!(len % pg_constants::BLCKSZ as u64 == 0); // we assume SLRU block size is the same as BLCKSZ
let nblocks = len / pg_constants::BLCKSZ as u64;
ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as u64);
modification.put_slru_segment_creation(slru, segno, nblocks as u32)?;
let mut rpageno = 0;
loop {
let r = reader.read_exact(&mut buf);
let r = file.read_exact(&mut buf);
match r {
Ok(_) => {
modification.put_slru_page_image(
@@ -278,245 +396,10 @@ fn import_wal<R: Repository>(
}
if last_lsn != startpoint {
info!("reached end of WAL at {}", last_lsn);
debug!("reached end of WAL at {}", last_lsn);
} else {
info!("no WAL to import at {}", last_lsn);
}
Ok(())
}
pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
tline: &mut DatadirTimeline<R>,
reader: Reader,
base_lsn: Lsn,
) -> Result<()> {
info!("importing base at {}", base_lsn);
let mut modification = tline.begin_modification(base_lsn);
modification.init_empty()?;
let mut pg_control: Option<ControlFileData> = None;
// Import base
for base_tar_entry in tar::Archive::new(reader).entries()? {
let entry = base_tar_entry.unwrap();
let header = entry.header();
let len = header.entry_size()? as usize;
let file_path = header.path().unwrap().into_owned();
match header.entry_type() {
tar::EntryType::Regular => {
// let mut buffer = Vec::new();
// entry.read_to_end(&mut buffer).unwrap();
if let Some(res) = import_file(&mut modification, file_path.as_ref(), entry, len)? {
// We found the pg_control file.
pg_control = Some(res);
}
}
tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
}
_ => {
panic!("tar::EntryType::?? {}", file_path.display());
}
}
}
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit()?;
Ok(())
}
pub fn import_wal_from_tar<R: Repository, Reader: Read>(
tline: &mut DatadirTimeline<R>,
reader: Reader,
start_lsn: Lsn,
end_lsn: Lsn,
) -> Result<()> {
// Set up walingest mutable state
let mut waldecoder = WalStreamDecoder::new(start_lsn);
let mut segno = start_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE);
let mut offset = start_lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
let mut last_lsn = start_lsn;
let mut walingest = WalIngest::new(tline, start_lsn)?;
// Ingest wal until end_lsn
info!("importing wal until {}", end_lsn);
let mut pg_wal_tar = tar::Archive::new(reader);
let mut pg_wal_entries_iter = pg_wal_tar.entries()?;
while last_lsn <= end_lsn {
let bytes = {
let entry = pg_wal_entries_iter.next().expect("expected more wal")?;
let header = entry.header();
let file_path = header.path().unwrap().into_owned();
match header.entry_type() {
tar::EntryType::Regular => {
// FIXME: assume postgresql tli 1 for now
let expected_filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE);
let file_name = file_path.file_name().unwrap().to_string_lossy();
ensure!(expected_filename == file_name);
debug!("processing wal file {:?}", file_path);
read_all_bytes(entry)?
}
tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
continue;
}
_ => {
panic!("tar::EntryType::?? {}", file_path.display());
}
}
};
waldecoder.feed_bytes(&bytes[offset..]);
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest.ingest_record(tline, recdata, lsn)?;
last_lsn = lsn;
debug!("imported record at {} (end {})", lsn, end_lsn);
}
}
debug!("imported records up to {}", last_lsn);
segno += 1;
offset = 0;
}
if last_lsn != start_lsn {
info!("reached end of WAL at {}", last_lsn);
} else {
info!("there was no WAL to import at {}", last_lsn);
}
// Log any extra unused files
for e in &mut pg_wal_entries_iter {
let entry = e.unwrap();
let header = entry.header();
let file_path = header.path().unwrap().into_owned();
info!("skipping {:?}", file_path);
}
Ok(())
}
pub fn import_file<R: Repository, Reader: Read>(
modification: &mut DatadirModification<R>,
file_path: &Path,
reader: Reader,
len: usize,
) -> Result<Option<ControlFileData>> {
debug!("looking at {:?}", file_path);
if file_path.starts_with("global") {
let spcnode = pg_constants::GLOBALTABLESPACE_OID;
let dbnode = 0;
match file_path.file_name().unwrap().to_string_lossy().as_ref() {
"pg_control" => {
let bytes = read_all_bytes(reader)?;
// Extract the checkpoint record and import it separately.
let pg_control = ControlFileData::decode(&bytes[..])?;
let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
modification.put_checkpoint(checkpoint_bytes)?;
debug!("imported control file");
// Import it as ControlFile
modification.put_control_file(bytes)?;
return Ok(Some(pg_control));
}
"pg_filenode.map" => {
let bytes = read_all_bytes(reader)?;
modification.put_relmap_file(spcnode, dbnode, bytes)?;
debug!("imported relmap file")
}
"PG_VERSION" => {
debug!("ignored");
}
_ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
debug!("imported rel creation");
}
}
} else if file_path.starts_with("base") {
let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
let dbnode: u32 = file_path
.iter()
.nth(1)
.unwrap()
.to_string_lossy()
.parse()
.unwrap();
match file_path.file_name().unwrap().to_string_lossy().as_ref() {
"pg_filenode.map" => {
let bytes = read_all_bytes(reader)?;
modification.put_relmap_file(spcnode, dbnode, bytes)?;
debug!("imported relmap file")
}
"PG_VERSION" => {
debug!("ignored");
}
_ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
debug!("imported rel creation");
}
}
} else if file_path.starts_with("pg_xact") {
let slru = SlruKind::Clog;
import_slru(modification, slru, file_path, reader, len)?;
debug!("imported clog slru");
} else if file_path.starts_with("pg_multixact/offsets") {
let slru = SlruKind::MultiXactOffsets;
import_slru(modification, slru, file_path, reader, len)?;
debug!("imported multixact offsets slru");
} else if file_path.starts_with("pg_multixact/members") {
let slru = SlruKind::MultiXactMembers;
import_slru(modification, slru, file_path, reader, len)?;
debug!("imported multixact members slru");
} else if file_path.starts_with("pg_twophase") {
let xid = u32::from_str_radix(&file_path.file_name().unwrap().to_string_lossy(), 16)?;
let bytes = read_all_bytes(reader)?;
modification.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))?;
debug!("imported twophase file");
} else if file_path.starts_with("pg_wal") {
debug!("found wal file in base section. ignore it");
} else if file_path.starts_with("zenith.signal") {
// Parse zenith signal file to set correct previous LSN
let bytes = read_all_bytes(reader)?;
// zenith.signal format is "PREV LSN: prev_lsn"
let zenith_signal = std::str::from_utf8(&bytes).unwrap();
let zenith_signal = zenith_signal.split(':').collect::<Vec<_>>();
let prev_lsn = zenith_signal[1].trim().parse::<Lsn>().unwrap();
let writer = modification.tline.tline.writer();
writer.finish_write(prev_lsn);
debug!("imported zenith signal {}", prev_lsn);
} else if file_path.starts_with("pg_tblspc") {
// TODO Backups exported from neon won't have pg_tblspc, but we will need
// this to import arbitrary postgres databases.
bail!("Importing pg_tblspc is not implemented");
} else {
debug!("ignored");
}
Ok(None)
}
fn read_all_bytes<Reader: Read>(mut reader: Reader) -> Result<Bytes> {
let mut buf: Vec<u8> = vec![];
reader.read_to_end(&mut buf)?;
Ok(Bytes::copy_from_slice(&buf[..]))
}

View File

@@ -243,15 +243,15 @@ impl Repository for LayeredRepository {
);
timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn);
// Insert if not exists
let timeline = Arc::new(timeline);
match timelines.entry(timelineid) {
Entry::Occupied(_) => bail!("Timeline already exists"),
Entry::Vacant(vacant) => {
vacant.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)))
}
};
let r = timelines.insert(
timelineid,
LayeredTimelineEntry::Loaded(Arc::clone(&timeline)),
);
ensure!(
r.is_none(),
"assertion failure, inserted duplicate timeline"
);
Ok(timeline)
}

View File

@@ -13,7 +13,7 @@ use anyhow::{bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use regex::Regex;
use std::io::{self, Read};
use std::io;
use std::net::TcpListener;
use std::str;
use std::str::FromStr;
@@ -29,8 +29,6 @@ use utils::{
use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
use crate::layered_repository::LayeredRepository;
use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp};
use crate::profiling::profpoint_start;
use crate::reltag::RelTag;
@@ -202,96 +200,6 @@ impl PagestreamBeMessage {
}
}
/// Implements Read for the server side of CopyIn
struct CopyInReader<'a> {
pgb: &'a mut PostgresBackend,
/// Overflow buffer for bytes sent in CopyData messages
/// that the reader (caller of read) hasn't asked for yet.
/// TODO use BytesMut?
buf: Vec<u8>,
/// Bytes before `buf_begin` are considered as dropped.
/// This allows us to implement O(1) pop_front on Vec<u8>.
/// The Vec won't grow large because we only add to it
/// when it's empty.
buf_begin: usize,
}
impl<'a> CopyInReader<'a> {
// NOTE: pgb should be in copy in state already
fn new(pgb: &'a mut PostgresBackend) -> Self {
Self {
pgb,
buf: Vec::<_>::new(),
buf_begin: 0,
}
}
}
impl<'a> Drop for CopyInReader<'a> {
fn drop(&mut self) {
// Finalize copy protocol so that self.pgb can be reused
// TODO instead, maybe take ownership of pgb and give it back at the end
let mut buf: Vec<u8> = vec![];
let _ = self.read_to_end(&mut buf);
}
}
impl<'a> Read for CopyInReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
while !thread_mgr::is_shutdown_requested() {
// Return from buffer if nonempty
if self.buf_begin < self.buf.len() {
let bytes_to_read = std::cmp::min(buf.len(), self.buf.len() - self.buf_begin);
buf[..bytes_to_read].copy_from_slice(&self.buf[self.buf_begin..][..bytes_to_read]);
self.buf_begin += bytes_to_read;
return Ok(bytes_to_read);
}
// Delete garbage
self.buf.clear();
self.buf_begin = 0;
// Wait for client to send CopyData bytes
match self.pgb.read_message() {
Ok(Some(message)) => {
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
FeMessage::CopyDone => return Ok(0),
FeMessage::Sync => continue,
m => {
let msg = format!("unexpected message {:?}", m);
self.pgb.write_message(&BeMessage::ErrorResponse(&msg))?;
return Err(io::Error::new(io::ErrorKind::Other, msg));
}
};
// Return as much as we can, saving the rest in self.buf
let mut reader = copy_data_bytes.reader();
let bytes_read = reader.read(buf)?;
reader.read_to_end(&mut self.buf)?;
return Ok(bytes_read);
}
Ok(None) => {
let msg = "client closed connection";
self.pgb.write_message(&BeMessage::ErrorResponse(msg))?;
return Err(io::Error::new(io::ErrorKind::Other, msg));
}
Err(e) => {
if !is_socket_read_timed_out(&e) {
return Err(io::Error::new(io::ErrorKind::Other, e));
}
}
}
}
// Shutting down
let msg = "Importer thread was shut down";
Err(io::Error::new(io::ErrorKind::Other, msg))
}
}
///////////////////////////////////////////////////////////////////////////////
///
@@ -539,98 +447,6 @@ impl PageServerHandler {
Ok(())
}
fn handle_import_basebackup(
&self,
pgb: &mut PostgresBackend,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
base_lsn: Lsn,
_end_lsn: Lsn,
) -> anyhow::Result<()> {
thread_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let _enter =
info_span!("import basebackup", timeline = %timeline_id, tenant = %tenant_id).entered();
// Create empty timeline
info!("creating new timeline");
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let timeline = repo.create_empty_timeline(timeline_id, Lsn(0))?;
let repartition_distance = repo.get_checkpoint_distance();
let mut datadir_timeline =
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute
// from connecting before that and writing conflicting wal.
//
// This is not relevant for pageserver->pageserver migrations, since there's
// no wal to import. But should be fixed if we want to import from postgres.
// TODO leave clean state on error. For now you can use detach to clean
// up broken state from a failed import.
// Import basebackup provided via CopyData
info!("importing basebackup");
pgb.write_message(&BeMessage::CopyInResponse)?;
let reader = CopyInReader::new(pgb);
import_basebackup_from_tar(&mut datadir_timeline, reader, base_lsn)?;
// TODO check checksum
// Meanwhile you can verify client-side by taking fullbackup
// and checking that it matches in size with what was imported.
// It wouldn't work if base came from vanilla postgres though,
// since we discard some log files.
// Flush data to disk, then upload to s3
info!("flushing layers");
datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?;
info!("done");
Ok(())
}
fn handle_import_wal(
&self,
pgb: &mut PostgresBackend,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
) -> anyhow::Result<()> {
thread_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let _enter =
info_span!("import wal", timeline = %timeline_id, tenant = %tenant_id).entered();
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let timeline = repo.get_timeline_load(timeline_id)?;
ensure!(timeline.get_last_record_lsn() == start_lsn);
let repartition_distance = repo.get_checkpoint_distance();
let mut datadir_timeline =
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
// TODO leave clean state on error. For now you can use detach to clean
// up broken state from a failed import.
// Import wal provided via CopyData
info!("importing wal");
pgb.write_message(&BeMessage::CopyInResponse)?;
let reader = CopyInReader::new(pgb);
import_wal_from_tar(&mut datadir_timeline, reader, start_lsn, end_lsn)?;
// TODO Does it make sense to overshoot?
ensure!(datadir_timeline.tline.get_last_record_lsn() >= end_lsn);
// Flush data to disk, then upload to s3. No need for a forced checkpoint.
// We only want to persist the data, and it doesn't matter if it's in the
// shape of deltas or images.
info!("flushing layers");
datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?;
info!("done");
Ok(())
}
/// Helper function to handle the LSN from client request.
///
/// Each GetPage (and Exists and Nblocks) request includes information about
@@ -780,7 +596,6 @@ impl PageServerHandler {
timelineid: ZTimelineId,
lsn: Option<Lsn>,
tenantid: ZTenantId,
full_backup: bool,
) -> anyhow::Result<()> {
let span = info_span!("basebackup", timeline = %timelineid, tenant = %tenantid, lsn = field::Empty);
let _enter = span.enter();
@@ -803,7 +618,7 @@ impl PageServerHandler {
{
let mut writer = CopyDataSink { pgb };
let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn, full_backup)?;
let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?;
span.record("lsn", &basebackup.lsn.to_string().as_str());
basebackup.send_tarball()?;
}
@@ -906,74 +721,7 @@ impl postgres_backend::Handler for PageServerHandler {
};
// Check that the timeline exists
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid, false)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
// same as basebackup, but result includes relational data as well
else if query_string.starts_with("fullbackup ") {
let (_, params_raw) = query_string.split_at("fullbackup ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();
ensure!(
params.len() == 3,
"invalid param number for fullbackup command"
);
let tenantid = ZTenantId::from_str(params[0])?;
let timelineid = ZTimelineId::from_str(params[1])?;
self.check_permission(Some(tenantid))?;
// Lsn is required for fullbackup, because otherwise we would not know
// at which lsn to upload this backup.
//
// The caller is responsible for providing a valid lsn
// and using it in the subsequent import.
let lsn = Some(Lsn::from_str(params[2])?);
// Check that the timeline exists
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid, true)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import basebackup ") {
// Import the `base` section (everything but the wal) of a basebackup.
// Assumes the tenant already exists on this pageserver.
//
// Files are scheduled to be persisted to remote storage, and the
// caller should poll the http api to check when that is done.
//
// Example import command:
// 1. Get start/end LSN from backup_manifest file
// 2. Run:
// cat my_backup/base.tar | psql -h $PAGESERVER \
// -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN"
let (_, params_raw) = query_string.split_at("import basebackup ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();
ensure!(params.len() == 4);
let tenant = ZTenantId::from_str(params[0])?;
let timeline = ZTimelineId::from_str(params[1])?;
let base_lsn = Lsn::from_str(params[2])?;
let end_lsn = Lsn::from_str(params[3])?;
self.check_permission(Some(tenant))?;
self.handle_import_basebackup(pgb, tenant, timeline, base_lsn, end_lsn)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import wal ") {
// Import the `pg_wal` section of a basebackup.
//
// Files are scheduled to be persisted to remote storage, and the
// caller should poll the http api to check when that is done.
let (_, params_raw) = query_string.split_at("import wal ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();
ensure!(params.len() == 4);
let tenant = ZTenantId::from_str(params[0])?;
let timeline = ZTimelineId::from_str(params[1])?;
let start_lsn = Lsn::from_str(params[2])?;
let end_lsn = Lsn::from_str(params[3])?;
self.check_permission(Some(tenant))?;
self.handle_import_wal(pgb, tenant, timeline, start_lsn, end_lsn)?;
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.to_ascii_lowercase().starts_with("set ") {
// important because psycopg2 executes "SET datestyle TO 'ISO'"

View File

@@ -749,7 +749,6 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
}
/// Extend relation
/// If new size is smaller, do nothing.
pub fn put_rel_extend(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> {
ensure!(rel.relnode != 0, "invalid relnode");
@@ -757,13 +756,10 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
let size_key = rel_size_to_key(rel);
let old_size = self.get(size_key)?.get_u32_le();
// only extend relation here. never decrease the size
if nblocks > old_size {
let buf = nblocks.to_le_bytes();
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
let buf = nblocks.to_le_bytes();
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
self.pending_nblocks += nblocks as isize - old_size as isize;
}
self.pending_nblocks += nblocks as isize - old_size as isize;
Ok(())
}

View File

@@ -3,7 +3,7 @@ use std::cmp::Ordering;
use std::fmt;
use postgres_ffi::relfile_utils::forknumber_to_name;
use postgres_ffi::{pg_constants, Oid};
use postgres_ffi::Oid;
///
/// Relation data file segment id throughout the Postgres cluster.
@@ -75,30 +75,6 @@ impl fmt::Display for RelTag {
}
}
impl RelTag {
pub fn to_segfile_name(&self, segno: u32) -> String {
let mut name = if self.spcnode == pg_constants::GLOBALTABLESPACE_OID {
"global/".to_string()
} else {
format!("base/{}/", self.dbnode)
};
name += &self.relnode.to_string();
if let Some(fork_name) = forknumber_to_name(self.forknum) {
name += "_";
name += fork_name;
}
if segno != 0 {
name += ".";
name += &segno.to_string();
}
name
}
}
///
/// Non-relation transaction status files (clog (a.k.a. pg_xact) and
/// pg_multixact) in Postgres are handled by SLRU (Simple LRU) buffer,

View File

@@ -124,7 +124,7 @@ fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: L
term,
start_streaming_at: lsn,
term_history: history,
timeline_start_lsn: lsn,
timeline_start_lsn: Lsn(0),
});
spg.timeline.get().process_msg(&proposer_elected_request)?;

View File

@@ -13,11 +13,9 @@ use serde::{Deserialize, Serialize};
use std::cmp::min;
use std::net::Shutdown;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use std::{str, thread};
use tokio::sync::watch::Receiver;
use tokio::time::timeout;
use tracing::*;
use utils::{
bin_ser::BeSer,
@@ -193,143 +191,100 @@ impl ReplicationConn {
}
})?;
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
runtime.block_on(async move {
let (_, persisted_state) = spg.timeline.get().get_state();
if persisted_state.server.wal_seg_size == 0
|| persisted_state.timeline_start_lsn == Lsn(0)
{
bail!("Cannot start replication before connecting to walproposer");
}
let wal_end = spg.timeline.get().get_end_of_wal();
// Walproposer gets special handling: safekeeper must give proposer all
// local WAL till the end, whether committed or not (walproposer will
// hang otherwise). That's because walproposer runs the consensus and
// synchronizes safekeepers on the most advanced one.
//
// There is a small risk of this WAL getting concurrently garbaged if
// another compute rises which collects majority and starts fixing log
// on this safekeeper itself. That's ok as (old) proposer will never be
// able to commit such WAL.
let stop_pos: Option<Lsn> = if spg.appname == Some("wal_proposer_recovery".to_string())
{
Some(wal_end)
} else {
None
};
info!("Start replication from {:?} till {:?}", start_pos, stop_pos);
// switch to copy
pgb.write_message(&BeMessage::CopyBothResponse)?;
let mut end_pos = Lsn(0);
let mut wal_reader = WalReader::new(
spg.conf.timeline_dir(&spg.timeline.get().zttid),
&persisted_state,
start_pos,
spg.conf.wal_backup_enabled,
)?;
// buffer for wal sending, limited by MAX_SEND_SIZE
let mut send_buf = vec![0u8; MAX_SEND_SIZE];
// watcher for commit_lsn updates
let mut commit_lsn_watch_rx = spg.timeline.get().get_commit_lsn_watch_rx();
loop {
if let Some(stop_pos) = stop_pos {
if start_pos >= stop_pos {
break; /* recovery finished */
}
end_pos = stop_pos;
} else {
/* Wait until we have some data to stream */
let lsn = wait_for_lsn(&mut commit_lsn_watch_rx, start_pos).await?;
if let Some(lsn) = lsn {
end_pos = lsn;
} else {
// TODO: also check once in a while whether we are walsender
// to right pageserver.
if spg.timeline.get().stop_walsender(replica_id)? {
// Shut down, timeline is suspended.
// TODO create proper error type for this
bail!("end streaming to {:?}", spg.appname);
}
// timeout expired: request pageserver status
pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
sent_ptr: end_pos.0,
timestamp: get_current_timestamp(),
request_reply: true,
}))
.context("Failed to send KeepAlive message")?;
continue;
}
}
let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
let send_size = min(send_size, send_buf.len());
let send_buf = &mut send_buf[..send_size];
// read wal into buffer
let send_size = wal_reader.read(send_buf).await?;
let send_buf = &send_buf[..send_size];
// Write some data to the network socket.
pgb.write_message(&BeMessage::XLogData(XLogDataBody {
wal_start: start_pos.0,
wal_end: end_pos.0,
timestamp: get_current_timestamp(),
data: send_buf,
}))
.context("Failed to send XLogData")?;
start_pos += send_size as u64;
trace!("sent WAL up to {}", start_pos);
}
Ok(())
})
}
}
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
// Wait until we have commit_lsn > lsn or timeout expires. Returns latest commit_lsn.
async fn wait_for_lsn(rx: &mut Receiver<Lsn>, lsn: Lsn) -> Result<Option<Lsn>> {
let commit_lsn: Lsn = *rx.borrow();
if commit_lsn > lsn {
return Ok(Some(commit_lsn));
}
let res = timeout(POLL_STATE_TIMEOUT, async move {
let mut commit_lsn;
let mut wal_seg_size: usize;
loop {
rx.changed().await?;
commit_lsn = *rx.borrow();
if commit_lsn > lsn {
wal_seg_size = spg.timeline.get().get_state().1.server.wal_seg_size as usize;
if wal_seg_size == 0 {
error!("Cannot start replication before connecting to wal_proposer");
sleep(Duration::from_secs(1));
} else {
break;
}
}
let wal_end = spg.timeline.get().get_end_of_wal();
// Walproposer gets special handling: safekeeper must give proposer all
// local WAL till the end, whether committed or not (walproposer will
// hang otherwise). That's because walproposer runs the consensus and
// synchronizes safekeepers on the most advanced one.
//
// There is a small risk of this WAL getting concurrently garbaged if
// another compute rises which collects majority and starts fixing log
// on this safekeeper itself. That's ok as (old) proposer will never be
// able to commit such WAL.
let stop_pos: Option<Lsn> = if spg.appname == Some("wal_proposer_recovery".to_string()) {
Some(wal_end)
} else {
None
};
info!("Start replication from {:?} till {:?}", start_pos, stop_pos);
Ok(commit_lsn)
})
.await;
// switch to copy
pgb.write_message(&BeMessage::CopyBothResponse)?;
match res {
// success
Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
// error inside closure
Ok(Err(err)) => Err(err),
// timeout
Err(_) => Ok(None),
let mut end_pos = Lsn(0);
let mut wal_reader = WalReader::new(
spg.conf.timeline_dir(&spg.timeline.get().zttid),
wal_seg_size,
start_pos,
);
// buffer for wal sending, limited by MAX_SEND_SIZE
let mut send_buf = vec![0u8; MAX_SEND_SIZE];
loop {
if let Some(stop_pos) = stop_pos {
if start_pos >= stop_pos {
break; /* recovery finished */
}
end_pos = stop_pos;
} else {
/* Wait until we have some data to stream */
let lsn = spg.timeline.get().wait_for_lsn(start_pos);
if let Some(lsn) = lsn {
end_pos = lsn;
} else {
// TODO: also check once in a while whether we are walsender
// to right pageserver.
if spg.timeline.get().stop_walsender(replica_id)? {
// Shut down, timeline is suspended.
// TODO create proper error type for this
bail!("end streaming to {:?}", spg.appname);
}
// timeout expired: request pageserver status
pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
sent_ptr: end_pos.0,
timestamp: get_current_timestamp(),
request_reply: true,
}))
.context("Failed to send KeepAlive message")?;
continue;
}
}
let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
let send_size = min(send_size, send_buf.len());
let send_buf = &mut send_buf[..send_size];
// read wal into buffer
let send_size = wal_reader.read(send_buf)?;
let send_buf = &send_buf[..send_size];
// Write some data to the network socket.
pgb.write_message(&BeMessage::XLogData(XLogDataBody {
wal_start: start_pos.0,
wal_end: end_pos.0,
timestamp: get_current_timestamp(),
data: send_buf,
}))
.context("Failed to send XLogData")?;
start_pos += send_size as u64;
trace!("sent WAL up to {}", start_pos);
}
Ok(())
}
}

View File

@@ -14,8 +14,8 @@ use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs::{self};
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tracing::*;
@@ -37,6 +37,8 @@ use crate::wal_storage;
use crate::wal_storage::Storage as wal_storage_iface;
use crate::SafeKeeperConf;
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
/// Replica status update + hot standby feedback
#[derive(Debug, Clone, Copy)]
pub struct ReplicaState {
@@ -75,6 +77,9 @@ impl ReplicaState {
struct SharedState {
/// Safekeeper object
sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
/// For receiving-sending wal cooperation
/// quorum commit LSN we've notified walsenders about
notified_commit_lsn: Lsn,
/// State of replicas
replicas: Vec<Option<ReplicaState>>,
/// True when WAL backup launcher oversees the timeline, making sure WAL is
@@ -107,6 +112,7 @@ impl SharedState {
let sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?;
Ok(Self {
notified_commit_lsn: Lsn(0),
sk,
replicas: Vec::new(),
wal_backup_active: false,
@@ -125,6 +131,7 @@ impl SharedState {
info!("timeline {} restored", zttid.timeline_id);
Ok(Self {
notified_commit_lsn: Lsn(0),
sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?,
replicas: Vec::new(),
wal_backup_active: false,
@@ -264,6 +271,8 @@ pub struct Timeline {
/// For breeding receivers.
commit_lsn_watch_rx: watch::Receiver<Lsn>,
mutex: Mutex<SharedState>,
/// conditional variable used to notify wal senders
cond: Condvar,
}
impl Timeline {
@@ -280,6 +289,7 @@ impl Timeline {
commit_lsn_watch_tx,
commit_lsn_watch_rx,
mutex: Mutex::new(shared_state),
cond: Condvar::new(),
}
}
@@ -323,7 +333,7 @@ impl Timeline {
let mut shared_state = self.mutex.lock().unwrap();
if shared_state.num_computes == 0 {
let replica_state = shared_state.replicas[replica_id].unwrap();
let stop = shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
let stop = shared_state.notified_commit_lsn == Lsn(0) || // no data at all yet
(replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
if stop {
@@ -395,6 +405,39 @@ impl Timeline {
})
}
/// Timed wait for an LSN to be committed.
///
/// Returns the last committed LSN, which will be at least
/// as high as the LSN waited for, or None if timeout expired.
///
pub fn wait_for_lsn(&self, lsn: Lsn) -> Option<Lsn> {
let mut shared_state = self.mutex.lock().unwrap();
loop {
let commit_lsn = shared_state.notified_commit_lsn;
// This must be `>`, not `>=`.
if commit_lsn > lsn {
return Some(commit_lsn);
}
let result = self
.cond
.wait_timeout(shared_state, POLL_STATE_TIMEOUT)
.unwrap();
if result.1.timed_out() {
return None;
}
shared_state = result.0
}
}
// Notify caught-up WAL senders about new WAL data received
// TODO: replace-unify it with commit_lsn_watch.
fn notify_wal_senders(&self, shared_state: &mut MutexGuard<SharedState>) {
if shared_state.notified_commit_lsn < shared_state.sk.inmem.commit_lsn {
shared_state.notified_commit_lsn = shared_state.sk.inmem.commit_lsn;
self.cond.notify_all();
}
}
pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
self.commit_lsn_watch_rx.clone()
}
@@ -419,6 +462,8 @@ impl Timeline {
}
}
// Ping wal sender that new data might be available.
self.notify_wal_senders(&mut shared_state);
commit_lsn = shared_state.sk.inmem.commit_lsn;
}
self.commit_lsn_watch_tx.send(commit_lsn)?;
@@ -479,6 +524,7 @@ impl Timeline {
return Ok(());
}
shared_state.sk.record_safekeeper_info(sk_info)?;
self.notify_wal_senders(&mut shared_state);
is_wal_backup_action_pending = shared_state.update_status(self.zttid);
commit_lsn = shared_state.sk.inmem.commit_lsn;
}

View File

@@ -2,7 +2,6 @@ use anyhow::{Context, Result};
use etcd_broker::subscription_key::{
NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind,
};
use tokio::io::AsyncRead;
use tokio::task::JoinHandle;
use std::cmp::min;
@@ -11,9 +10,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use postgres_ffi::xlog_utils::{
XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, MAX_SEND_SIZE, PG_TLI,
};
use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI};
use remote_storage::{GenericRemoteStorage, RemoteStorage};
use tokio::fs::File;
use tokio::runtime::Builder;
@@ -448,49 +445,3 @@ async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
Ok(())
}
pub async fn read_object(
file_path: PathBuf,
offset: u64,
) -> (impl AsyncRead, JoinHandle<Result<()>>) {
let storage = REMOTE_STORAGE.get().expect("failed to get remote storage");
let (mut pipe_writer, pipe_reader) = tokio::io::duplex(MAX_SEND_SIZE);
let copy_result = tokio::spawn(async move {
let res = match storage.as_ref().unwrap() {
GenericRemoteStorage::Local(local_storage) => {
let source = local_storage.remote_object_id(&file_path)?;
info!(
"local download about to start from {} at offset {}",
source.display(),
offset
);
local_storage
.download_byte_range(&source, offset, None, &mut pipe_writer)
.await
}
GenericRemoteStorage::S3(s3_storage) => {
let s3key = s3_storage.remote_object_id(&file_path)?;
info!(
"S3 download about to start from {:?} at offset {}",
s3key, offset
);
s3_storage
.download_byte_range(&s3key, offset, None, &mut pipe_writer)
.await
}
};
if let Err(e) = res {
error!("failed to download WAL segment from remote storage: {}", e);
Err(e)
} else {
Ok(())
}
});
(pipe_reader, copy_result)
}

View File

@@ -8,9 +8,7 @@
//! Note that last file has `.partial` suffix, that's different from postgres.
use anyhow::{anyhow, bail, Context, Result};
use std::io::{self, Seek, SeekFrom};
use std::pin::Pin;
use tokio::io::AsyncRead;
use std::io::{Read, Seek, SeekFrom};
use lazy_static::lazy_static;
use postgres_ffi::xlog_utils::{
@@ -28,7 +26,6 @@ use utils::{lsn::Lsn, zid::ZTenantTimelineId};
use crate::safekeeper::SafeKeeperState;
use crate::wal_backup::read_object;
use crate::SafeKeeperConf;
use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
@@ -36,8 +33,6 @@ use postgres_ffi::waldecoder::WalStreamDecoder;
use metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
use tokio::io::{AsyncReadExt, AsyncSeekExt};
lazy_static! {
// The prometheus crate does not support u64 yet, i64 only (see `IntGauge`).
// i64 is faster than f64, so update to u64 when available.
@@ -509,125 +504,69 @@ pub struct WalReader {
timeline_dir: PathBuf,
wal_seg_size: usize,
pos: Lsn,
wal_segment: Option<Pin<Box<dyn AsyncRead>>>,
enable_remote_read: bool,
// S3 will be used to read WAL if LSN is not available locally
local_start_lsn: Lsn,
file: Option<File>,
}
impl WalReader {
pub fn new(
timeline_dir: PathBuf,
state: &SafeKeeperState,
start_pos: Lsn,
enable_remote_read: bool,
) -> Result<Self> {
if start_pos < state.timeline_start_lsn {
bail!(
"Requested streaming from {}, which is before the start of the timeline {}",
start_pos,
state.timeline_start_lsn
);
}
if state.server.wal_seg_size == 0
|| state.timeline_start_lsn == Lsn(0)
|| state.local_start_lsn == Lsn(0)
{
bail!("state uninitialized, no data to read");
}
Ok(Self {
pub fn new(timeline_dir: PathBuf, wal_seg_size: usize, pos: Lsn) -> Self {
Self {
timeline_dir,
wal_seg_size: state.server.wal_seg_size as usize,
pos: start_pos,
wal_segment: None,
enable_remote_read,
local_start_lsn: state.local_start_lsn,
})
wal_seg_size,
pos,
file: None,
}
}
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let mut wal_segment = match self.wal_segment.take() {
Some(reader) => reader,
None => self.open_segment().await?,
pub fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
// Take the `File` from `wal_file`, or open a new file.
let mut file = match self.file.take() {
Some(file) => file,
None => {
// Open a new file.
let segno = self.pos.segment_number(self.wal_seg_size);
let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
let wal_file_path = self.timeline_dir.join(wal_file_name);
Self::open_wal_file(&wal_file_path)?
}
};
let xlogoff = self.pos.segment_offset(self.wal_seg_size) as usize;
// How much to read and send in message? We cannot cross the WAL file
// boundary, and we don't want send more than provided buffer.
let xlogoff = self.pos.segment_offset(self.wal_seg_size) as usize;
let send_size = min(buf.len(), self.wal_seg_size - xlogoff);
// Read some data from the file.
let buf = &mut buf[0..send_size];
let send_size = wal_segment.read_exact(buf).await?;
file.seek(SeekFrom::Start(xlogoff as u64))
.and_then(|_| file.read_exact(buf))
.context("Failed to read data from WAL file")?;
self.pos += send_size as u64;
// Decide whether to reuse this file. If we don't set wal_segment here
// a new reader will be opened next time.
// Decide whether to reuse this file. If we don't set wal_file here
// a new file will be opened next time.
if self.pos.segment_offset(self.wal_seg_size) != 0 {
self.wal_segment = Some(wal_segment);
self.file = Some(file);
}
Ok(send_size)
}
/// Open WAL segment at the current position of the reader.
async fn open_segment(&self) -> Result<Pin<Box<dyn AsyncRead>>> {
let xlogoff = self.pos.segment_offset(self.wal_seg_size) as usize;
let segno = self.pos.segment_number(self.wal_seg_size);
let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
let wal_file_path = self.timeline_dir.join(wal_file_name);
// Try to open local file, if we may have WAL locally
if self.pos >= self.local_start_lsn {
let res = Self::open_wal_file(&wal_file_path).await;
match res {
Ok(mut file) => {
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
return Ok(Box::pin(file));
}
Err(e) => {
let is_not_found = e.chain().any(|e| {
if let Some(e) = e.downcast_ref::<io::Error>() {
e.kind() == io::ErrorKind::NotFound
} else {
false
}
});
if !is_not_found {
return Err(e);
}
// NotFound is expected, fall through to remote read
}
};
}
// Try to open remote file, if remote reads are enabled
if self.enable_remote_read {
let (reader, _) = read_object(wal_file_path, xlogoff as u64).await;
return Ok(Box::pin(reader));
}
bail!("WAL segment is not found")
}
/// Helper function for opening a wal file.
async fn open_wal_file(wal_file_path: &Path) -> Result<tokio::fs::File> {
fn open_wal_file(wal_file_path: &Path) -> Result<File> {
// First try to open the .partial file.
let mut partial_path = wal_file_path.to_owned();
partial_path.set_extension("partial");
if let Ok(opened_file) = tokio::fs::File::open(&partial_path).await {
if let Ok(opened_file) = File::open(&partial_path) {
return Ok(opened_file);
}
// If that failed, try it without the .partial extension.
tokio::fs::File::open(&wal_file_path)
.await
File::open(&wal_file_path)
.with_context(|| format!("Failed to open WAL file {:?}", wal_file_path))
.map_err(|e| {
warn!("{}", e);
error!("{}", e);
e
})
}

View File

@@ -1,73 +0,0 @@
import subprocess
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PortDistributor, VanillaPostgres
from fixtures.neon_fixtures import pg_distrib_dir
import os
from fixtures.utils import mkdir_if_needed, subprocess_capture
import shutil
import getpass
import pwd
num_rows = 1000
# Ensure that regular postgres can start from fullbackup
def test_fullbackup(neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
port_distributor: PortDistributor):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_fullbackup')
pgmain = env.postgres.create_start('test_fullbackup')
log.info("postgres is running on 'test_fullbackup' branch")
timeline = pgmain.safe_psql("SHOW neon.timeline_id")[0][0]
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
# data loading may take a while, so increase statement timeout
cur.execute("SET statement_timeout='300s'")
cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g
from generate_series(1,{num_rows}) g''')
cur.execute("CHECKPOINT")
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn = cur.fetchone()[0]
log.info(f"start_backup_lsn = {lsn}")
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')}
# Get and unpack fullbackup from pageserver
restored_dir_path = os.path.join(env.repo_dir, "restored_datadir")
os.mkdir(restored_dir_path, 0o750)
query = f"fullbackup {env.initial_tenant.hex} {timeline} {lsn}"
cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
result_basepath = pg_bin.run_capture(cmd, env=psql_env)
tar_output_file = result_basepath + ".stdout"
subprocess_capture(str(env.repo_dir), ["tar", "-xf", tar_output_file, "-C", restored_dir_path])
# HACK
# fullbackup returns neon specific pg_control and first WAL segment
# use resetwal to overwrite it
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, 'pg_resetwal')
cmd = [pg_resetwal_path, "-D", restored_dir_path]
pg_bin.run_capture(cmd, env=psql_env)
# Restore from the backup and find the data we inserted
port = port_distributor.get_port()
with VanillaPostgres(restored_dir_path, pg_bin, port, init=False) as vanilla_pg:
# TODO make port an optional argument
vanilla_pg.configure([
f"port={port}",
])
vanilla_pg.start()
num_rows_found = vanilla_pg.safe_psql('select count(*) from tbl;', user="cloud_admin")[0][0]
assert num_rows == num_rows_found

View File

@@ -1,193 +0,0 @@
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_upload, wait_for_last_record_lsn
from fixtures.utils import lsn_from_hex, lsn_to_hex
from uuid import UUID, uuid4
import tarfile
import os
import shutil
from pathlib import Path
import json
from fixtures.utils import subprocess_capture
from fixtures.log_helper import log
from contextlib import closing
from fixtures.neon_fixtures import pg_distrib_dir
@pytest.mark.timeout(600)
def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
# Put data in vanilla pg
vanilla_pg.start()
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
vanilla_pg.safe_psql('''create table t as select 'long string to consume some space' || g
from generate_series(1,300000) g''')
assert vanilla_pg.safe_psql('select count(*) from t') == [(300000, )]
# Take basebackup
basebackup_dir = os.path.join(test_output_dir, "basebackup")
base_tar = os.path.join(basebackup_dir, "base.tar")
wal_tar = os.path.join(basebackup_dir, "pg_wal.tar")
os.mkdir(basebackup_dir)
vanilla_pg.safe_psql("CHECKPOINT")
pg_bin.run([
"pg_basebackup",
"-F",
"tar",
"-d",
vanilla_pg.connstr(),
"-D",
basebackup_dir,
])
# Make corrupt base tar with missing pg_control
unpacked_base = os.path.join(basebackup_dir, "unpacked-base")
corrupt_base_tar = os.path.join(unpacked_base, "corrupt-base.tar")
os.mkdir(unpacked_base, 0o750)
subprocess_capture(str(test_output_dir), ["tar", "-xf", base_tar, "-C", unpacked_base])
os.remove(os.path.join(unpacked_base, "global/pg_control"))
subprocess_capture(str(test_output_dir),
["tar", "-cf", "corrupt-base.tar"] + os.listdir(unpacked_base),
cwd=unpacked_base)
# Get start_lsn and end_lsn
with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
manifest = json.load(f)
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
node_name = "import_from_vanilla"
tenant = uuid4()
timeline = uuid4()
# Set up pageserver for import
neon_env_builder.enable_local_fs_remote_storage()
env = neon_env_builder.init_start()
env.pageserver.http_client().tenant_create(tenant)
def import_tar(base, wal):
env.neon_cli.raw_cli([
"timeline",
"import",
"--tenant-id",
tenant.hex,
"--timeline-id",
timeline.hex,
"--node-name",
node_name,
"--base-lsn",
start_lsn,
"--base-tarfile",
base,
"--end-lsn",
end_lsn,
"--wal-tarfile",
wal,
])
# Importing corrupt backup fails
with pytest.raises(Exception):
import_tar(corrupt_base_tar, wal_tar)
# Clean up
# TODO it should clean itself
client = env.pageserver.http_client()
client.timeline_detach(tenant, timeline)
# Importing correct backup works
import_tar(base_tar, wal_tar)
# Wait for data to land in s3
wait_for_last_record_lsn(client, tenant, timeline, lsn_from_hex(end_lsn))
wait_for_upload(client, tenant, timeline, lsn_from_hex(end_lsn))
# Check it worked
pg = env.postgres.create_start(node_name, tenant_id=tenant)
assert pg.safe_psql('select count(*) from t') == [(300000, )]
@pytest.mark.timeout(600)
def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
num_rows = 3000
neon_env_builder.num_safekeepers = 1
neon_env_builder.enable_local_fs_remote_storage()
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_import_from_pageserver')
pgmain = env.postgres.create_start('test_import_from_pageserver')
log.info("postgres is running on 'test_import_from_pageserver' branch")
timeline = pgmain.safe_psql("SHOW neon.timeline_id")[0][0]
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
# data loading may take a while, so increase statement timeout
cur.execute("SET statement_timeout='300s'")
cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g
from generate_series(1,{num_rows}) g''')
cur.execute("CHECKPOINT")
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn = cur.fetchone()[0]
log.info(f"start_backup_lsn = {lsn}")
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')}
# Get a fullbackup from pageserver
query = f"fullbackup { env.initial_tenant.hex} {timeline} {lsn}"
cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
result_basepath = pg_bin.run_capture(cmd, env=psql_env)
tar_output_file = result_basepath + ".stdout"
# Stop the first pageserver instance, erase all its data
env.postgres.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.repo_dir) / 'tenants'
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
#start the pageserver again
env.pageserver.start()
# Import using another tenantid, because we use the same pageserver.
# TODO Create another pageserver to maeke test more realistic.
tenant = uuid4()
# Import to pageserver
node_name = "import_from_pageserver"
client = env.pageserver.http_client()
client.tenant_create(tenant)
env.neon_cli.raw_cli([
"timeline",
"import",
"--tenant-id",
tenant.hex,
"--timeline-id",
timeline,
"--node-name",
node_name,
"--base-lsn",
lsn,
"--base-tarfile",
os.path.join(tar_output_file),
])
# Wait for data to land in s3
wait_for_last_record_lsn(client, tenant, UUID(timeline), lsn_from_hex(lsn))
wait_for_upload(client, tenant, UUID(timeline), lsn_from_hex(lsn))
# Check it worked
pg = env.postgres.create_start(node_name, tenant_id=tenant)
assert pg.safe_psql('select count(*) from tbl') == [(num_rows, )]
# Take another fullbackup
query = f"fullbackup { tenant.hex} {timeline} {lsn}"
cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
result_basepath = pg_bin.run_capture(cmd, env=psql_env)
new_tar_output_file = result_basepath + ".stdout"
# Check it's the same as the first fullbackup
# TODO pageserver should be checking checksum
assert os.path.getsize(tar_output_file) == os.path.getsize(new_tar_output_file)

View File

@@ -2,11 +2,14 @@ from contextlib import closing
from datetime import datetime
import os
import pytest
import time
from uuid import UUID
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.utils import lsn_to_hex
from fixtures.benchmark_fixture import MetricReport
@pytest.mark.parametrize('with_safekeepers', [False, True])
@@ -44,6 +47,56 @@ def test_tenants_normal_work(neon_env_builder: NeonEnvBuilder, with_safekeepers:
assert cur.fetchone() == (5000050000, )
def test_tenant_threads(neon_env_builder, zenbenchmark):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
def get_num_threads() -> int:
metrics = env.pageserver.http_client().get_metrics()
parsed = parse_metrics(metrics)
threads = parsed.query_one("process_threads").value
return threads
threads_before = get_num_threads()
zenbenchmark.record("threads_before", threads_before, "", report=MetricReport.LOWER_IS_BETTER)
tenants = env.pageserver.http_client().tenant_list()
num_tenants = len(tenants)
num_active = len([t for t in tenants if t["state"] == "Active"])
zenbenchmark.record("tenants_before", num_tenants, "", report=MetricReport.LOWER_IS_BETTER)
zenbenchmark.record("active_before", num_active, "", report=MetricReport.LOWER_IS_BETTER)
for i in range(20):
print(f"creating tenant {i}")
name = f"test_tenant_threads_{i}"
tenant, _ = env.neon_cli.create_tenant()
timeline = env.neon_cli.create_timeline(name, tenant_id=tenant)
pg = env.postgres.create_start(name, tenant_id=tenant)
pg.safe_psql("select 1;")
pg.stop()
env.pageserver.http_client().timeline_detach(tenant, timeline)
remaining_timelines = [
UUID(r["timeline_id"])
for r in env.pageserver.http_client().timeline_list(tenant)
]
for t in remaining_timelines:
env.pageserver.http_client().timeline_detach(tenant, t)
time.sleep(5)
threads_after = get_num_threads()
zenbenchmark.record("threads_before", threads_after, "", report=MetricReport.LOWER_IS_BETTER)
tenants = env.pageserver.http_client().tenant_list()
num_tenants = len(tenants)
num_active = len([t for t in tenants if t["state"] == "Active"])
zenbenchmark.record("tenants_after", num_tenants, "", report=MetricReport.LOWER_IS_BETTER)
zenbenchmark.record("active_after", num_active, "", report=MetricReport.LOWER_IS_BETTER)
def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3

View File

@@ -2,7 +2,6 @@ import pytest
import random
import time
import os
import shutil
import signal
import subprocess
import sys
@@ -354,7 +353,7 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize('auth_enabled', [False, True])
def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
neon_env_builder.num_safekeepers = 2
# to advance remote_consistent_lsn
# to advance remote_consistent_llsn
neon_env_builder.enable_local_fs_remote_storage()
neon_env_builder.auth_enabled = auth_enabled
env = neon_env_builder.init_start()
@@ -438,26 +437,6 @@ def wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end):
time.sleep(0.5)
def wait_wal_trim(tenant_id, timeline_id, sk, target_size):
started_at = time.time()
http_cli = sk.http_client()
while True:
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), tenant_id,
timeline_id)) / 1024 / 1024
log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size:.2f}MB status={tli_status}")
if sk_wal_size <= target_size:
break
elapsed = time.time() - started_at
if elapsed > 20:
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s for sk_id={sk.id} to trim WAL to {target_size:.2f}MB, current size is {sk_wal_size:.2f}MB"
)
time.sleep(0.5)
@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs'])
def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str):
neon_env_builder.num_safekeepers = 3
@@ -506,116 +485,6 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str):
wait_segment_offload(tenant_id, timeline_id, env.safekeepers[1], '0/5000000')
@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs'])
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, storage_type: str):
neon_env_builder.num_safekeepers = 3
if storage_type == 'local_fs':
neon_env_builder.enable_local_fs_remote_storage()
elif storage_type == 'mock_s3':
neon_env_builder.enable_s3_mock_remote_storage('test_s3_wal_replay')
else:
raise RuntimeError(f'Unknown storage type: {storage_type}')
neon_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_s3_wal_replay')
env.pageserver.stop()
pageserver_tenants_dir = os.path.join(env.repo_dir, 'tenants')
pageserver_fresh_copy = os.path.join(env.repo_dir, 'tenants_fresh')
log.info(f"Creating a copy of pageserver in a fresh state at {pageserver_fresh_copy}")
shutil.copytree(pageserver_tenants_dir, pageserver_fresh_copy)
env.pageserver.start()
pg = env.postgres.create_start('test_s3_wal_replay')
# learn neon timeline from compute
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]
expected_sum = 0
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t(key int, value text)")
cur.execute("insert into t values (1, 'payload')")
expected_sum += 1
offloaded_seg_end = ['0/3000000']
for seg_end in offloaded_seg_end:
# roughly fills two segments
cur.execute("insert into t select generate_series(1,500000), 'payload'")
expected_sum += 500000 * 500001 // 2
cur.execute("select sum(key) from t")
assert cur.fetchone()[0] == expected_sum
for sk in env.safekeepers:
wait_segment_offload(tenant_id, timeline_id, sk, seg_end)
# advance remote_consistent_lsn to trigger WAL trimming
# this LSN should be less than commit_lsn, so timeline will be active=true in safekeepers, to push etcd updates
env.safekeepers[0].http_client().record_safekeeper_info(
tenant_id, timeline_id, {'remote_consistent_lsn': offloaded_seg_end[-1]})
for sk in env.safekeepers:
# require WAL to be trimmed, so no more than one segment is left on disk
wait_wal_trim(tenant_id, timeline_id, sk, 16 * 1.5)
cur.execute('SELECT pg_current_wal_flush_lsn()')
last_lsn = cur.fetchone()[0]
pageserver_lsn = env.pageserver.http_client().timeline_detail(
uuid.UUID(tenant_id), uuid.UUID((timeline_id)))["local"]["last_record_lsn"]
lag = lsn_from_hex(last_lsn) - lsn_from_hex(pageserver_lsn)
log.info(
f'Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb'
)
# replace pageserver with a fresh copy
pg.stop_and_destroy()
env.pageserver.stop()
log.info(f'Removing current pageserver state at {pageserver_tenants_dir}')
shutil.rmtree(pageserver_tenants_dir)
log.info(f'Copying fresh pageserver state from {pageserver_fresh_copy}')
shutil.move(pageserver_fresh_copy, pageserver_tenants_dir)
# start pageserver and wait for replay
env.pageserver.start()
wait_lsn_timeout = 60 * 3
started_at = time.time()
last_debug_print = 0.0
while True:
elapsed = time.time() - started_at
if elapsed > wait_lsn_timeout:
raise RuntimeError(f'Timed out waiting for WAL redo')
pageserver_lsn = env.pageserver.http_client().timeline_detail(
uuid.UUID(tenant_id), uuid.UUID((timeline_id)))["local"]["last_record_lsn"]
lag = lsn_from_hex(last_lsn) - lsn_from_hex(pageserver_lsn)
if time.time() > last_debug_print + 10 or lag <= 0:
last_debug_print = time.time()
log.info(f'Pageserver last_record_lsn={pageserver_lsn}; lag is {lag / 1024}kb')
if lag <= 0:
break
time.sleep(1)
log.info(f'WAL redo took {elapsed} s')
# verify data
pg.create_start('test_s3_wal_replay')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("select sum(key) from t")
assert cur.fetchone()[0] == expected_sum
class ProposerPostgres(PgProtocol):
"""Object for running postgres without NeonEnv"""
def __init__(self,

View File

@@ -1373,13 +1373,12 @@ def pg_bin(test_output_dir: str) -> PgBin:
class VanillaPostgres(PgProtocol):
def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int, init=True):
def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int):
super().__init__(host='localhost', port=port, dbname='postgres')
self.pgdatadir = pgdatadir
self.pg_bin = pg_bin
self.running = False
if init:
self.pg_bin.run_capture(['initdb', '-D', pgdatadir])
self.pg_bin.run_capture(['initdb', '-D', pgdatadir])
self.configure([f"port = {port}\n"])
def configure(self, options: List[str]):
@@ -1395,12 +1394,12 @@ class VanillaPostgres(PgProtocol):
if log_path is None:
log_path = os.path.join(self.pgdatadir, "pg.log")
self.pg_bin.run_capture(['pg_ctl', '-w', '-D', self.pgdatadir, '-l', log_path, 'start'])
self.pg_bin.run_capture(['pg_ctl', '-D', self.pgdatadir, '-l', log_path, 'start'])
def stop(self):
assert self.running
self.running = False
self.pg_bin.run_capture(['pg_ctl', '-w', '-D', self.pgdatadir, 'stop'])
self.pg_bin.run_capture(['pg_ctl', '-D', self.pgdatadir, 'stop'])
def get_subdir_size(self, subdir) -> int:
"""Return size of pgdatadir subdirectory in bytes."""