From a11ef162f7926be95bff2d835fb81f6d26b03abb Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Thu, 18 Mar 2021 16:16:19 +0300 Subject: [PATCH] stubs for smgr proto --- src/page_service.rs | 204 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 183 insertions(+), 21 deletions(-) diff --git a/src/page_service.rs b/src/page_service.rs index d2abf732df..57449c3710 100644 --- a/src/page_service.rs +++ b/src/page_service.rs @@ -1,5 +1,12 @@ // -// The Page Service listens for client connections and serves their GetPage@LSN requests +// The Page Service listens for client connections and serves their GetPage@LSN +// requests. +// +// It is possible to connect here using usual psql/pgbench/libpq. Following +// commands are supported now: +// *status* -- show actual info about this pageserver, +// *pagestream* -- enter mode where smgr and pageserver talk with their +// custom protocol. // use tokio::net::{TcpListener, TcpStream}; @@ -12,14 +19,21 @@ use std::io::{self}; type Result = std::result::Result; -/// -/// Basic support for postgres backend protocol. -/// - enum FeMessage { StartupMessage(FeStartupMessage), Query(FeQueryMessage), - Terminate + Terminate, + + // + // All that messages are actually CopyData from libpq point of view. + // + ZenithExistsRequest(ZenithRequest), + ZenithTruncRequest(ZenithRequest), + ZenithUnlinkRequest(ZenithRequest), + ZenithNblocksRequest(ZenithRequest), + ZenithReadRequest(ZenithRequest), + ZenithCreateRequest(ZenithRequest), + ZenithExtendRequest(ZenithRequest), } enum BeMessage { @@ -27,9 +41,37 @@ enum BeMessage { ReadyForQuery, RowDescription, DataRow, - CommandComplete + CommandComplete, + + // + // All that messages are actually CopyData from libpq point of view. + // + ZenithStatusResponse(ZenithStatusResponse), + ZenithNblocksResponse(ZenithStatusResponse), + ZenithReadResponse(ZenithReadResponse), } +#[derive(Debug)] +struct ZenithRequest { + spc_node: i32, + db_node: i32, + rel_node: i32, + forknum: u8, + blkno: i32, +} + +#[derive(Debug)] +struct ZenithStatusResponse { + ok: bool, + n_blocks: i32, +} + +#[derive(Debug)] +struct ZenithReadResponse { + ok: bool, + n_blocks: i32, + page: Bytes +} #[derive(Debug)] struct FeStartupMessage { @@ -74,7 +116,7 @@ impl FeStartupMessage { }; buf.advance(len as usize); - Ok(Some(FeMessage::StartupMessage(FeStartupMessage{ version, kind}))) + Ok(Some(FeMessage::StartupMessage(FeStartupMessage{version, kind}))) } } @@ -86,7 +128,7 @@ struct Buffer { #[derive(Debug)] struct FeQueryMessage { - body: Buffer + body: Bytes } impl FeMessage { @@ -114,18 +156,42 @@ impl FeMessage { return Ok(None); } - let buf = Buffer { - bytes: buf.split_to(total_len).freeze(), - idx: 5, - }; + let mut body = buf.split_to(total_len); + body.advance(5); match tag { - b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage{body: buf}))), + b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage{body:body.freeze()}))), b'X' => Ok(Some(FeMessage::Terminate)), + b'd' => { + let smgr_tag = body.get_u8(); + let zreq = ZenithRequest { + spc_node: body.get_i32(), + db_node: body.get_i32(), + rel_node: body.get_i32(), + forknum: body.get_u8(), + blkno: body.get_i32(), + }; + + // TODO: consider using protobuf or serde bincode for less error prone + // serialization. + match smgr_tag { + 0 => Ok(Some(FeMessage::ZenithExistsRequest(zreq))), + 1 => Ok(Some(FeMessage::ZenithTruncRequest(zreq))), + 2 => Ok(Some(FeMessage::ZenithUnlinkRequest(zreq))), + 3 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))), + 4 => Ok(Some(FeMessage::ZenithReadRequest(zreq))), + 5 => Ok(Some(FeMessage::ZenithCreateRequest(zreq))), + 6 => Ok(Some(FeMessage::ZenithExtendRequest(zreq))), + _ => Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("unknown smgr message tag: {},'{:?}'", smgr_tag, buf), + )) + } + }, tag => { Err(io::Error::new( io::ErrorKind::InvalidInput, - format!("unknown message tag: {}", tag), + format!("unknown message tag: {},'{:?}'", tag, buf), )) } } @@ -262,6 +328,22 @@ impl Connection { self.stream.write_i32(4 + b.len() as i32).await?; self.stream.write_buf(&mut b).await?; } + + BeMessage::ZenithStatusResponse(resp) | + BeMessage::ZenithNblocksResponse(resp) => { + self.stream.write_u8(b'd').await?; + self.stream.write_i32(4 + 1 + 4).await?; + self.stream.write_u8(resp.ok as u8).await?; + self.stream.write_i32(resp.n_blocks).await?; + } + + BeMessage::ZenithReadResponse(resp) => { + self.stream.write_u8(b'd').await?; + self.stream.write_i32(4 + 1 + 4 + resp.page.len() as i32).await?; + self.stream.write_u8(resp.ok as u8).await?; + self.stream.write_i32(resp.n_blocks).await?; + self.stream.write_buf(&mut resp.page.clone()).await?; + } } Ok(()) @@ -295,10 +377,7 @@ impl Connection { } }, Some(FeMessage::Query(m)) => { - self.write_message_noflush(&BeMessage::RowDescription).await?; - self.write_message_noflush(&BeMessage::DataRow).await?; - self.write_message_noflush(&BeMessage::CommandComplete).await?; - self.write_message(&BeMessage::ReadyForQuery).await?; + self.process_query(&m).await?; }, Some(FeMessage::Terminate) => { break; @@ -307,12 +386,95 @@ impl Connection { println!("connection closed"); break; } + _ => { + return Err(io::Error::new(io::ErrorKind::Other,"unexpected message")); + } } } Ok(()) } + async fn process_query(&mut self, q : &FeQueryMessage) -> Result<()> { + println!("got query {:?}", q.body); + + if q.body.starts_with(b"pagestream") { + self.handle_pagerequests().await + + } else if q.body.starts_with(b"status") { + self.write_message_noflush(&BeMessage::RowDescription).await?; + self.write_message_noflush(&BeMessage::DataRow).await?; + self.write_message_noflush(&BeMessage::CommandComplete).await?; + self.write_message(&BeMessage::ReadyForQuery).await + + } else { + self.write_message_noflush(&BeMessage::RowDescription).await?; + self.write_message_noflush(&BeMessage::DataRow).await?; + self.write_message_noflush(&BeMessage::CommandComplete).await?; + self.write_message(&BeMessage::ReadyForQuery).await + } + } + + async fn handle_pagerequests(&mut self) -> Result<()> { + + /* switch client to COPYBOTH */ + self.stream.write_u8(b'W').await?; + self.stream.write_i32(4 + 1 + 2).await?; + self.stream.write_u8(0).await?; /* copy_is_binary */ + self.stream.write_i16(0).await?; /* numAttributes */ + self.stream.flush().await?; + + loop { + match self.read_message().await? { + Some(FeMessage::ZenithExistsRequest(_)) => { + self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { + ok: true, + n_blocks: 0 + })).await? + } + Some(FeMessage::ZenithTruncRequest(_)) => { + self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { + ok: true, + n_blocks: 0 + })).await? + } + Some(FeMessage::ZenithUnlinkRequest(_)) => { + self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { + ok: true, + n_blocks: 0 + })).await? + } + Some(FeMessage::ZenithNblocksRequest(_)) => { + self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse { + ok: true, + n_blocks: 0 + })).await? + } + Some(FeMessage::ZenithReadRequest(_)) => { + let zero_page = vec![0 as u8; 8192]; + self.write_message(&BeMessage::ZenithReadResponse(ZenithReadResponse { + ok: true, + n_blocks: 0, + page: Bytes::from(zero_page), + })).await? + } + Some(FeMessage::ZenithCreateRequest(_)) => { + self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { + ok: true, + n_blocks: 0 + })).await? + } + Some(FeMessage::ZenithExtendRequest(_)) => { + self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { + ok: true, + n_blocks: 0 + })).await? + } + _ => { + + } + } + } + + } } - -