Implement Read

This commit is contained in:
Bojan Serafimov
2022-06-08 18:40:44 -04:00
parent 3e42367603
commit 76b126be35
7 changed files with 78 additions and 86 deletions

10
Cargo.lock generated
View File

@@ -55,15 +55,6 @@ dependencies = [
"backtrace",
]
[[package]]
name = "apis"
version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"serde",
]
[[package]]
name = "arrayvec"
version = "0.4.12"
@@ -1805,7 +1796,6 @@ name = "pageserver"
version = "0.1.0"
dependencies = [
"anyhow",
"apis",
"byteorder",
"bytes",
"chrono",

View File

@@ -1,11 +0,0 @@
[package]
name = "apis"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bytes = { version = "1.0.1", features = ['serde'] }
serde = { version = "1.0", features = ["derive"] }
anyhow = "1.0"

View File

@@ -1,25 +0,0 @@
use std::path::PathBuf;
use bytes::Bytes;
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize)]
pub enum ImportFeMessage {
// TODO chunk it
File(String, Bytes),
Done
}
#[derive(Serialize, Deserialize)]
pub enum ImportBeMessage {
Done,
Error(String),
}
// TODO add a thin binary wrapper for this function, aside from neon_local
pub fn send_basebackup(basebackup_dir: PathBuf) -> anyhow::Result<()> {
// TODO change return type
// TODO implement as sender of ImportFeMessage and receiver of ImportBeMessage
// on generic channel.
Ok(())
}

View File

@@ -1 +0,0 @@
pub mod import;

View File

@@ -58,7 +58,6 @@ postgres_ffi = { path = "../libs/postgres_ffi" }
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }
apis = { path = "../libs/apis" }
remote_storage = { path = "../libs/remote_storage" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
close_fds = "0.3.2"

View File

@@ -21,7 +21,6 @@ use postgres_ffi::xlog_utils::*;
use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
use postgres_ffi::{Oid, TransactionId};
use utils::lsn::Lsn;
use postgres::CopyOutReader;
///
/// Import all relation data pages from local disk into the repository.
@@ -406,6 +405,8 @@ fn import_wal<R: Repository>(
}
// Rest of file copied from https://github.com/neondatabase/neon/compare/WIP_import_from_tar
pub fn import_timeline_from_tar<R: Repository, Reader: Read>(
tline: &mut DatadirTimeline<R>,
reader: Reader,

View File

@@ -13,7 +13,8 @@ use anyhow::{bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use regex::Regex;
use std::io;
use std::collections::VecDeque;
use std::io::{self, Read};
use std::net::TcpListener;
use std::str;
use std::str::FromStr;
@@ -26,8 +27,6 @@ use utils::{
pq_proto::{BeMessage, FeMessage, RowDescriptor, SINGLE_COL_ROWDESC},
zid::{ZTenantId, ZTimelineId},
};
use apis::import::{ImportFeMessage, ImportBeMessage};
use utils::bin_ser::BeSer;
use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
@@ -202,6 +201,74 @@ impl PagestreamBeMessage {
}
}
/// Implements Read for the server side of CopyIn
struct CopyInReader<'a> {
pgb: &'a mut PostgresBackend,
/// Overflow buffer for bytes sent in CopyData messages
/// that the reader (caller of read) hasn't asked for yet.
buf: VecDeque<u8>,
}
impl<'a> CopyInReader<'a> {
// NOTE: pgb should be in copy in state already
fn new(pgb: &'a mut PostgresBackend) -> Self {
Self {
pgb,
buf: VecDeque::<_>::new(),
}
}
}
impl<'a> Read for CopyInReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
loop {
// TODO check if shutdown was requested?
// Return from buffer if nonempty
if self.buf.len() > 0 {
let bytes_read = std::cmp::min(buf.len(), self.buf.len());
for i in 0..bytes_read {
buf[i] = self.buf[i];
}
self.buf.drain(0..bytes_read);
return Ok(bytes_read);
}
// Wait for client to send CopyData bytes
match self.pgb.read_message() {
Ok(Some(message)) => {
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
FeMessage::CopyDone => {
return Ok(0)
}
m => {
info!("unexpected copy in client message {:?}", m);
continue;
},
};
// Return as much as we can, saving the rest in self.buf
let mut reader = copy_data_bytes.reader();
let bytes_read = reader.read(buf)?;
self.buf.extend(reader.bytes().map(|r| r.expect("error reading from copy in")));
return Ok(bytes_read);
},
Ok(None) => {
// Is this ok?
return Ok(0)
},
Err(e) => {
if !is_socket_read_timed_out(&e) {
todo!("return io::Error");
}
},
}
}
}
}
///////////////////////////////////////////////////////////////////////////////
///
@@ -458,43 +525,15 @@ impl PageServerHandler {
let _enter = info_span!("import", timeline = %timeline, tenant = %tenant).entered();
// TODO cap the max number of import threads allowed
// TODO check that timeline doesn't exist already
// TODO thread_mgr::associate_with?
// NOTE: pagerequests handler exits when connection is closed,
// so there is no need to reset the association
thread_mgr::associate_with(Some(tenant), Some(timeline));
// switch client to COPYBOTH
pgb.write_message(&BeMessage::CopyInResponse)?;
let mut reader = CopyInReader::new(pgb);
while !thread_mgr::is_shutdown_requested() {
// TODO implement this as reader object and pass it into import_timeline_from_tar
match pgb.read_message() {
Ok(Some(message)) => {
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
FeMessage::CopyDone => {
return Ok(())
}
m => {
info!("unexpected copy in client message {:?}", m);
continue;
},
};
// TODO use copy_data_bytes
},
Ok(None) => {
// Is this ok?
return Ok(())
},
Err(e) => {
if !is_socket_read_timed_out(&e) {
return Err(e);
}
},
}
}
// TODO instead of draining, pass into timeline importer
let mut buf = Vec::<u8>::new();
reader.read_to_end(&mut buf);
info!("Got {} bytes", buf.len());
Ok(())
}