From 978ef167e0f545073dec010ba24d9bcc791608b2 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Fri, 10 Jun 2022 13:54:02 -0400 Subject: [PATCH] Test works now --- control_plane/src/storage.rs | 27 +++++++++------ pageserver/src/import_datadir.rs | 30 ++++++++++------- pageserver/src/page_service.rs | 58 +++++++++++++++++++++++++++----- 3 files changed, 84 insertions(+), 31 deletions(-) diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 1c17672880..9ef1a8a465 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -541,24 +541,31 @@ impl PageServerNode { // Init base reader let (start_lsn, base_tarfile_path) = base; let base_tarfile = File::open(base_tarfile_path)?; - let mut reader = BufReader::new(base_tarfile); + let mut base_reader = BufReader::new(base_tarfile); // If there's any wal, extend the reader and end_lsn - let end_lsn = if let Some((end_lsn, wal_tarfile_path)) = pg_wal { + let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal { let wal_tarfile = File::open(wal_tarfile_path)?; - (&mut reader).chain(BufReader::new(wal_tarfile)); - end_lsn + let wal_reader = BufReader::new(wal_tarfile); + (end_lsn, Some(wal_reader)) } else { - start_lsn + (start_lsn, None) }; - // Init writer - let import_cmd = format!("import {tenant_id} {timeline_id} {start_lsn} {end_lsn}"); + // Import base + let import_cmd = format!("import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn}"); let mut writer = client.copy_in(&import_cmd)?; - - // Stream reader -> writer - io::copy(&mut reader, &mut writer)?; + io::copy(&mut base_reader, &mut writer)?; writer.finish()?; + + // Import wal + if let Some(mut wal_reader) = wal_reader { + let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}"); + let mut writer = client.copy_in(&import_cmd)?; + io::copy(&mut wal_reader, &mut writer)?; + writer.finish()?; + } + Ok(()) } } diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 399e325898..a6b856c7aa 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -430,20 +430,17 @@ fn import_wal( } -// Rest of file copied from https://github.com/neondatabase/neon/compare/WIP_import_from_tar - -pub fn import_timeline_from_tar( +pub fn import_basebackup_from_tar( tline: &mut DatadirTimeline, mut reader: Reader, base_lsn: Lsn, - end_lsn: Lsn, ) -> Result<()> { info!("importing base at {}", base_lsn); let mut modification = tline.begin_modification(base_lsn); modification.init_empty()?; // Import base - for base_tar_entry in tar::Archive::new(&mut reader).entries()? { + for base_tar_entry in tar::Archive::new(reader).entries()? { let mut entry = base_tar_entry.unwrap(); let header = entry.header(); let file_path = header.path().unwrap().into_owned(); @@ -469,17 +466,26 @@ pub fn import_timeline_from_tar( } modification.commit()?; + Ok(()) +} + +pub fn import_wal_from_tar( + tline: &mut DatadirTimeline, + mut reader: Reader, + start_lsn: Lsn, + end_lsn: Lsn, +) -> Result<()> { // Set up walingest mutable state - let mut waldecoder = WalStreamDecoder::new(base_lsn); - let mut segno = base_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE); - let mut offset = base_lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE); - let mut last_lsn = base_lsn; - let mut walingest = WalIngest::new(tline, base_lsn)?; + let mut waldecoder = WalStreamDecoder::new(start_lsn); + let mut segno = start_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE); + let mut offset = start_lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE); + let mut last_lsn = start_lsn; + let mut walingest = WalIngest::new(tline, start_lsn)?; // Ingest wal until end_lsn info!("importing wal until {}", end_lsn); - let mut pg_wal_tar = tar::Archive::new(&mut reader); + let mut pg_wal_tar = tar::Archive::new(reader); let mut pg_wal_entries_iter = pg_wal_tar.entries()?; while last_lsn <= end_lsn { let bytes = { @@ -523,7 +529,7 @@ pub fn import_timeline_from_tar( offset = 0; } - if last_lsn != base_lsn { + if last_lsn != start_lsn { info!("reached end of WAL at {}", last_lsn); } else { info!("there was no WAL to import at {}", last_lsn); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index ada187b437..2c439f3589 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -31,7 +31,7 @@ use utils::{ use crate::basebackup; use crate::config::{PageServerConf, ProfilingConfig}; -use crate::import_datadir::{import_timeline_from_postgres_datadir, import_timeline_from_tar}; +use crate::import_datadir::{import_basebackup_from_tar, import_timeline_from_postgres_datadir, import_wal_from_tar}; use crate::layered_repository::LayeredRepository; use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp}; use crate::profiling::profpoint_start; @@ -529,7 +529,7 @@ impl PageServerHandler { Ok(()) } - fn handle_import( + fn handle_import_basebackup( &self, pgb: &mut PostgresBackend, tenant_id: ZTenantId, @@ -537,9 +537,7 @@ impl PageServerHandler { base_lsn: Lsn, end_lsn: Lsn, ) -> anyhow::Result<()> { - let _enter = info_span!("import", timeline = %timeline_id, tenant = %tenant_id).entered(); - // TODO cap the max number of import threads allowed - // TODO check that timeline doesn't exist already + let _enter = info_span!("import basebackup", timeline = %timeline_id, tenant = %tenant_id).entered(); // TODO thread_mgr::associate_with? // Create empty timeline @@ -550,11 +548,40 @@ impl PageServerHandler { let mut datadir_timeline = DatadirTimeline::::new( timeline, repartition_distance); + // TODO mark timeline as not ready until it reaches end_lsn + // Import basebackup provided via CopyData info!("importing basebackup"); pgb.write_message(&BeMessage::CopyInResponse)?; let reader = CopyInReader::new(pgb); - import_timeline_from_tar(&mut datadir_timeline, reader, base_lsn, end_lsn)?; + import_basebackup_from_tar(&mut datadir_timeline, reader, base_lsn)?; + + info!("done"); + Ok(()) + } + + fn handle_import_wal( + &self, + pgb: &mut PostgresBackend, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, + start_lsn: Lsn, + end_lsn: Lsn, + ) -> anyhow::Result<()> { + let _enter = info_span!("import wal", timeline = %timeline_id, tenant = %tenant_id).entered(); + // TODO thread_mgr::associate_with? + + let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; + let timeline = repo.get_timeline_load(timeline_id)?; + let repartition_distance = repo.get_checkpoint_distance(); // TODO + let mut datadir_timeline = DatadirTimeline::::new( + timeline, repartition_distance); + + // Import wal provided via CopyData + info!("importing wal"); + pgb.write_message(&BeMessage::CopyInResponse)?; + let reader = CopyInReader::new(pgb); + import_wal_from_tar(&mut datadir_timeline, reader, start_lsn, end_lsn)?; info!("done"); Ok(()) @@ -836,8 +863,8 @@ impl postgres_backend::Handler for PageServerHandler { // Check that the timeline exists self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with("import ") { - let (_, params_raw) = query_string.split_at("import ".len()); + } else if query_string.starts_with("import basebackup ") { + let (_, params_raw) = query_string.split_at("import basebackup ".len()); let params = params_raw.split_whitespace().collect::>(); ensure!(params.len() == 4); let tenant = ZTenantId::from_str(params[0])?; @@ -847,7 +874,20 @@ impl postgres_backend::Handler for PageServerHandler { self.check_permission(Some(tenant))?; - self.handle_import(pgb, tenant, timeline, base_lsn, end_lsn)?; + self.handle_import_basebackup(pgb, tenant, timeline, base_lsn, end_lsn)?; + pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + } else if query_string.starts_with("import wal ") { + let (_, params_raw) = query_string.split_at("import wal ".len()); + let params = params_raw.split_whitespace().collect::>(); + ensure!(params.len() == 4); + let tenant = ZTenantId::from_str(params[0])?; + let timeline = ZTimelineId::from_str(params[1])?; + let start_lsn = Lsn::from_str(params[2])?; + let end_lsn = Lsn::from_str(params[2])?; + + self.check_permission(Some(tenant))?; + + self.handle_import_wal(pgb, tenant, timeline, start_lsn, end_lsn)?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.to_ascii_lowercase().starts_with("set ") { // important because psycopg2 executes "SET datestyle TO 'ISO'"