Test and bug-fix it.

Add a fake_timeline endpoint creating a timeline + some WAL.

curl -X POST http://127.0.0.1:7676/v1/fake_timeline
Set in pg_receivewal.c (to start streaming at the fake timeline's init LSN):
  stream.startpos = 0x1493AC8;
pg_install/v15/bin/pg_receivewal -v -d "host=localhost port=5454 options='-c tenant_id=deadbeefdeadbeefdeadbeefdeadbeef timeline_id=deadbeefdeadbeefdeadbeefdeadbeef'" -D ~/tmp/tmp/tmp
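
The SHOW handling added below can also be checked without pg_receivewal. A minimal sketch, assuming the tokio-postgres crate, an unauthenticated connection, and the port and tenant/timeline ids from the commands above (none of this is part of the commit):

use tokio_postgres::{NoTls, SimpleQueryMessage};

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Connection string mirrors the pg_receivewal invocation above.
    let conninfo = "host=localhost port=5454 user=anyuser options='-c tenant_id=deadbeefdeadbeefdeadbeefdeadbeef timeline_id=deadbeefdeadbeefdeadbeefdeadbeef'";
    let (client, connection) = tokio_postgres::connect(conninfo, NoTls).await?;
    // The connection future must be driven for the client to make progress.
    tokio::spawn(connection);
    // The two GUCs handled by the new SHOW command below.
    for guc in ["data_directory_mode", "wal_segment_size"] {
        for msg in client.simple_query(&format!("SHOW {guc}")).await? {
            if let SimpleQueryMessage::Row(row) = msg {
                println!("{guc} = {:?}", row.get(0));
            }
        }
    }
    Ok(())
}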
Arseny Sher
2023-02-03 12:16:32 +04:00
parent 2bbd24edbf
commit 48fb085ebd
4 changed files with 129 additions and 32 deletions

View File

@@ -1,7 +1,7 @@
//! Part of Safekeeper pretending to be Postgres, i.e. handling Postgres
//! protocol commands.
use anyhow::Context;
use anyhow::{bail, Context};
use std::str;
use tracing::{info, info_span, Instrument};
@@ -37,9 +37,11 @@ enum SafekeeperPostgresCommand {
StartReplication { start_lsn: Lsn },
IdentifySystem,
JSONCtrl { cmd: AppendLogicalMessage },
Show { guc: String },
}
fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
let cmd_lowercase = cmd.to_ascii_lowercase();
if cmd.starts_with("START_WAL_PUSH") {
Ok(SafekeeperPostgresCommand::StartWalPush)
} else if cmd.starts_with("START_REPLICATION") {
@@ -49,7 +51,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
let start_lsn = caps
.next()
.map(|cap| cap[1].parse::<Lsn>())
.context("failed to parse start LSN from START_REPLICATION command")??;
.context("parse start LSN from START_REPLICATION command")??;
Ok(SafekeeperPostgresCommand::StartReplication { start_lsn })
} else if cmd.starts_with("IDENTIFY_SYSTEM") {
Ok(SafekeeperPostgresCommand::IdentifySystem)
@@ -58,6 +60,14 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
Ok(SafekeeperPostgresCommand::JSONCtrl {
cmd: serde_json::from_str(cmd)?,
})
} else if cmd_lowercase.starts_with("show") {
let re = Regex::new(r"show ((?:[[:alpha:]]|_)+)").unwrap();
let mut caps = re.captures_iter(&cmd_lowercase);
let guc = caps
.next()
.map(|cap| cap[1].parse::<String>())
.context("parse guc in SHOW command")??;
Ok(SafekeeperPostgresCommand::Show { guc })
} else {
anyhow::bail!("unsupported command {cmd}");
}
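
A standalone check of the SHOW pattern used above (the regex crate is assumed to be the same one parse_cmd already uses):

use regex::Regex;

fn main() {
    let re = Regex::new(r"show ((?:[[:alpha:]]|_)+)").unwrap();
    // parse_cmd lowercases the command before matching.
    let cmd = "SHOW wal_segment_size".to_ascii_lowercase();
    let guc = re.captures(&cmd).map(|caps| caps[1].to_string());
    assert_eq!(guc.as_deref(), Some("wal_segment_size"));
}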
@@ -148,10 +158,7 @@ impl postgres_backend_async::Handler for SafekeeperPostgresHandler {
.await?;
return Ok(());
}
info!(
"got unparsed query {:?} in timeline {:?}",
query_string, self.timeline_id
);
let cmd = parse_cmd(query_string)?;
info!(
@@ -174,9 +181,11 @@ impl postgres_backend_async::Handler for SafekeeperPostgresHandler {
.await
}
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
SafekeeperPostgresCommand::Show { guc } => self.handle_show(guc, pgb).await,
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
handle_json_ctrl(self, pgb, cmd).await
}
_ => unreachable!(),
};
match res {
@@ -283,6 +292,40 @@ impl SafekeeperPostgresHandler {
Ok(())
}
async fn handle_show(
&mut self,
guc: String,
pgb: &mut PostgresBackend,
) -> Result<(), QueryError> {
match guc.as_str() {
// pg_receivewal wants it
"data_directory_mode" => {
pgb.write_message(&BeMessage::RowDescription(&[RowDescriptor::int8_col(
b"data_directory_mode",
)]))?
// XXX: we could return the real mode, not just hardcoded 0700
.write_message(&BeMessage::DataRow(&[Some(0700.to_string().as_bytes())]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
// pg_receivewal wants it
"wal_segment_size" => {
let tli = GlobalTimelines::get(self.ttid)?;
let wal_seg_size = tli.get_state().1.server.wal_seg_size;
let wal_seg_size_mb = (wal_seg_size / 1024 / 1024).to_string() + "MB";
pgb.write_message(&BeMessage::RowDescription(&[RowDescriptor::text_col(
b"wal_segment_size",
)]))?
.write_message(&BeMessage::DataRow(&[Some(wal_seg_size_mb.as_bytes())]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
_ => {
return Err(anyhow::anyhow!("SHOW of unknown setting").into());
}
}
Ok(())
}
/// Returns true if current connection is a replication connection, originating
/// from a walproposer recovery function. This connection gets a special handling:
/// safekeeper must stream all local WAL till the flush_lsn, whether committed or not.

View File

@@ -8,11 +8,14 @@ use serde::Serialize;
use serde::Serializer;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::str::FromStr;
use std::sync::Arc;
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio::task::JoinError;
use crate::json_ctrl::append_logical_message;
use crate::json_ctrl::AppendLogicalMessage;
use crate::safekeeper::ServerInfo;
use crate::safekeeper::Term;
@@ -191,6 +194,50 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
json_response(StatusCode::OK, ())
}
// Create fake timeline + insert some valid WAL. Useful to test WAL streaming
// from safekeeper in isolation, e.g.
// pg_receivewal -v -d "host=localhost port=5454 options='-c tenant_id=deadbeefdeadbeefdeadbeefdeadbeef timeline_id=deadbeefdeadbeefdeadbeefdeadbeef'" -D ~/tmp/tmp/tmp
// (though hacking pg_receivewal's startpos is currently needed to make it work)
async fn create_fake_timeline_handler(_request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId {
tenant_id: TenantId::from_str("deadbeefdeadbeefdeadbeefdeadbeef")
.expect("tenant_id parsing failed"),
timeline_id: TimelineId::from_str("deadbeefdeadbeefdeadbeefdeadbeef")
.expect("timeline_id parsing failed"),
};
let pg_version = 150000;
let server_info = ServerInfo {
pg_version,
system_id: 0,
wal_seg_size: WAL_SEGMENT_SIZE as u32,
};
let init_lsn = Lsn(0x1493AC8);
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
let tli = GlobalTimelines::create(ttid, server_info, init_lsn, init_lsn)?;
let mut begin_lsn = init_lsn;
for _ in 0..16 {
let append = AppendLogicalMessage {
lm_prefix: "db".to_owned(),
lm_message: "hahabubu".to_owned(),
set_commit_lsn: true,
send_proposer_elected: false, // actually ignored here
term: 0,
epoch_start_lsn: init_lsn,
begin_lsn,
truncate_lsn: init_lsn,
pg_version,
};
let inserted = append_logical_message(&tli, &append)?;
begin_lsn = inserted.end_lsn;
}
Ok(())
})
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
/// Deactivates the timeline and removes its data directory.
async fn timeline_delete_force_handler(
mut request: Request<Body>,
@@ -302,6 +349,7 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
.get("/v1/status", status_handler)
// Will be used in the future instead of implicit timeline creation
.post("/v1/tenant/timeline", timeline_create_handler)
.post("/v1/fake_timeline", create_fake_timeline_handler)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_status_handler,

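The new /v1/fake_timeline route above could also be hit from a Rust test rather than curl; a sketch, assuming the reqwest crate and the 7676 listen port from the commit message:

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let resp = reqwest::Client::new()
        .post("http://127.0.0.1:7676/v1/fake_timeline")
        .send()
        .await?;
    // create_fake_timeline_handler responds with 200 OK once the timeline
    // exists and the 16 logical-message records have been appended.
    assert!(resp.status().is_success());
    Ok(())
}
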
View File

@@ -31,21 +31,21 @@ use utils::{lsn::Lsn, postgres_backend_async::PostgresBackend};
#[derive(Serialize, Deserialize, Debug)]
pub struct AppendLogicalMessage {
// prefix and message to build LogicalMessage
lm_prefix: String,
lm_message: String,
pub lm_prefix: String,
pub lm_message: String,
// if true, commit_lsn will match flush_lsn after append
set_commit_lsn: bool,
pub set_commit_lsn: bool,
// if true, ProposerElected will be sent before append
send_proposer_elected: bool,
pub send_proposer_elected: bool,
// fields from AppendRequestHeader
term: Term,
epoch_start_lsn: Lsn,
begin_lsn: Lsn,
truncate_lsn: Lsn,
pg_version: u32,
pub term: Term,
pub epoch_start_lsn: Lsn,
pub begin_lsn: Lsn,
pub truncate_lsn: Lsn,
pub pg_version: u32,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -129,15 +129,15 @@ fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::R
}
#[derive(Debug, Serialize, Deserialize)]
struct InsertedWAL {
pub struct InsertedWAL {
begin_lsn: Lsn,
end_lsn: Lsn,
pub end_lsn: Lsn,
append_response: AppendResponse,
}
/// Extend local WAL with new LogicalMessage record. To do that,
/// create AppendRequest with new WAL and pass it to safekeeper.
fn append_logical_message(
pub fn append_logical_message(
tli: &Arc<Timeline>,
msg: &AppendLogicalMessage,
) -> anyhow::Result<InsertedWAL> {

View File

@@ -138,7 +138,7 @@ impl SafekeeperPostgresHandler {
send_buf: RefCell::new([0; MAX_SEND_SIZE]),
});
let c = ReplicationContext {
let mut c = ReplicationContext {
tli,
replica_id,
appname,
@@ -151,7 +151,10 @@ impl SafekeeperPostgresHandler {
};
let _phantom_wf = c.wait_wal_fut();
let real_end_pos = c.end_pos;
c.end_pos = c.start_pos + 1; // temporarily bump end_pos so the read_wal future is well-formed
let _phantom_rf = c.read_wal_fut();
c.end_pos = real_end_pos;
ReplicationHandler {
c,
@@ -163,8 +166,8 @@ impl SafekeeperPostgresHandler {
}
}
/// START_REPLICATION stream driver: sends WAL and receives feedback.
pin_project! {
/// START_REPLICATION stream driver: sends WAL and receives feedback.
struct ReplicationHandler<'a, WF, RF>
where
WF: Future<Output = anyhow::Result<Option<Lsn>>>,
@@ -222,8 +225,6 @@ pin_project! {
WF: Future<Output = anyhow::Result<Option<Lsn>>>,
RF: Future<Output = anyhow::Result<usize>>,
{
// TODO: see if we can remove boxing here; with the anonymous type of an async fn
// this is nontrivial (+ needs fiddling with pinning, pin_project and replace).
WaitWal{ #[pin] fut: WF},
ReadWal{ #[pin] fut: RF},
FlushWal,
@@ -294,10 +295,7 @@ where
.context("failed to deserialize StandbyReply")?;
// This must be a regular postgres replica,
// because pageserver doesn't send this type of messages to safekeeper.
// Currently this is not implemented, so this message is ignored.
warn!("unexpected StandbyReply. Read-only postgres replicas are not supported in safekeepers yet.");
// timeline.update_replica_state(replica_id, Some(state));
// Currently we just ignore this, tracking progress for them is not supported.
}
Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
// Note: deserializing is on m[9..] because we skip the tag byte and len bytes.
@@ -379,17 +377,19 @@ where
let mut this = self.as_mut().project();
let write_ctx_clone = this.c.write_ctx.clone();
let send_buf = &write_ctx_clone.send_buf.borrow()[..read_len];
let (start_pos, end_pos) = (this.c.start_pos.0, this.c.end_pos.0);
let chunk_end = this.c.start_pos + read_len as u64;
// write data to the output buffer
this.c
.pgb
.write_message(&BeMessage::XLogData(XLogDataBody {
wal_start: start_pos,
wal_end: end_pos,
wal_start: this.c.start_pos.0,
wal_end: chunk_end.0,
timestamp: get_current_timestamp(),
data: send_buf,
}))
.context("Failed to write XLogData")?;
trace!("wrote a chunk of wal {}-{}", this.c.start_pos, chunk_end);
this.c.start_pos = chunk_end;
// and flush it
this.write_state.set(WriteState::FlushWal);
}
@@ -427,11 +427,12 @@ where
let fut = self.c.wait_wal_fut();
self.project().write_state.set(WriteState::WaitWal {
fut: {
// SAFETY: this function is the only way to assign futures to
// SAFETY: this function is the only way to assign WaitWal to
// write_state. We just work around the impossibility of naming the
// async fn's future type, which is anonymous.
// transmute_copy is used as transmute refuses generic param:
// https://users.rust-lang.org/t/transmute-doesnt-work-on-generic-types/87272
assert_eq!(std::mem::size_of::<WF>(), std::mem::size_of_val(&fut));
let t = unsafe { std::mem::transmute_copy(&fut) };
std::mem::forget(fut);
t
@@ -444,11 +445,12 @@ where
let fut = self.c.read_wal_fut();
self.project().write_state.set(WriteState::ReadWal {
fut: {
// SAFETY: this function is the only way to assign futures to
// SAFETY: this function is the only way to assign ReadWal to
// write_state. We just work around the impossibility of naming the
// async fn's future type, which is anonymous.
// transmute_copy is used as transmute refuses generic param:
// https://users.rust-lang.org/t/transmute-doesnt-work-on-generic-types/87272
assert_eq!(std::mem::size_of::<RF>(), std::mem::size_of_val(&fut));
let t = unsafe { std::mem::transmute_copy(&fut) };
std::mem::forget(fut);
t
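
The SAFETY comments above describe re-creating a future from an async fn and moving it into the generic write_state slot via transmute_copy, because the concrete future type of an async fn cannot be named. A minimal standalone sketch of that part of the trick, with hypothetical names not taken from this codebase:

use std::future::Future;
use std::mem::{forget, size_of, size_of_val, transmute_copy};

async fn step(x: u32) -> u32 {
    x + 1
}

// Generic holder; F is inferred at the construction site from a future
// produced by step().
struct Slot<F: Future<Output = u32>> {
    fut: F,
}

impl<F: Future<Output = u32>> Slot<F> {
    // Inside the generic impl the compiler cannot prove that the anonymous
    // `impl Future` returned by step() is F, so transmute_copy bridges the
    // gap; forget() prevents a double drop of the source value.
    fn rearm(&mut self, x: u32) {
        let fut = step(x);
        assert_eq!(size_of::<F>(), size_of_val(&fut));
        let same: F = unsafe { transmute_copy(&fut) };
        forget(fut);
        self.fut = same;
    }
}

fn main() {
    // F is inferred as the unnameable future type of step().
    let mut slot = Slot { fut: step(1) };
    slot.rearm(2);
    // Only the type-level trick is shown; polling the future is out of scope.
}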
@@ -467,7 +469,11 @@ impl ReplicationContext<'_> {
// Create future reading WAL.
fn read_wal_fut(&self) -> impl Future<Output = anyhow::Result<usize>> {
let mut send_size = self.end_pos.checked_sub(self.start_pos).unwrap().0 as usize;
let mut send_size = self
.end_pos
.checked_sub(self.start_pos)
.expect("reading wal without waiting for it first")
.0 as usize;
send_size = min(send_size, self.write_ctx.send_buf.borrow().len());
let write_ctx_fut = self.write_ctx.clone();
async move {