diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index b81d4afb6b..4e6ee91399 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -20,12 +20,12 @@ use std::io::{BufReader, BufWriter, Read, Write};
 use std::net::{TcpListener, TcpStream};
 use std::str::FromStr;
 use std::thread;
-use zenith_utils::lsn::Lsn;
+use zenith_utils::{bin_ser::BeSer, lsn::Lsn};
 
 use crate::basebackup;
 use crate::branches;
 use crate::page_cache;
-use crate::repository::{BufferTag, RelTag};
+use crate::repository::{BufferTag, RelTag, RelationUpdate, Update};
 use crate::restore_local_repo;
 use crate::walreceiver;
 use crate::PageServerConf;
@@ -43,6 +43,7 @@ enum FeMessage {
     Sync,
     Terminate,
     CopyData(Bytes),
+    CopyDone,
 }
 
 #[derive(Debug)]
@@ -61,6 +62,7 @@ enum BeMessage {
     ControlFile,
     CopyData(Bytes),
     ErrorResponse(String),
+    CopyInResponse,
 }
 
 // Wrapped in libpq CopyData
@@ -354,6 +356,7 @@ impl FeMessage {
             b'S' => Ok(Some(FeMessage::Sync)),
             b'X' => Ok(Some(FeMessage::Terminate)),
             b'd' => Ok(Some(FeMessage::CopyData(body))),
+            b'c' => Ok(Some(FeMessage::CopyDone)),
             tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)),
         }
     }
@@ -604,6 +607,13 @@ impl Connection {
                 // Terminate fields
                 self.stream.write_u8(0)?;
             }
+
+            BeMessage::CopyInResponse => {
+                self.stream.write_u8(b'G')?;
+                self.stream.write_u32::<BE>(4 + 1 + 2)?; // length word + format byte + column count
+                self.stream.write_u8(1)?; // binary
+                self.stream.write_u16::<BE>(0)?; // no columns
+            }
         }
 
         Ok(())
@@ -758,6 +768,101 @@ impl Connection {
             self.write_message_noflush(&BeMessage::RowDescription)?;
             self.write_message_noflush(&BeMessage::DataRow(Bytes::from(branch)))?;
             self.write_message_noflush(&BeMessage::CommandComplete)?;
+        } else if query_string.starts_with(b"push ") {
+            // "push <timeline_id>": create an empty timeline and fill it from a
+            // COPY IN stream. Each CopyData message carries one serialized
+            // RelationUpdate; CopyDone ends the stream.
+            let query_str = std::str::from_utf8(&query_string)?;
+            let mut it = query_str.split(' ');
+            it.next().unwrap();
+            let timeline_id: ZTimelineId = it
+                .next()
+                .ok_or_else(|| anyhow!("missing timeline id"))?
+                .parse()?;
+
+            let start_lsn = Lsn(0); // TODO this needs to come from the repo
+            let timeline =
+                page_cache::get_repository().create_empty_timeline(timeline_id, start_lsn)?;
+
+            self.write_message(&BeMessage::CopyInResponse)?;
+
+            let mut last_lsn = Lsn(0);
+
+            while let Some(msg) = self.read_message()? {
+                match msg {
+                    FeMessage::CopyData(bytes) => {
+                        let relation_update = RelationUpdate::des(&bytes)?;
+
+                        last_lsn = relation_update.lsn;
+
+                        match relation_update.update {
+                            Update::Page { blknum, img } => {
+                                let tag = BufferTag {
+                                    rel: relation_update.rel,
+                                    blknum,
+                                };
+
+                                timeline.put_page_image(tag, relation_update.lsn, img)?;
+                            }
+                            Update::WALRecord { blknum, rec } => {
+                                let tag = BufferTag {
+                                    rel: relation_update.rel,
+                                    blknum,
+                                };
+
+                                timeline.put_wal_record(tag, rec)?;
+                            }
+                            Update::Truncate { n_blocks } => {
+                                timeline.put_truncation(
+                                    relation_update.rel,
+                                    relation_update.lsn,
+                                    n_blocks,
+                                )?;
+                            }
+                            Update::Unlink => {
+                                bail!("relation unlink is not supported by push yet")
+                            }
+                        }
+                    }
+                    FeMessage::CopyDone => {
+                        timeline.advance_last_valid_lsn(last_lsn);
+                        break;
+                    }
+                    FeMessage::Sync => {}
+                    _ => bail!("unexpected message {:?}", msg),
+                }
+            }
+
+            self.write_message(&BeMessage::CommandComplete)?;
+        } else if query_string.starts_with(b"request_push ") {
+            // "request_push <timeline_id> <postgres_uri>": replay this timeline's
+            // history to the server at the given URI via its "push" command.
+            let query_str = std::str::from_utf8(&query_string)?;
+            let mut it = query_str.split(' ');
+            it.next().unwrap();
+
+            let timeline_id: ZTimelineId = it
+                .next()
+                .ok_or_else(|| anyhow!("missing timeline id"))?
+                .parse()?;
+            let timeline = page_cache::get_repository().get_timeline(timeline_id)?;
+
+            let postgres_connection_uri = it.next().ok_or_else(|| anyhow!("missing postgres uri"))?;
+
+            let mut conn = postgres::Client::connect(postgres_connection_uri, postgres::NoTls)?;
+            let mut copy_in = conn.copy_in(format!("push {}", timeline_id.to_string()).as_str())?;
+
+            let history = timeline.history()?;
+            for update_res in history {
+                let update = update_res?;
+                let update_bytes = update.ser()?;
+                copy_in.write_all(&update_bytes)?;
+                copy_in.flush()?; // ensure that messages are sent inside individual CopyData packets
+            }
+
+            copy_in.finish()?;
+
+            self.write_message(&BeMessage::CommandComplete)?;
         } else if query_string.starts_with(b"branch_list") {
             let branches = crate::branches::get_branches(&self.conf)?;
             let branches_buf = serde_json::to_vec(&branches)?;
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index a420ff9783..6254dbf607 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -101,14 +101,14 @@ pub trait History: Iterator<Item = Result<RelationUpdate>> {
     fn lsn(&self) -> Lsn;
 }
 
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct RelationUpdate {
     pub rel: RelTag,
     pub lsn: Lsn,
     pub update: Update,
 }
 
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub enum Update {
     Page { blknum: u32, img: Bytes },
     WALRecord { blknum: u32, rec: WALRecord },
diff --git a/zenith/src/main.rs b/zenith/src/main.rs
index b4674413fb..79cd5ca75d 100644
--- a/zenith/src/main.rs
+++ b/zenith/src/main.rs
@@ -91,6 +91,12 @@ fn main() -> Result<()> {
                         ),
                 ),
         )
+        .subcommand(
+            SubCommand::with_name("push")
+                .about("Push timeline to remote pageserver")
+                .arg(Arg::with_name("timeline").required(true))
+                .arg(Arg::with_name("remote").required(true)),
+        )
         .get_matches();
 
     // Create config file
@@ -171,6 +177,14 @@ fn main() -> Result<()> {
                 exit(1);
             }
         }
+
+        ("push", Some(push_match)) => {
+            if let Err(e) = handle_push(push_match, &env) {
+                eprintln!("push operation failed: {}", e);
+                exit(1);
+            }
+        }
+
         _ => {}
     };
 
@@ -413,3 +427,26 @@ fn handle_remote(remote_match: &ArgMatches, local_env: &LocalEnv) -> Result<()>
 
     Ok(())
 }
+
+/// Handle `zenith push <timeline> <remote>`.
+///
+/// Looks up `remote` in the local environment's remotes and sends
+/// `request_push <timeline> <remote value>` to the local pageserver over its
+/// psql connection. Fails if the timeline id does not parse or the remote is unknown.
+fn handle_push(push_match: &ArgMatches, local_env: &LocalEnv) -> Result<()> {
+    let timeline_id_str = push_match.value_of("timeline").unwrap();
+    // Parsed for validation only; the original string is what gets forwarded.
+    ZTimelineId::from_str(timeline_id_str)?;
+
+    let remote_name = push_match.value_of("remote").unwrap();
+    let remote = local_env
+        .remotes
+        .get(remote_name)
+        .ok_or_else(|| anyhow!("remote {} not found", remote_name))?;
+
+    let page_server = PageServerNode::from_env(local_env);
+    let mut client = page_server.page_server_psql_client()?;
+    client.simple_query(&format!("request_push {} {}", timeline_id_str, remote))?;
+
+    Ok(())
+}