diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs
index edfa911abb..ca33a857b8 100644
--- a/safekeeper/src/handler.rs
+++ b/safekeeper/src/handler.rs
@@ -1,7 +1,7 @@
 //! Part of Safekeeper pretending to be Postgres, i.e. handling Postgres
 //! protocol commands.
 
-use anyhow::Context;
+use anyhow::{bail, Context};
 use std::str;
 use tracing::{info, info_span, Instrument};
 
@@ -37,9 +37,11 @@ enum SafekeeperPostgresCommand {
     StartReplication { start_lsn: Lsn },
     IdentifySystem,
     JSONCtrl { cmd: AppendLogicalMessage },
+    Show { guc: String },
 }
 
 fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
+    let cmd_lowercase = cmd.to_ascii_lowercase();
     if cmd.starts_with("START_WAL_PUSH") {
         Ok(SafekeeperPostgresCommand::StartWalPush)
     } else if cmd.starts_with("START_REPLICATION") {
@@ -49,7 +51,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
         let start_lsn = caps
             .next()
             .map(|cap| cap[1].parse::<Lsn>())
-            .context("failed to parse start LSN from START_REPLICATION command")??;
+            .context("parse start LSN from START_REPLICATION command")??;
         Ok(SafekeeperPostgresCommand::StartReplication { start_lsn })
     } else if cmd.starts_with("IDENTIFY_SYSTEM") {
         Ok(SafekeeperPostgresCommand::IdentifySystem)
@@ -58,6 +60,14 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
         Ok(SafekeeperPostgresCommand::JSONCtrl {
             cmd: serde_json::from_str(cmd)?,
         })
+    } else if cmd_lowercase.starts_with("show") {
+        let re = Regex::new(r"show ((?:[[:alpha:]]|_)+)").unwrap();
+        let mut caps = re.captures_iter(&cmd_lowercase);
+        let guc = caps
+            .next()
+            .map(|cap| cap[1].parse::<String>())
+            .context("parse guc in SHOW command")??;
+        Ok(SafekeeperPostgresCommand::Show { guc })
     } else {
         anyhow::bail!("unsupported command {cmd}");
     }
@@ -148,10 +158,7 @@ impl postgres_backend_async::Handler for SafekeeperPostgresHandler {
                 .await?;
             return Ok(());
         }
-        info!(
-            "got unparsed query {:?} in timeline {:?}",
-            query_string, self.timeline_id
-        );
+
         let cmd = parse_cmd(query_string)?;
 
         info!(
@@ -174,9 +181,11 @@ impl postgres_backend_async::Handler for SafekeeperPostgresHandler {
                     .await
             }
             SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
+            SafekeeperPostgresCommand::Show { guc } => self.handle_show(guc, pgb).await,
             SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
                 handle_json_ctrl(self, pgb, cmd).await
             }
+            _ => unreachable!(),
         };
 
         match res {
@@ -283,6 +292,40 @@ impl SafekeeperPostgresHandler {
         Ok(())
     }
 
+    async fn handle_show(
+        &mut self,
+        guc: String,
+        pgb: &mut PostgresBackend,
+    ) -> Result<(), QueryError> {
+        match guc.as_str() {
+            // pg_receivewal wants it
+            "data_directory_mode" => {
+                pgb.write_message(&BeMessage::RowDescription(&[RowDescriptor::int8_col(
+                    b"data_directory_mode",
+                )]))?
+                // xxx we could return real one, not just 0700
+                .write_message(&BeMessage::DataRow(&[Some(0700.to_string().as_bytes())]))?
+                .write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
+            }
+            // pg_receivewal wants it
+            "wal_segment_size" => {
+                let tli = GlobalTimelines::get(self.ttid)?;
+                let wal_seg_size = tli.get_state().1.server.wal_seg_size;
+                let wal_seg_size_mb = (wal_seg_size / 1024 / 1024).to_string() + "MB";
+
+                pgb.write_message(&BeMessage::RowDescription(&[RowDescriptor::text_col(
+                    b"wal_segment_size",
+                )]))?
+                .write_message(&BeMessage::DataRow(&[Some(wal_seg_size_mb.as_bytes())]))?
+                .write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
+            }
+            _ => {
+                return Err(anyhow::anyhow!("SHOW of unknown setting").into());
+            }
+        }
+        Ok(())
+    }
+
     /// Returns true if current connection is a replication connection, originating
     /// from a walproposer recovery function. This connection gets a special handling:
     /// safekeeper must stream all local WAL till the flush_lsn, whether committed or not.
diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index a917d61678..7d81602dcd 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -8,11 +8,14 @@ use serde::Serialize;
 use serde::Serializer;
 use std::collections::{HashMap, HashSet};
 use std::fmt::Display;
+use std::str::FromStr;
 use std::sync::Arc;
 use storage_broker::proto::SafekeeperTimelineInfo;
 use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
 use tokio::task::JoinError;
 
+use crate::json_ctrl::append_logical_message;
+use crate::json_ctrl::AppendLogicalMessage;
 use crate::safekeeper::ServerInfo;
 use crate::safekeeper::Term;
 
@@ -191,6 +194,50 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response
 }
 
+/// Create a timeline with some fake WAL in it, for debugging purposes.
+async fn create_fake_timeline_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
+    let ttid = TenantTimelineId {
+        tenant_id: TenantId::from_str("deadbeefdeadbeefdeadbeefdeadbeef")
+            .expect("tenant_id parsing failed"),
+        timeline_id: TimelineId::from_str("deadbeefdeadbeefdeadbeefdeadbeef")
+            .expect("timeline_id parsing failed"),
+    };
+    let pg_version = 150000;
+    let server_info = ServerInfo {
+        pg_version,
+        system_id: 0,
+        wal_seg_size: WAL_SEGMENT_SIZE as u32,
+    };
+    let init_lsn = Lsn(0x1493AC8);
+    tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
+        let tli = GlobalTimelines::create(ttid, server_info, init_lsn, init_lsn)?;
+        let mut begin_lsn = init_lsn;
+        for _ in 0..16 {
+            let append = AppendLogicalMessage {
+                lm_prefix: "db".to_owned(),
+                lm_message: "hahabubu".to_owned(),
+                set_commit_lsn: true,
+                send_proposer_elected: false, // actually ignored here
+                term: 0,
+                epoch_start_lsn: init_lsn,
+                begin_lsn,
+                truncate_lsn: init_lsn,
+                pg_version,
+            };
+            let inserted = append_logical_message(&tli, &append)?;
+            begin_lsn = inserted.end_lsn;
+        }
+        Ok(())
+    })
+    .await
+    .map_err(|e| ApiError::InternalServerError(e.into()))?
+    .map_err(ApiError::InternalServerError)?;
+    json_response(StatusCode::OK, ())
+}
+
 /// Deactivates the timeline and removes its data directory.
 async fn timeline_delete_force_handler(
     mut request: Request<Body>,
@@ -302,6 +349,7 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
         .get("/v1/status", status_handler)
         // Will be used in the future instead of implicit timeline creation
         .post("/v1/tenant/timeline", timeline_create_handler)
+        .post("/v1/fake_timeline", create_fake_timeline_handler)
         .get(
             "/v1/tenant/:tenant_id/timeline/:timeline_id",
             timeline_status_handler,
diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs
index f80ab5954b..029502be50 100644
--- a/safekeeper/src/json_ctrl.rs
+++ b/safekeeper/src/json_ctrl.rs
@@ -31,21 +31,21 @@ use utils::{lsn::Lsn, postgres_backend_async::PostgresBackend};
 #[derive(Serialize, Deserialize, Debug)]
 pub struct AppendLogicalMessage {
     // prefix and message to build LogicalMessage
-    lm_prefix: String,
-    lm_message: String,
+    pub lm_prefix: String,
+    pub lm_message: String,
 
     // if true, commit_lsn will match flush_lsn after append
-    set_commit_lsn: bool,
+    pub set_commit_lsn: bool,
 
     // if true, ProposerElected will be sent before append
-    send_proposer_elected: bool,
+    pub send_proposer_elected: bool,
 
     // fields from AppendRequestHeader
-    term: Term,
-    epoch_start_lsn: Lsn,
-    begin_lsn: Lsn,
-    truncate_lsn: Lsn,
-    pg_version: u32,
+    pub term: Term,
+    pub epoch_start_lsn: Lsn,
+    pub begin_lsn: Lsn,
+    pub truncate_lsn: Lsn,
+    pub pg_version: u32,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -129,15 +129,15 @@ fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::R
 }
 
 #[derive(Debug, Serialize, Deserialize)]
-struct InsertedWAL {
+pub struct InsertedWAL {
     begin_lsn: Lsn,
-    end_lsn: Lsn,
+    pub end_lsn: Lsn,
     append_response: AppendResponse,
 }
 
 /// Extend local WAL with new LogicalMessage record. To do that,
 /// create AppendRequest with new WAL and pass it to safekeeper.
-fn append_logical_message(
+pub fn append_logical_message(
     tli: &Arc<Timeline>,
     msg: &AppendLogicalMessage,
 ) -> anyhow::Result<InsertedWAL> {
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index 743c9b5ecf..b7bdbed923 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -138,7 +138,7 @@ impl SafekeeperPostgresHandler {
             send_buf: RefCell::new([0; MAX_SEND_SIZE]),
         });
 
-        let c = ReplicationContext {
+        let mut c = ReplicationContext {
             tli,
             replica_id,
             appname,
@@ -151,7 +151,10 @@ impl SafekeeperPostgresHandler {
         };
 
         let _phantom_wf = c.wait_wal_fut();
+        let real_end_pos = c.end_pos;
+        c.end_pos = c.start_pos + 1; // to well form read_wal future
         let _phantom_rf = c.read_wal_fut();
+        c.end_pos = real_end_pos;
 
         ReplicationHandler {
             c,
@@ -163,8 +166,8 @@ impl SafekeeperPostgresHandler {
         }
     }
 }
-/// START_REPLICATION stream driver: sends WAL and receives feedback.
 pin_project! {
+    /// START_REPLICATION stream driver: sends WAL and receives feedback.
     struct ReplicationHandler<'a, WF, RF>
     where
         WF: Future<Output = anyhow::Result<Option<Lsn>>>,
         RF: Future<Output = anyhow::Result<usize>>,
@@ -222,8 +225,6 @@ pin_project! {
         WF: Future<Output = anyhow::Result<Option<Lsn>>>,
         RF: Future<Output = anyhow::Result<usize>>,
     {
-        // TODO: see if we can remove boxing here; with anon type of async fn this
-        // is untrivial (+ needs fiddling with pinning, pin_project and replace).
         WaitWal{ #[pin] fut: WF},
         ReadWal{ #[pin] fut: RF},
         FlushWal,
@@ -294,10 +295,7 @@ where
                         .context("failed to deserialize StandbyReply")?;
                     // This must be a regular postgres replica,
                     // because pageserver doesn't send this type of messages to safekeeper.
-                    // Currently this is not implemented, so this message is ignored.
-
-                    warn!("unexpected StandbyReply. Read-only postgres replicas are not supported in safekeepers yet.");
-                    // timeline.update_replica_state(replica_id, Some(state));
+                    // Currently we just ignore this, tracking progress for them is not supported.
                 }
                 Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
                     // Note: deserializing is on m[9..] because we skip the tag byte and len bytes.
@@ -379,17 +377,19 @@ where
                     let mut this = self.as_mut().project();
                     let write_ctx_clone = this.c.write_ctx.clone();
                     let send_buf = &write_ctx_clone.send_buf.borrow()[..read_len];
-                    let (start_pos, end_pos) = (this.c.start_pos.0, this.c.end_pos.0);
+                    let chunk_end = this.c.start_pos + read_len as u64;
                     // write data to the output buffer
                     this.c
                         .pgb
                         .write_message(&BeMessage::XLogData(XLogDataBody {
-                            wal_start: start_pos,
-                            wal_end: end_pos,
+                            wal_start: this.c.start_pos.0,
+                            wal_end: chunk_end.0,
                             timestamp: get_current_timestamp(),
                             data: send_buf,
                         }))
                         .context("Failed to write XLogData")?;
+                    trace!("wrote a chunk of wal {}-{}", this.c.start_pos, chunk_end);
+                    this.c.start_pos = chunk_end;
                     // and flush it
                     this.write_state.set(WriteState::FlushWal);
                 }
@@ -427,11 +427,12 @@ where
                let fut = self.c.wait_wal_fut();
                self.project().write_state.set(WriteState::WaitWal {
                    fut: {
-                        // SAFETY: this function is the only way to assign WaitWal to
+                        // SAFETY: this function is the only way to assign WaitWal to
                        // write_state. We just workaround impossibility of specifying
                        // async fn type, which is anonymous.
                        // transmute_copy is used as transmute refuses generic param:
                        // https://users.rust-lang.org/t/transmute-doesnt-work-on-generic-types/87272
+                        assert_eq!(std::mem::size_of::<WF>(), std::mem::size_of_val(&fut));
                        let t = unsafe { std::mem::transmute_copy(&fut) };
                        std::mem::forget(fut);
                        t
@@ -444,11 +445,12 @@ where
                let fut = self.c.read_wal_fut();
                self.project().write_state.set(WriteState::ReadWal {
                    fut: {
-                        // SAFETY: this function is the only way to assign ReadWal to
+                        // SAFETY: this function is the only way to assign ReadWal to
                        // write_state. We just workaround impossibility of specifying
                        // async fn type, which is anonymous.
                        // transmute_copy is used as transmute refuses generic param:
                        // https://users.rust-lang.org/t/transmute-doesnt-work-on-generic-types/87272
+                        assert_eq!(std::mem::size_of::<RF>(), std::mem::size_of_val(&fut));
                        let t = unsafe { std::mem::transmute_copy(&fut) };
                        std::mem::forget(fut);
                        t
@@ -467,7 +469,11 @@ impl ReplicationContext<'_> {
 
     // Create future reading WAL.
     fn read_wal_fut(&self) -> impl Future<Output = anyhow::Result<usize>> {
-        let mut send_size = self.end_pos.checked_sub(self.start_pos).unwrap().0 as usize;
+        let mut send_size = self
+            .end_pos
+            .checked_sub(self.start_pos)
+            .expect("reading wal without waiting for it first")
+            .0 as usize;
         send_size = min(send_size, self.write_ctx.send_buf.borrow().len());
         let write_ctx_fut = self.write_ctx.clone();
         async move {