Test works now

This commit is contained in:
Bojan Serafimov
2022-06-10 13:54:02 -04:00
parent a568c49111
commit 978ef167e0
3 changed files with 84 additions and 31 deletions

View File

@@ -541,24 +541,31 @@ impl PageServerNode {
// Init base reader
let (start_lsn, base_tarfile_path) = base;
let base_tarfile = File::open(base_tarfile_path)?;
let mut reader = BufReader::new(base_tarfile);
let mut base_reader = BufReader::new(base_tarfile);
// If there's any wal, extend the reader and end_lsn
let end_lsn = if let Some((end_lsn, wal_tarfile_path)) = pg_wal {
let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal {
let wal_tarfile = File::open(wal_tarfile_path)?;
(&mut reader).chain(BufReader::new(wal_tarfile));
end_lsn
let wal_reader = BufReader::new(wal_tarfile);
(end_lsn, Some(wal_reader))
} else {
start_lsn
(start_lsn, None)
};
// Init writer
let import_cmd = format!("import {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
// Import base
let import_cmd = format!("import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
let mut writer = client.copy_in(&import_cmd)?;
// Stream reader -> writer
io::copy(&mut reader, &mut writer)?;
io::copy(&mut base_reader, &mut writer)?;
writer.finish()?;
// Import wal
if let Some(mut wal_reader) = wal_reader {
let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
let mut writer = client.copy_in(&import_cmd)?;
io::copy(&mut wal_reader, &mut writer)?;
writer.finish()?;
}
Ok(())
}
}

View File

@@ -430,20 +430,17 @@ fn import_wal<R: Repository>(
}
// Rest of file copied from https://github.com/neondatabase/neon/compare/WIP_import_from_tar
pub fn import_timeline_from_tar<R: Repository, Reader: Read>(
pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
tline: &mut DatadirTimeline<R>,
mut reader: Reader,
base_lsn: Lsn,
end_lsn: Lsn,
) -> Result<()> {
info!("importing base at {}", base_lsn);
let mut modification = tline.begin_modification(base_lsn);
modification.init_empty()?;
// Import base
for base_tar_entry in tar::Archive::new(&mut reader).entries()? {
for base_tar_entry in tar::Archive::new(reader).entries()? {
let mut entry = base_tar_entry.unwrap();
let header = entry.header();
let file_path = header.path().unwrap().into_owned();
@@ -469,17 +466,26 @@ pub fn import_timeline_from_tar<R: Repository, Reader: Read>(
}
modification.commit()?;
Ok(())
}
pub fn import_wal_from_tar<R: Repository, Reader: Read>(
tline: &mut DatadirTimeline<R>,
mut reader: Reader,
start_lsn: Lsn,
end_lsn: Lsn,
) -> Result<()> {
// Set up walingest mutable state
let mut waldecoder = WalStreamDecoder::new(base_lsn);
let mut segno = base_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE);
let mut offset = base_lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
let mut last_lsn = base_lsn;
let mut walingest = WalIngest::new(tline, base_lsn)?;
let mut waldecoder = WalStreamDecoder::new(start_lsn);
let mut segno = start_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE);
let mut offset = start_lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
let mut last_lsn = start_lsn;
let mut walingest = WalIngest::new(tline, start_lsn)?;
// Ingest wal until end_lsn
info!("importing wal until {}", end_lsn);
let mut pg_wal_tar = tar::Archive::new(&mut reader);
let mut pg_wal_tar = tar::Archive::new(reader);
let mut pg_wal_entries_iter = pg_wal_tar.entries()?;
while last_lsn <= end_lsn {
let bytes = {
@@ -523,7 +529,7 @@ pub fn import_timeline_from_tar<R: Repository, Reader: Read>(
offset = 0;
}
if last_lsn != base_lsn {
if last_lsn != start_lsn {
info!("reached end of WAL at {}", last_lsn);
} else {
info!("there was no WAL to import at {}", last_lsn);

View File

@@ -31,7 +31,7 @@ use utils::{
use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::{import_timeline_from_postgres_datadir, import_timeline_from_tar};
use crate::import_datadir::{import_basebackup_from_tar, import_timeline_from_postgres_datadir, import_wal_from_tar};
use crate::layered_repository::LayeredRepository;
use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp};
use crate::profiling::profpoint_start;
@@ -529,7 +529,7 @@ impl PageServerHandler {
Ok(())
}
fn handle_import(
fn handle_import_basebackup(
&self,
pgb: &mut PostgresBackend,
tenant_id: ZTenantId,
@@ -537,9 +537,7 @@ impl PageServerHandler {
base_lsn: Lsn,
end_lsn: Lsn,
) -> anyhow::Result<()> {
let _enter = info_span!("import", timeline = %timeline_id, tenant = %tenant_id).entered();
// TODO cap the max number of import threads allowed
// TODO check that timeline doesn't exist already
let _enter = info_span!("import basebackup", timeline = %timeline_id, tenant = %tenant_id).entered();
// TODO thread_mgr::associate_with?
// Create empty timeline
@@ -550,11 +548,40 @@ impl PageServerHandler {
let mut datadir_timeline = DatadirTimeline::<LayeredRepository>::new(
timeline, repartition_distance);
// TODO mark timeline as not ready until it reaches end_lsn
// Import basebackup provided via CopyData
info!("importing basebackup");
pgb.write_message(&BeMessage::CopyInResponse)?;
let reader = CopyInReader::new(pgb);
import_timeline_from_tar(&mut datadir_timeline, reader, base_lsn, end_lsn)?;
import_basebackup_from_tar(&mut datadir_timeline, reader, base_lsn)?;
info!("done");
Ok(())
}
fn handle_import_wal(
&self,
pgb: &mut PostgresBackend,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
) -> anyhow::Result<()> {
let _enter = info_span!("import wal", timeline = %timeline_id, tenant = %tenant_id).entered();
// TODO thread_mgr::associate_with?
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let timeline = repo.get_timeline_load(timeline_id)?;
let repartition_distance = repo.get_checkpoint_distance(); // TODO
let mut datadir_timeline = DatadirTimeline::<LayeredRepository>::new(
timeline, repartition_distance);
// Import wal provided via CopyData
info!("importing wal");
pgb.write_message(&BeMessage::CopyInResponse)?;
let reader = CopyInReader::new(pgb);
import_wal_from_tar(&mut datadir_timeline, reader, start_lsn, end_lsn)?;
info!("done");
Ok(())
@@ -836,8 +863,8 @@ impl postgres_backend::Handler for PageServerHandler {
// Check that the timeline exists
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import ") {
let (_, params_raw) = query_string.split_at("import ".len());
} else if query_string.starts_with("import basebackup ") {
let (_, params_raw) = query_string.split_at("import basebackup ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();
ensure!(params.len() == 4);
let tenant = ZTenantId::from_str(params[0])?;
@@ -847,7 +874,20 @@ impl postgres_backend::Handler for PageServerHandler {
self.check_permission(Some(tenant))?;
self.handle_import(pgb, tenant, timeline, base_lsn, end_lsn)?;
self.handle_import_basebackup(pgb, tenant, timeline, base_lsn, end_lsn)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import wal ") {
let (_, params_raw) = query_string.split_at("import wal ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();
ensure!(params.len() == 4);
let tenant = ZTenantId::from_str(params[0])?;
let timeline = ZTimelineId::from_str(params[1])?;
let start_lsn = Lsn::from_str(params[2])?;
let end_lsn = Lsn::from_str(params[2])?;
self.check_permission(Some(tenant))?;
self.handle_import_wal(pgb, tenant, timeline, start_lsn, end_lsn)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.to_ascii_lowercase().starts_with("set ") {
// important because psycopg2 executes "SET datestyle TO 'ISO'"