From 2d8a19affaf210f307537db50fd012c5001ebdee Mon Sep 17 00:00:00 2001 From: anastasia Date: Thu, 1 Apr 2021 23:24:39 +0300 Subject: [PATCH] add protocol message to receive pg_control --- src/control_plane.rs | 16 ++++++++++++++++ src/lib.rs | 1 + src/page_service.rs | 24 +++++++++++++++++++++++- tests/test_pageserver.rs | 2 ++ 4 files changed, 42 insertions(+), 1 deletion(-) diff --git a/src/control_plane.rs b/src/control_plane.rs index c1ef135e1f..ebf01e5bbe 100644 --- a/src/control_plane.rs +++ b/src/control_plane.rs @@ -71,6 +71,22 @@ impl StorageControlPlane { } fn get_wal_acceptor_conn_info() {} + + + pub fn simple_query_storage(&self, db: &str, user: &str, sql: &str) -> Vec { + let connstring = format!( + "host={} port={} dbname={} user={}", + self.page_server_addr().ip(), + self.page_server_addr().port(), + db, + user + ); + + let mut client = Client::connect(connstring.as_str(), NoTls).unwrap(); + + println!("Running {}", sql); + client.simple_query(sql).unwrap() + } } pub struct PageServerNode { diff --git a/src/lib.rs b/src/lib.rs index e2fb1f967e..7b66b188de 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ use std::path::PathBuf; #[allow(dead_code)] pub mod control_plane; +pub mod controlfile; pub mod page_cache; pub mod page_service; pub mod wal_service; diff --git a/src/page_service.rs b/src/page_service.rs index 0c9c42b28e..761e78e841 100644 --- a/src/page_service.rs +++ b/src/page_service.rs @@ -48,6 +48,7 @@ enum BeMessage { RowDescription, DataRow, CommandComplete, + ControlFile, // // All that messages are actually CopyData from libpq point of view. @@ -334,6 +335,18 @@ impl Connection { self.stream.write_buf(&mut b).await?; } + BeMessage::ControlFile => { + // TODO pass checkpoint and xid info in this message + let mut b = Bytes::from("hello pg_control"); + + self.stream.write_u8(b'D').await?; + self.stream.write_i32(4 + 2 + 4 + b.len() as i32).await?; + + self.stream.write_i16(1).await?; + self.stream.write_i32(b.len() as i32).await?; + self.stream.write_buf(&mut b).await?; + } + BeMessage::CommandComplete => { let mut b = Bytes::from("SELECT 1\0"); @@ -422,7 +435,8 @@ impl Connection { if q.body.starts_with(b"pagestream") { self.handle_pagerequests().await - + } else if q.body.starts_with(b"controlfile") { + self.handle_controlfile().await } else if q.body.starts_with(b"status") { self.write_message_noflush(&BeMessage::RowDescription).await?; self.write_message_noflush(&BeMessage::DataRow).await?; @@ -437,6 +451,14 @@ impl Connection { } } + async fn handle_controlfile(&mut self) -> Result<()> { + self.write_message_noflush(&BeMessage::RowDescription).await?; + self.write_message_noflush(&BeMessage::ControlFile).await?; + self.write_message_noflush(&BeMessage::CommandComplete).await?; + self.write_message(&BeMessage::ReadyForQuery).await + + } + async fn handle_pagerequests(&mut self) -> Result<()> { /* switch client to COPYBOTH */ diff --git a/tests/test_pageserver.rs b/tests/test_pageserver.rs index c39de8d1c4..7a9bb26b93 100644 --- a/tests/test_pageserver.rs +++ b/tests/test_pageserver.rs @@ -24,6 +24,8 @@ fn test_redo_cases() { page_server_connstring = 'host={} port={}'\n\ ", pageserver_addr.ip(), pageserver_addr.port()).as_str()); + storage_cplane.simple_query_storage("postgres", node.whoami().as_str(), "controlfile"); + // start postgres node.start();