stubs for smgr proto

This commit is contained in:
Stas Kelvich
2021-03-18 16:16:19 +03:00
parent ea38ed2239
commit a11ef162f7

View File

@@ -1,5 +1,12 @@
//
// The Page Service listens for client connections and serves their GetPage@LSN requests
// The Page Service listens for client connections and serves their GetPage@LSN
// requests.
//
// It is possible to connect here using usual psql/pgbench/libpq. Following
// commands are supported now:
// *status* -- show actual info about this pageserver,
// *pagestream* -- enter mode where smgr and pageserver talk with their
// custom protocol.
//
use tokio::net::{TcpListener, TcpStream};
@@ -12,14 +19,21 @@ use std::io::{self};
type Result<T> = std::result::Result<T, io::Error>;
///
/// Basic support for postgres backend protocol.
///
enum FeMessage {
StartupMessage(FeStartupMessage),
Query(FeQueryMessage),
Terminate
Terminate,
//
// All that messages are actually CopyData from libpq point of view.
//
ZenithExistsRequest(ZenithRequest),
ZenithTruncRequest(ZenithRequest),
ZenithUnlinkRequest(ZenithRequest),
ZenithNblocksRequest(ZenithRequest),
ZenithReadRequest(ZenithRequest),
ZenithCreateRequest(ZenithRequest),
ZenithExtendRequest(ZenithRequest),
}
enum BeMessage {
@@ -27,9 +41,37 @@ enum BeMessage {
ReadyForQuery,
RowDescription,
DataRow,
CommandComplete
CommandComplete,
//
// All that messages are actually CopyData from libpq point of view.
//
ZenithStatusResponse(ZenithStatusResponse),
ZenithNblocksResponse(ZenithStatusResponse),
ZenithReadResponse(ZenithReadResponse),
}
#[derive(Debug)]
struct ZenithRequest {
spc_node: i32,
db_node: i32,
rel_node: i32,
forknum: u8,
blkno: i32,
}
#[derive(Debug)]
struct ZenithStatusResponse {
ok: bool,
n_blocks: i32,
}
#[derive(Debug)]
struct ZenithReadResponse {
ok: bool,
n_blocks: i32,
page: Bytes
}
#[derive(Debug)]
struct FeStartupMessage {
@@ -74,7 +116,7 @@ impl FeStartupMessage {
};
buf.advance(len as usize);
Ok(Some(FeMessage::StartupMessage(FeStartupMessage{ version, kind})))
Ok(Some(FeMessage::StartupMessage(FeStartupMessage{version, kind})))
}
}
@@ -86,7 +128,7 @@ struct Buffer {
#[derive(Debug)]
struct FeQueryMessage {
body: Buffer
body: Bytes
}
impl FeMessage {
@@ -114,18 +156,42 @@ impl FeMessage {
return Ok(None);
}
let buf = Buffer {
bytes: buf.split_to(total_len).freeze(),
idx: 5,
};
let mut body = buf.split_to(total_len);
body.advance(5);
match tag {
b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage{body: buf}))),
b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage{body:body.freeze()}))),
b'X' => Ok(Some(FeMessage::Terminate)),
b'd' => {
let smgr_tag = body.get_u8();
let zreq = ZenithRequest {
spc_node: body.get_i32(),
db_node: body.get_i32(),
rel_node: body.get_i32(),
forknum: body.get_u8(),
blkno: body.get_i32(),
};
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
match smgr_tag {
0 => Ok(Some(FeMessage::ZenithExistsRequest(zreq))),
1 => Ok(Some(FeMessage::ZenithTruncRequest(zreq))),
2 => Ok(Some(FeMessage::ZenithUnlinkRequest(zreq))),
3 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))),
4 => Ok(Some(FeMessage::ZenithReadRequest(zreq))),
5 => Ok(Some(FeMessage::ZenithCreateRequest(zreq))),
6 => Ok(Some(FeMessage::ZenithExtendRequest(zreq))),
_ => Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown smgr message tag: {},'{:?}'", smgr_tag, buf),
))
}
},
tag => {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown message tag: {}", tag),
format!("unknown message tag: {},'{:?}'", tag, buf),
))
}
}
@@ -262,6 +328,22 @@ impl Connection {
self.stream.write_i32(4 + b.len() as i32).await?;
self.stream.write_buf(&mut b).await?;
}
BeMessage::ZenithStatusResponse(resp) |
BeMessage::ZenithNblocksResponse(resp) => {
self.stream.write_u8(b'd').await?;
self.stream.write_i32(4 + 1 + 4).await?;
self.stream.write_u8(resp.ok as u8).await?;
self.stream.write_i32(resp.n_blocks).await?;
}
BeMessage::ZenithReadResponse(resp) => {
self.stream.write_u8(b'd').await?;
self.stream.write_i32(4 + 1 + 4 + resp.page.len() as i32).await?;
self.stream.write_u8(resp.ok as u8).await?;
self.stream.write_i32(resp.n_blocks).await?;
self.stream.write_buf(&mut resp.page.clone()).await?;
}
}
Ok(())
@@ -295,10 +377,7 @@ impl Connection {
}
},
Some(FeMessage::Query(m)) => {
self.write_message_noflush(&BeMessage::RowDescription).await?;
self.write_message_noflush(&BeMessage::DataRow).await?;
self.write_message_noflush(&BeMessage::CommandComplete).await?;
self.write_message(&BeMessage::ReadyForQuery).await?;
self.process_query(&m).await?;
},
Some(FeMessage::Terminate) => {
break;
@@ -307,12 +386,95 @@ impl Connection {
println!("connection closed");
break;
}
_ => {
return Err(io::Error::new(io::ErrorKind::Other,"unexpected message"));
}
}
}
Ok(())
}
async fn process_query(&mut self, q : &FeQueryMessage) -> Result<()> {
println!("got query {:?}", q.body);
if q.body.starts_with(b"pagestream") {
self.handle_pagerequests().await
} else if q.body.starts_with(b"status") {
self.write_message_noflush(&BeMessage::RowDescription).await?;
self.write_message_noflush(&BeMessage::DataRow).await?;
self.write_message_noflush(&BeMessage::CommandComplete).await?;
self.write_message(&BeMessage::ReadyForQuery).await
} else {
self.write_message_noflush(&BeMessage::RowDescription).await?;
self.write_message_noflush(&BeMessage::DataRow).await?;
self.write_message_noflush(&BeMessage::CommandComplete).await?;
self.write_message(&BeMessage::ReadyForQuery).await
}
}
async fn handle_pagerequests(&mut self) -> Result<()> {
/* switch client to COPYBOTH */
self.stream.write_u8(b'W').await?;
self.stream.write_i32(4 + 1 + 2).await?;
self.stream.write_u8(0).await?; /* copy_is_binary */
self.stream.write_i16(0).await?; /* numAttributes */
self.stream.flush().await?;
loop {
match self.read_message().await? {
Some(FeMessage::ZenithExistsRequest(_)) => {
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0
})).await?
}
Some(FeMessage::ZenithTruncRequest(_)) => {
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0
})).await?
}
Some(FeMessage::ZenithUnlinkRequest(_)) => {
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0
})).await?
}
Some(FeMessage::ZenithNblocksRequest(_)) => {
self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0
})).await?
}
Some(FeMessage::ZenithReadRequest(_)) => {
let zero_page = vec![0 as u8; 8192];
self.write_message(&BeMessage::ZenithReadResponse(ZenithReadResponse {
ok: true,
n_blocks: 0,
page: Bytes::from(zero_page),
})).await?
}
Some(FeMessage::ZenithCreateRequest(_)) => {
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0
})).await?
}
Some(FeMessage::ZenithExtendRequest(_)) => {
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0
})).await?
}
_ => {
}
}
}
}
}