zenith push

This commit is contained in:
Patrick Insinger
2021-06-01 19:21:44 -04:00
committed by Patrick Insinger
parent 3364a8d442
commit 47694ea4f5
3 changed files with 135 additions and 4 deletions

View File

@@ -20,12 +20,12 @@ use std::io::{BufReader, BufWriter, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::str::FromStr;
use std::thread;
use zenith_utils::lsn::Lsn;
use zenith_utils::{bin_ser::BeSer, lsn::Lsn};
use crate::basebackup;
use crate::branches;
use crate::page_cache;
use crate::repository::{BufferTag, RelTag};
use crate::repository::{BufferTag, RelTag, RelationUpdate, Update};
use crate::restore_local_repo;
use crate::walreceiver;
use crate::PageServerConf;
@@ -43,6 +43,7 @@ enum FeMessage {
Sync,
Terminate,
CopyData(Bytes),
CopyDone,
}
#[derive(Debug)]
@@ -61,6 +62,7 @@ enum BeMessage {
ControlFile,
CopyData(Bytes),
ErrorResponse(String),
CopyInResponse,
}
// Wrapped in libpq CopyData
@@ -354,6 +356,7 @@ impl FeMessage {
b'S' => Ok(Some(FeMessage::Sync)),
b'X' => Ok(Some(FeMessage::Terminate)),
b'd' => Ok(Some(FeMessage::CopyData(body))),
b'c' => Ok(Some(FeMessage::CopyDone)),
tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)),
}
}
@@ -604,6 +607,13 @@ impl Connection {
// Terminate fields
self.stream.write_u8(0)?;
}
BeMessage::CopyInResponse => {
self.stream.write_u8(b'G')?;
self.stream.write_u32::<BE>(4 + 1 + 2)?;
self.stream.write_u8(1)?; // binary
self.stream.write_u16::<BE>(0)?; // no columns
}
}
Ok(())
@@ -758,6 +768,96 @@ impl Connection {
self.write_message_noflush(&BeMessage::RowDescription)?;
self.write_message_noflush(&BeMessage::DataRow(Bytes::from(branch)))?;
self.write_message_noflush(&BeMessage::CommandComplete)?;
} else if query_string.starts_with(b"push ") {
let query_str = std::str::from_utf8(&query_string)?;
let mut it = query_str.split(' ');
it.next().unwrap();
let timeline_id: ZTimelineId = it
.next()
.ok_or_else(|| anyhow!("missing timeline id"))?
.parse()?;
let start_lsn = Lsn(0); // TODO this needs to come from the repo
let timeline =
page_cache::get_repository().create_empty_timeline(timeline_id, start_lsn)?;
self.write_message(&BeMessage::CopyInResponse)?;
let mut last_lsn = Lsn(0);
while let Some(msg) = self.read_message()? {
match msg {
FeMessage::CopyData(bytes) => {
let relation_update = RelationUpdate::des(&bytes)?;
last_lsn = relation_update.lsn;
match relation_update.update {
Update::Page { blknum, img } => {
let tag = BufferTag {
rel: relation_update.rel,
blknum,
};
timeline.put_page_image(tag, relation_update.lsn, img)?;
}
Update::WALRecord { blknum, rec } => {
let tag = BufferTag {
rel: relation_update.rel,
blknum,
};
timeline.put_wal_record(tag, rec)?;
}
Update::Truncate { n_blocks } => {
timeline.put_truncation(
relation_update.rel,
relation_update.lsn,
n_blocks,
)?;
}
Update::Unlink => {
todo!()
}
}
}
FeMessage::CopyDone => {
timeline.advance_last_valid_lsn(last_lsn);
break;
}
FeMessage::Sync => {}
_ => bail!("unexpected message {:?}", msg),
}
}
self.write_message(&BeMessage::CommandComplete)?;
} else if query_string.starts_with(b"request_push ") {
let query_str = std::str::from_utf8(&query_string)?;
let mut it = query_str.split(' ');
it.next().unwrap();
let timeline_id: ZTimelineId = it
.next()
.ok_or_else(|| anyhow!("missing timeline id"))?
.parse()?;
let timeline = page_cache::get_repository().get_timeline(timeline_id)?;
let postgres_connection_uri = it.next().ok_or(anyhow!("missing postgres uri"))?;
let mut conn = postgres::Client::connect(postgres_connection_uri, postgres::NoTls)?;
let mut copy_in = conn.copy_in(format!("push {}", timeline_id.to_string()).as_str())?;
let history = timeline.history()?;
for update_res in history {
let update = update_res?;
let update_bytes = update.ser()?;
copy_in.write_all(&update_bytes)?;
copy_in.flush()?; // ensure that messages are sent inside individual CopyData packets
}
copy_in.finish()?;
self.write_message(&BeMessage::CommandComplete)?;
} else if query_string.starts_with(b"branch_list") {
let branches = crate::branches::get_branches(&self.conf)?;
let branches_buf = serde_json::to_vec(&branches)?;

View File

@@ -101,14 +101,14 @@ pub trait History: Iterator<Item = Result<RelationUpdate>> {
fn lsn(&self) -> Lsn;
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RelationUpdate {
pub rel: RelTag,
pub lsn: Lsn,
pub update: Update,
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Update {
Page { blknum: u32, img: Bytes },
WALRecord { blknum: u32, rec: WALRecord },

View File

@@ -91,6 +91,12 @@ fn main() -> Result<()> {
),
),
)
.subcommand(
SubCommand::with_name("push")
.about("Push timeline to remote pageserver")
.arg(Arg::with_name("timeline").required(true))
.arg(Arg::with_name("remote").required(true)),
)
.get_matches();
// Create config file
@@ -171,6 +177,14 @@ fn main() -> Result<()> {
exit(1);
}
}
("push", Some(push_match)) => {
if let Err(e) = handle_push(push_match, &env) {
eprintln!("push operation failed: {}", e);
exit(1);
}
}
_ => {}
};
@@ -413,3 +427,20 @@ fn handle_remote(remote_match: &ArgMatches, local_env: &LocalEnv) -> Result<()>
Ok(())
}
fn handle_push(push_match: &ArgMatches, local_env: &LocalEnv) -> Result<()> {
let timeline_id_str = push_match.value_of("timeline").unwrap();
ZTimelineId::from_str(timeline_id_str)?;
let remote_name = push_match.value_of("remote").unwrap();
let remote = local_env
.remotes
.get(remote_name)
.ok_or_else(|| anyhow!("remote {} not found", remote_name))?;
let page_server = PageServerNode::from_env(local_env);
let mut client = page_server.page_server_psql_client()?;
client.simple_query(&format!("request_push {} {}", timeline_id_str, remote))?;
Ok(())
}