diff --git a/Cargo.lock b/Cargo.lock index 98812a9cdf..c615766eb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -55,15 +55,6 @@ dependencies = [ "backtrace", ] -[[package]] -name = "apis" -version = "0.1.0" -dependencies = [ - "anyhow", - "bytes", - "serde", -] - [[package]] name = "arrayvec" version = "0.4.12" @@ -1805,7 +1796,6 @@ name = "pageserver" version = "0.1.0" dependencies = [ "anyhow", - "apis", "byteorder", "bytes", "chrono", diff --git a/libs/apis/Cargo.toml b/libs/apis/Cargo.toml deleted file mode 100644 index de30aa5815..0000000000 --- a/libs/apis/Cargo.toml +++ /dev/null @@ -1,11 +0,0 @@ -[package] -name = "apis" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -bytes = { version = "1.0.1", features = ['serde'] } -serde = { version = "1.0", features = ["derive"] } -anyhow = "1.0" diff --git a/libs/apis/src/import.rs b/libs/apis/src/import.rs deleted file mode 100644 index 5abaf2b3ab..0000000000 --- a/libs/apis/src/import.rs +++ /dev/null @@ -1,25 +0,0 @@ -use std::path::PathBuf; - -use bytes::Bytes; -use serde::{Serialize, Deserialize}; - -#[derive(Serialize, Deserialize)] -pub enum ImportFeMessage { - // TODO chunk it - File(String, Bytes), - Done -} - -#[derive(Serialize, Deserialize)] -pub enum ImportBeMessage { - Done, - Error(String), -} - -// TODO add a thin binary wrapper for this function, aside from neon_local -pub fn send_basebackup(basebackup_dir: PathBuf) -> anyhow::Result<()> { - // TODO change return type - // TODO implement as sender of ImportFeMessage and receiver of ImportBeMessage - // on generic channel. - Ok(()) -} diff --git a/libs/apis/src/lib.rs b/libs/apis/src/lib.rs deleted file mode 100644 index 07fecca158..0000000000 --- a/libs/apis/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod import; diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 73542dc61b..298addb838 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -58,7 +58,6 @@ postgres_ffi = { path = "../libs/postgres_ffi" } etcd_broker = { path = "../libs/etcd_broker" } metrics = { path = "../libs/metrics" } utils = { path = "../libs/utils" } -apis = { path = "../libs/apis" } remote_storage = { path = "../libs/remote_storage" } workspace_hack = { version = "0.1", path = "../workspace_hack" } close_fds = "0.3.2" diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 9aad7e7203..288dde2195 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -21,7 +21,6 @@ use postgres_ffi::xlog_utils::*; use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED}; use postgres_ffi::{Oid, TransactionId}; use utils::lsn::Lsn; -use postgres::CopyOutReader; /// /// Import all relation data pages from local disk into the repository. @@ -406,6 +405,8 @@ fn import_wal( } +// Rest of file copied from https://github.com/neondatabase/neon/compare/WIP_import_from_tar + pub fn import_timeline_from_tar( tline: &mut DatadirTimeline, reader: Reader, diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 12801f3298..8c7ef8e9aa 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -13,7 +13,8 @@ use anyhow::{bail, ensure, Context, Result}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use lazy_static::lazy_static; use regex::Regex; -use std::io; +use std::collections::VecDeque; +use std::io::{self, Read}; use std::net::TcpListener; use std::str; use std::str::FromStr; @@ -26,8 +27,6 @@ use utils::{ pq_proto::{BeMessage, FeMessage, RowDescriptor, SINGLE_COL_ROWDESC}, zid::{ZTenantId, ZTimelineId}, }; -use apis::import::{ImportFeMessage, ImportBeMessage}; -use utils::bin_ser::BeSer; use crate::basebackup; use crate::config::{PageServerConf, ProfilingConfig}; @@ -202,6 +201,74 @@ impl PagestreamBeMessage { } } +/// Implements Read for the server side of CopyIn +struct CopyInReader<'a> { + pgb: &'a mut PostgresBackend, + + /// Overflow buffer for bytes sent in CopyData messages + /// that the reader (caller of read) hasn't asked for yet. + buf: VecDeque, +} + +impl<'a> CopyInReader<'a> { + // NOTE: pgb should be in copy in state already + fn new(pgb: &'a mut PostgresBackend) -> Self { + Self { + pgb, + buf: VecDeque::<_>::new(), + } + } +} + +impl<'a> Read for CopyInReader<'a> { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + loop { + // TODO check if shutdown was requested? + + // Return from buffer if nonempty + if self.buf.len() > 0 { + let bytes_read = std::cmp::min(buf.len(), self.buf.len()); + for i in 0..bytes_read { + buf[i] = self.buf[i]; + } + self.buf.drain(0..bytes_read); + return Ok(bytes_read); + } + + // Wait for client to send CopyData bytes + match self.pgb.read_message() { + Ok(Some(message)) => { + let copy_data_bytes = match message { + FeMessage::CopyData(bytes) => bytes, + FeMessage::CopyDone => { + return Ok(0) + } + m => { + info!("unexpected copy in client message {:?}", m); + continue; + }, + }; + + // Return as much as we can, saving the rest in self.buf + let mut reader = copy_data_bytes.reader(); + let bytes_read = reader.read(buf)?; + self.buf.extend(reader.bytes().map(|r| r.expect("error reading from copy in"))); + return Ok(bytes_read); + }, + Ok(None) => { + // Is this ok? + return Ok(0) + }, + Err(e) => { + if !is_socket_read_timed_out(&e) { + todo!("return io::Error"); + } + }, + } + } + } +} + /////////////////////////////////////////////////////////////////////////////// /// @@ -458,43 +525,15 @@ impl PageServerHandler { let _enter = info_span!("import", timeline = %timeline, tenant = %tenant).entered(); // TODO cap the max number of import threads allowed // TODO check that timeline doesn't exist already + // TODO thread_mgr::associate_with? - // NOTE: pagerequests handler exits when connection is closed, - // so there is no need to reset the association - thread_mgr::associate_with(Some(tenant), Some(timeline)); - - // switch client to COPYBOTH pgb.write_message(&BeMessage::CopyInResponse)?; + let mut reader = CopyInReader::new(pgb); - while !thread_mgr::is_shutdown_requested() { - - // TODO implement this as reader object and pass it into import_timeline_from_tar - match pgb.read_message() { - Ok(Some(message)) => { - let copy_data_bytes = match message { - FeMessage::CopyData(bytes) => bytes, - FeMessage::CopyDone => { - return Ok(()) - } - m => { - info!("unexpected copy in client message {:?}", m); - continue; - }, - }; - - // TODO use copy_data_bytes - }, - Ok(None) => { - // Is this ok? - return Ok(()) - }, - Err(e) => { - if !is_socket_read_timed_out(&e) { - return Err(e); - } - }, - } - } + // TODO instead of draining, pass into timeline importer + let mut buf = Vec::::new(); + reader.read_to_end(&mut buf); + info!("Got {} bytes", buf.len()); Ok(()) }