diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs
index 9ef1a8a465..8ef01d7116 100644
--- a/control_plane/src/storage.rs
+++ b/control_plane/src/storage.rs
@@ -1,6 +1,6 @@
 use std::collections::HashMap;
 use std::fs::File;
-use std::io::{BufReader, Read, Write};
+use std::io::{BufReader, Write};
 use std::net::TcpStream;
 use std::num::NonZeroU64;
 use std::path::PathBuf;
@@ -553,7 +553,8 @@ impl PageServerNode {
         };
 
         // Import base
-        let import_cmd = format!("import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
+        let import_cmd =
+            format!("import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
         let mut writer = client.copy_in(&import_cmd)?;
         io::copy(&mut base_reader, &mut writer)?;
         writer.finish()?;
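The hunk above streams the base tarball to the pageserver over the Postgres COPY IN subprotocol: `copy_in` issues the `import basebackup` command, `io::copy` forwards the tar bytes as CopyData messages, and `finish` sends CopyDone. A minimal standalone sketch of the same client-side flow, assuming the `postgres` crate (whose `copy_in`/`finish` API matches the calls above) and made-up connection parameters and IDs:

```rust
use std::fs::File;
use std::io::{self, BufReader};

use postgres::{Client, NoTls};

fn send_basebackup(tar_path: &str) -> anyhow::Result<()> {
    // Hypothetical pageserver address; the real one comes from the local env config.
    let mut client = Client::connect("host=127.0.0.1 port=64000 user=pageserver", NoTls)?;
    let mut base_reader = BufReader::new(File::open(tar_path)?);

    // Placeholder IDs and LSNs; the pageserver parses this command line.
    let import_cmd = "import basebackup <tenant> <timeline> <start_lsn> <end_lsn>";

    // Each write to `writer` goes out as a CopyData message.
    let mut writer = client.copy_in(import_cmd)?;
    io::copy(&mut base_reader, &mut writer)?;

    // Sends CopyDone and waits for the server's acknowledgement.
    writer.finish()?;
    Ok(())
}
```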
diff --git a/neon_local/src/main.rs b/neon_local/src/main.rs
index 253d3ccd1a..35e2d9c9e2 100644
--- a/neon_local/src/main.rs
+++ b/neon_local/src/main.rs
@@ -626,26 +626,32 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
                 "Created timeline '{}' at Lsn {} for tenant: {}",
                 timeline.timeline_id, last_record_lsn, tenant_id,
             );
-        },
+        }
         Some(("import", import_match)) => {
             let tenant_id = get_tenant_id(import_match, env)?;
-            let timeline_id = parse_timeline_id(import_match)?
-                .expect("No timeline id provided");
-            let name = import_match.value_of("node-name")
+            let timeline_id = parse_timeline_id(import_match)?.expect("No timeline id provided");
+            let name = import_match
+                .value_of("node-name")
                 .ok_or_else(|| anyhow!("No node name provided"))?;
 
             // Parse base inputs
-            let base_tarfile = import_match.value_of("base-tarfile")
+            let base_tarfile = import_match
+                .value_of("base-tarfile")
                 .map(|s| PathBuf::from_str(s).unwrap())
                 .ok_or_else(|| anyhow!("No base-tarfile provided"))?;
-            let base_lsn = Lsn::from_str(import_match.value_of("base-lsn")
-                .ok_or_else(|| anyhow!("No base-lsn provided"))?)?;
+            let base_lsn = Lsn::from_str(
+                import_match
+                    .value_of("base-lsn")
+                    .ok_or_else(|| anyhow!("No base-lsn provided"))?,
+            )?;
             let base = (base_lsn, base_tarfile);
 
             // Parse pg_wal inputs
-            let wal_tarfile = import_match.value_of("wal-tarfile")
+            let wal_tarfile = import_match
+                .value_of("wal-tarfile")
                 .map(|s| PathBuf::from_str(s).unwrap());
-            let end_lsn = import_match.value_of("end-lsn")
+            let end_lsn = import_match
+                .value_of("end-lsn")
                 .map(|s| Lsn::from_str(s).unwrap());
             // TODO validate both or none are provided
             let pg_wal = end_lsn.zip(wal_tarfile);
@@ -657,7 +663,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
             env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
             cplane.new_node(tenant_id, name, timeline_id, None, None)?;
             println!("Done");
-        },
+        }
         Some(("branch", branch_match)) => {
             let tenant_id = get_tenant_id(branch_match, env)?;
             let new_branch_name = branch_match
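The `// TODO validate both or none are provided` above matters because `Option::zip` silently drops a lone `--end-lsn` or `--wal-tarfile`. A sketch of the missing check, using `anyhow::bail!` (not part of this patch):

```rust
// Sketch only: reject a lone --end-lsn or --wal-tarfile
// instead of silently ignoring it via Option::zip.
let pg_wal = match (end_lsn, wal_tarfile) {
    (Some(end_lsn), Some(wal_tarfile)) => Some((end_lsn, wal_tarfile)),
    (None, None) => None,
    _ => anyhow::bail!("--end-lsn and --wal-tarfile must be provided together"),
};
```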
diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs
index 67cbfe3e59..08639ea2ab 100644
--- a/pageserver/src/import_datadir.rs
+++ b/pageserver/src/import_datadir.rs
@@ -6,10 +6,9 @@ use std::fs;
 use std::fs::File;
 use std::io::{Read, Seek, SeekFrom};
 use std::path::{Path, PathBuf};
-use std::str::FromStr;
 
 use anyhow::{bail, ensure, Context, Result};
-use bytes::{Buf, Bytes};
+use bytes::Bytes;
 use tracing::*;
 
 use crate::pgdatadir_mapping::*;
@@ -159,7 +158,6 @@ pub fn import_timeline_from_postgres_datadir(
     Ok(())
 }
 
-
 fn import_relfile<R: Repository>(
     modification: &mut DatadirModification<R>,
     path: &Path,
@@ -429,7 +427,6 @@ fn import_wal(
     Ok(())
 }
 
-
 pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
     tline: &mut DatadirTimeline<R>,
     reader: Reader,
@@ -441,7 +438,7 @@ pub fn import_basebackup_from_tar(
 
     // Import base
     for base_tar_entry in tar::Archive::new(reader).entries()? {
-        let mut entry = base_tar_entry.unwrap();
+        let entry = base_tar_entry.unwrap();
         let header = entry.header();
         let len = header.entry_size()? as usize;
         let file_path = header.path().unwrap().into_owned();
@@ -451,17 +448,17 @@ pub fn import_basebackup_from_tar(
                 // let mut buffer = Vec::new();
                 // entry.read_to_end(&mut buffer).unwrap();
 
-                import_file(&mut modification, &file_path.as_ref(), entry, len)?;
-            },
+                import_file(&mut modification, file_path.as_ref(), entry, len)?;
+            }
             tar::EntryType::Directory => {
                 info!("directory {:?}", file_path);
                 if file_path.starts_with("pg_wal") {
                     info!("found pg_wal in base lol");
                 }
-            },
+            }
             _ => {
                 panic!("tar::EntryType::?? {}", file_path.display());
-            },
+            }
         }
     }
 
@@ -475,7 +472,6 @@ pub fn import_wal_from_tar(
     start_lsn: Lsn,
    end_lsn: Lsn,
 ) -> Result<()> {
-
     // Set up walingest mutable state
     let mut waldecoder = WalStreamDecoder::new(start_lsn);
     let mut segno = start_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE);
@@ -579,7 +575,7 @@ pub fn import_file(
         }
         "PG_VERSION" => {
             info!("ignored");
-        },
+        }
         _ => {
             import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
             info!("imported rel creation");
@@ -589,8 +585,7 @@ pub fn import_file(
         let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
         let dbnode: u32 = file_path
             .iter()
-            .skip(1)
-            .next()
+            .nth(1)
             .unwrap()
             .to_string_lossy()
             .parse()
@@ -604,7 +599,7 @@ pub fn import_file(
         }
         "PG_VERSION" => {
             info!("ignored");
-        },
+        }
         _ => {
             import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
             info!("imported rel creation");
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index ef125df4c4..7341bab1cc 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -13,10 +13,8 @@ use anyhow::{bail, ensure, Context, Result};
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use lazy_static::lazy_static;
 use regex::Regex;
-use std::collections::VecDeque;
 use std::io::{self, Read};
 use std::net::TcpListener;
-use std::path::PathBuf;
 use std::str;
 use std::str::FromStr;
 use std::sync::{Arc, RwLockReadGuard};
@@ -31,7 +29,7 @@ use utils::{
 
 use crate::basebackup;
 use crate::config::{PageServerConf, ProfilingConfig};
-use crate::import_datadir::{import_basebackup_from_tar, import_timeline_from_postgres_datadir, import_wal_from_tar};
+use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
 use crate::layered_repository::LayeredRepository;
 use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp};
 use crate::profiling::profpoint_start;
@@ -42,7 +40,6 @@ use crate::tenant_mgr;
 use crate::thread_mgr;
 use crate::thread_mgr::ThreadKind;
 use crate::CheckpointConfig;
-use crate::timelines::create_timeline;
 
 use metrics::{register_histogram_vec, HistogramVec};
 use postgres_ffi::xlog_utils::to_pg_timestamp;
@@ -211,7 +208,14 @@ struct CopyInReader<'a> {
 
     /// Overflow buffer for bytes sent in CopyData messages
    /// that the reader (caller of read) hasn't asked for yet.
-    buf: VecDeque<u8>,
+    /// TODO use BytesMut?
+    buf: Vec<u8>,
+
+    /// Bytes before `buf_begin` are considered as dropped.
+    /// This allows us to implement O(1) pop_front on Vec.
+    /// The Vec won't grow large because we only add to it
+    /// when it's empty.
+    buf_begin: usize,
 }
 
 impl<'a> CopyInReader<'a> {
@@ -219,7 +223,8 @@ impl<'a> CopyInReader<'a> {
     fn new(pgb: &'a mut PostgresBackend) -> Self {
         Self {
             pgb,
-            buf: VecDeque::<_>::new(),
+            buf: Vec::<_>::new(),
+            buf_begin: 0,
         }
     }
 }
@@ -239,44 +244,44 @@ impl<'a> Read for CopyInReader<'a> {
             // TODO check if shutdown was requested?
 
             // Return from buffer if nonempty
-            if self.buf.len() > 0 {
-                let bytes_read = std::cmp::min(buf.len(), self.buf.len());
-                for i in 0..bytes_read {
-                    buf[i] = self.buf[i];
-                }
-                self.buf.drain(0..bytes_read);
-                return Ok(bytes_read);
+            if self.buf_begin < self.buf.len() {
+                let bytes_to_read = std::cmp::min(buf.len(), self.buf.len() - self.buf_begin);
+                buf[..bytes_to_read].copy_from_slice(&self.buf[self.buf_begin..][..bytes_to_read]);
+                self.buf_begin += bytes_to_read;
+                return Ok(bytes_to_read);
             }
 
+            // Delete garbage
+            self.buf.clear();
+            self.buf_begin = 0;
+
             // Wait for client to send CopyData bytes
             match self.pgb.read_message() {
                 Ok(Some(message)) => {
                     let copy_data_bytes = match message {
                         FeMessage::CopyData(bytes) => bytes,
-                        FeMessage::CopyDone => {
-                            return Ok(0)
-                        }
+                        FeMessage::CopyDone => return Ok(0),
                         m => {
                             info!("unexpected copy in client message {:?}", m);
                             continue;
-                        },
+                        }
                     };
 
                     // Return as much as we can, saving the rest in self.buf
                     let mut reader = copy_data_bytes.reader();
                     let bytes_read = reader.read(buf)?;
-                    self.buf.extend(reader.bytes().map(|r| r.expect("error reading from copy in")));
+                    reader.read_to_end(&mut self.buf)?;
                     return Ok(bytes_read);
-                },
+                }
                 Ok(None) => {
                     // Is this ok?
-                    return Ok(0)
-                },
+                    return Ok(0);
+                }
                 Err(e) => {
                     if !is_socket_read_timed_out(&e) {
                         todo!("return io::Error");
                     }
-                },
+                }
             }
         }
     }
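The rewritten `read` above swaps the `VecDeque` for a `Vec` plus cursor: draining from the front becomes a single `copy_from_slice` and a cursor bump instead of an indexed loop plus `drain`, the backing allocation is recycled (`clear`) only once it is fully consumed, and the leftover tail of each CopyData payload is appended with one `read_to_end` call rather than the old byte-at-a-time `reader.bytes()` iteration. A self-contained sketch of the same buffering pattern, using a hypothetical `FrontBuf` type that is not in the patch:

```rust
/// Hypothetical minimal version of CopyInReader's buffer scheme:
/// bytes before `begin` are considered dropped, so a front-drain is O(1).
struct FrontBuf {
    buf: Vec<u8>,
    begin: usize,
}

impl FrontBuf {
    /// Copy up to `out.len()` buffered bytes into `out`, advancing the cursor.
    fn drain_into(&mut self, out: &mut [u8]) -> usize {
        let n = std::cmp::min(out.len(), self.buf.len() - self.begin);
        out[..n].copy_from_slice(&self.buf[self.begin..][..n]);
        self.begin += n;
        n
    }

    /// Refill only after everything was consumed; this invariant is what
    /// keeps the backing Vec from growing without bound.
    fn refill(&mut self, data: &[u8]) {
        assert!(self.begin >= self.buf.len());
        self.buf.clear();
        self.begin = 0;
        self.buf.extend_from_slice(data);
    }
}
```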
@@ -535,20 +540,21 @@ impl PageServerHandler {
         tenant_id: ZTenantId,
         timeline_id: ZTimelineId,
         base_lsn: Lsn,
-        end_lsn: Lsn,
+        _end_lsn: Lsn,
     ) -> anyhow::Result<()> {
-        let _enter = info_span!("import basebackup", timeline = %timeline_id, tenant = %tenant_id).entered();
+        let _enter =
+            info_span!("import basebackup", timeline = %timeline_id, tenant = %tenant_id).entered();
         // TODO thread_mgr::associate_with?
 
         // Create empty timeline
         info!("creating new timeline");
         let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
         let timeline = repo.create_empty_timeline(timeline_id, Lsn(0))?;
-        let repartition_distance = repo.get_checkpoint_distance(); // TODO
-        let mut datadir_timeline = DatadirTimeline::<LayeredRepository>::new(
-            timeline, repartition_distance);
+        let repartition_distance = repo.get_checkpoint_distance(); // TODO
+        let mut datadir_timeline =
+            DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
 
-        // TODO mark timeline as not ready until it reaches end_lsn
+        // TODO mark timeline as not ready until it reaches end_lsn?
 
         // Import basebackup provided via CopyData
         info!("importing basebackup");
@@ -568,14 +574,15 @@ impl PageServerHandler {
         start_lsn: Lsn,
         end_lsn: Lsn,
     ) -> anyhow::Result<()> {
-        let _enter = info_span!("import wal", timeline = %timeline_id, tenant = %tenant_id).entered();
+        let _enter =
+            info_span!("import wal", timeline = %timeline_id, tenant = %tenant_id).entered();
         // TODO thread_mgr::associate_with?
 
         let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
         let timeline = repo.get_timeline_load(timeline_id)?;
-        let repartition_distance = repo.get_checkpoint_distance(); // TODO
-        let mut datadir_timeline = DatadirTimeline::<LayeredRepository>::new(
-            timeline, repartition_distance);
+        let repartition_distance = repo.get_checkpoint_distance(); // TODO
+        let mut datadir_timeline =
+            DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
 
         // TODO ensure start_lsn matches current lsn
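The section ends on `// TODO ensure start_lsn matches current lsn`. A sketch of the check that TODO seems to call for, assuming the timeline exposes its last-record LSN through a `get_last_record_lsn()` accessor (an assumption; the accessor is not shown in this diff):

```rust
// Sketch only: refuse a WAL stream that does not begin exactly where
// the imported base image left off. `get_last_record_lsn()` is assumed.
let current_lsn = timeline.get_last_record_lsn();
anyhow::ensure!(
    start_lsn == current_lsn,
    "WAL import must start at the current LSN {current_lsn}, got {start_lsn}"
);
```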