Add append_test for measuring performance

Author: Arthur Petukhovsky
Date: 2021-12-06 17:13:35 +03:00
parent 0a8c672630
commit bc4b89013c
5 changed files with 145 additions and 43 deletions

.gitignore

@@ -11,3 +11,6 @@ test_output/
 # Coverage
 *.profraw
 *.profdata
+
+tmp1
+tmp2


@@ -0,0 +1,91 @@
//
// Performance test for append handling in safekeeper.
//
use anyhow::Result;
use clap::{App, Arg};
use const_format::formatcp;
use daemonize::Daemonize;
use log::*;
use std::path::{Path, PathBuf};
use std::thread;
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
use walkeeper::http;
use walkeeper::s3_offload;
use walkeeper::wal_service;
use walkeeper::SafeKeeperConf;
use walkeeper::timeline::{GlobalTimelines, Timeline};
use walkeeper::timeline::CreateControlFile;
use walkeeper::json_ctrl;
use zenith_utils::http::endpoint;
use zenith_utils::shutdown::exit_now;
use zenith_utils::signals;
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
use zenith_utils::zid::{self, ZTimelineId, ZTenantId};
use std::sync::Arc;
use walkeeper::safekeeper::{AcceptorProposerMessage, AppendResponse};
use walkeeper::safekeeper::{
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting,
};
use walkeeper::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
use walkeeper::send_wal::SendWalHandler;
use walkeeper::timeline::TimelineTools;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils;
use postgres_ffi::{uint32, uint64, Oid, XLogRecord};
use zenith_utils::lsn::Lsn;
use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, RowDescriptor, TEXT_OID};

fn main() -> Result<()> {
    zenith_metrics::set_common_metrics_prefix("safekeeper");
    let log_file = logging::init("safekeeper.log", false)?;
    info!("version: {}", GIT_VERSION);

    let timeline_id = ZTimelineId::generate();
    let tenant_id = ZTenantId::generate();
    let create = CreateControlFile::True;
    let conf = SafeKeeperConf::default();
    let mut timeline = GlobalTimelines::get(&conf, tenant_id, timeline_id, create)?;
    json_ctrl::prepare_safekeeper(tenant_id, timeline_id, &mut timeline)?;

    let term = 1;
    let epoch_start_lsn = Lsn::from(0x16B9188);
    let mut lsn = epoch_start_lsn;
    json_ctrl::send_proposer_elected(&timeline, term, lsn)?;

    let message = "a".repeat(1024 * 8);

    info!("Starting test");
    let now = std::time::Instant::now();
    let test_duration = std::time::Duration::from_secs(10);
    let timeout = now.checked_add(test_duration).unwrap();

    let mut total_count = 0;
    while std::time::Instant::now() < timeout {
        let result = json_ctrl::append_logical_message(&timeline, json_ctrl::AppendLogicalMessage {
            lm_prefix: "".to_string(),
            lm_message: message.clone(),
            set_commit_lsn: true,
            send_proposer_elected: false,
            term,
            epoch_start_lsn,
            begin_lsn: lsn,
            truncate_lsn: lsn,
        })?;
        lsn = result.end_lsn;
        total_count += 1;
    }

    info!("Total count: {}", total_count);
    info!("Test duration: {}s", test_duration.as_secs_f64());
    info!("Appends per second: {}", total_count as f64 / test_duration.as_secs_f64());

    Ok(())
}

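A note on reading the numbers this test prints (editorial addition, not part of the commit): each append carries a fixed 8 KiB logical message, so the "Appends per second" figure maps directly to payload throughput. A minimal sketch of that conversion, ignoring WAL record and page header overhead:

// Rough conversion of the "Appends per second" figure into payload throughput.
// Assumes the fixed 8 KiB message ("a".repeat(1024 * 8)) used above; real WAL
// bytes per second are somewhat higher because of record and page headers.
fn appends_per_sec_to_mib_per_sec(appends_per_sec: f64) -> f64 {
    let payload_bytes = (1024 * 8) as f64;
    appends_per_sec * payload_bytes / (1024.0 * 1024.0)
}

fn main() {
    // e.g. 10_000 appends/s of 8 KiB payloads is roughly 78 MiB/s of message data
    println!("{:.1} MiB/s", appends_per_sec_to_mib_per_sec(10_000.0));
}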

@@ -11,6 +11,7 @@ use bytes::{BufMut, Bytes, BytesMut};
 use crc32c::crc32c_append;
 use log::*;
 use serde::{Deserialize, Serialize};
+use std::sync::Arc;
 
 use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
 use crate::safekeeper::{
@@ -18,31 +19,32 @@ use crate::safekeeper::{
 };
 use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
 use crate::send_wal::SendWalHandler;
-use crate::timeline::TimelineTools;
+use crate::timeline::{TimelineTools, Timeline};
 use postgres_ffi::pg_constants;
 use postgres_ffi::xlog_utils;
 use postgres_ffi::{uint32, uint64, Oid, XLogRecord};
 use zenith_utils::lsn::Lsn;
 use zenith_utils::postgres_backend::PostgresBackend;
 use zenith_utils::pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
+use zenith_utils::zid::{ZTimelineId, ZTenantId};
 
 #[derive(Serialize, Deserialize, Debug)]
-struct AppendLogicalMessage {
+pub struct AppendLogicalMessage {
     // prefix and message to build LogicalMessage
-    lm_prefix: String,
-    lm_message: String,
+    pub lm_prefix: String,
+    pub lm_message: String,
 
     // if true, commit_lsn will match flush_lsn after append
-    set_commit_lsn: bool,
+    pub set_commit_lsn: bool,
 
     // if true, ProposerElected will be sent before append
-    send_proposer_elected: bool,
+    pub send_proposer_elected: bool,
 
     // fields from AppendRequestHeader
-    term: Term,
-    epoch_start_lsn: Lsn,
-    begin_lsn: Lsn,
-    truncate_lsn: Lsn,
+    pub term: Term,
+    pub epoch_start_lsn: Lsn,
+    pub begin_lsn: Lsn,
+    pub truncate_lsn: Lsn,
 }
 
 #[derive(Serialize, Deserialize)]
@@ -71,14 +73,14 @@ pub fn handle_json_ctrl(
     info!("JSON_CTRL request: {:?}", append_request);
 
     // need to init safekeeper state before AppendRequest
-    prepare_safekeeper(swh)?;
+    prepare_safekeeper(swh.tenantid.unwrap(), swh.timelineid.unwrap(), swh.timeline.get())?;
 
     // if send_proposer_elected is true, we need to update local history
     if append_request.send_proposer_elected {
-        send_proposer_elected(swh, append_request.term, append_request.epoch_start_lsn)?;
+        send_proposer_elected(swh.timeline.get(), append_request.term, append_request.epoch_start_lsn)?;
     }
 
-    let inserted_wal = append_logical_message(swh, append_request)?;
+    let inserted_wal = append_logical_message(swh.timeline.get(), append_request)?;
     let response = AppendResult {
         state: swh.timeline.get().get_info(),
         inserted_wal,
@@ -98,28 +100,28 @@ pub fn handle_json_ctrl(
 /// Prepare safekeeper to process append requests without crashes,
 /// by sending ProposerGreeting with default server.wal_seg_size.
-fn prepare_safekeeper(swh: &mut SendWalHandler) -> Result<()> {
+pub fn prepare_safekeeper(tenant_id: ZTenantId, timeline_id: ZTimelineId, timeline: &Arc<Timeline>) -> Result<()> {
     let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting {
         protocol_version: 1, // current protocol
         pg_version: 0, // unknown
         proposer_id: [0u8; 16],
         system_id: 0,
-        ztli: swh.timelineid.unwrap(),
-        tenant_id: swh.tenantid.unwrap(),
-        tli: 0,
+        ztli: timeline_id,
+        tenant_id: tenant_id,
+        tli: xlog_utils::PG_TLI,
         wal_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32, // 16MB, default for tests
     });
 
-    let response = swh.timeline.get().process_msg(&greeting_request)?;
+    let response = timeline.process_msg(&greeting_request)?;
     match response {
         Some(AcceptorProposerMessage::Greeting(_)) => Ok(()),
         _ => anyhow::bail!("not GreetingResponse"),
     }
 }
 
-fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Result<()> {
+pub fn send_proposer_elected(timeline: &Arc<Timeline>, term: Term, lsn: Lsn) -> Result<()> {
     // add new term to existing history
-    let history = swh.timeline.get().get_info().acceptor_state.term_history;
+    let history = timeline.get_info().acceptor_state.term_history;
     let history = history.up_to(lsn.checked_sub(1u64).unwrap());
     let mut history_entries = history.0;
     history_entries.push(TermSwitchEntry { term, lsn });
@@ -131,25 +133,25 @@ fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Resu
         term_history: history,
     });
 
-    swh.timeline.get().process_msg(&proposer_elected_request)?;
+    timeline.process_msg(&proposer_elected_request)?;
 
     Ok(())
 }
 
 #[derive(Serialize, Deserialize)]
-struct InsertedWAL {
-    begin_lsn: Lsn,
-    end_lsn: Lsn,
-    append_response: AppendResponse,
+pub struct InsertedWAL {
+    pub begin_lsn: Lsn,
+    pub end_lsn: Lsn,
+    pub append_response: AppendResponse,
 }
 
 /// Extend local WAL with new LogicalMessage record. To do that,
 /// create AppendRequest with new WAL and pass it to safekeeper.
-fn append_logical_message(
-    swh: &mut SendWalHandler,
+pub fn append_logical_message(
+    timeline: &Arc<Timeline>,
     msg: AppendLogicalMessage,
 ) -> Result<InsertedWAL> {
     let wal_data = encode_logical_message(msg.lm_prefix, msg.lm_message);
-    let sk_state = swh.timeline.get().get_info();
+    let sk_state = timeline.get_info();
 
     let begin_lsn = msg.begin_lsn;
     let end_lsn = begin_lsn + wal_data.len() as u64;
@@ -173,7 +175,7 @@ fn append_logical_message(
         wal_data: Bytes::from(wal_data),
     });
 
-    let response = swh.timeline.get().process_msg(&append_request)?;
+    let response = timeline.process_msg(&append_request)?;
 
     let append_response = match response {
         Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
@@ -222,11 +224,15 @@ fn encode_logical_message(prefix: String, message: String) -> Vec<u8> {
     let mainrdata = logical_message.encode();
     let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len();
 
-    // only short mainrdata is supported for now
-    assert!(mainrdata_len <= 255);
-    let mainrdata_len = mainrdata_len as u8;
-    let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len];
+    let mut data: Vec<u8> = vec![];
+    if mainrdata_len <= 255 {
+        data.put_u8(pg_constants::XLR_BLOCK_ID_DATA_SHORT);
+        data.put_u8(mainrdata_len as u8);
+    } else {
+        data.put_u8(pg_constants::XLR_BLOCK_ID_DATA_LONG);
+        data.put_u64_le(mainrdata_len as u64);
+    }
 
     data.extend_from_slice(&mainrdata);
     data.extend_from_slice(&prefix_bytes);
     data.extend_from_slice(message_bytes);

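The encode_logical_message change in the last hunk above is what makes the new test possible: mainrdata_len includes the message bytes, and with an 8 KiB lm_message it is far above the 255-byte limit of the short XLR_BLOCK_ID_DATA_SHORT header, so the long-form XLR_BLOCK_ID_DATA_LONG header had to be added. A tiny sanity check of that reasoning (editorial sketch, not part of the commit):

fn main() {
    // The test's payload alone already exceeds the short-form limit,
    // so encode_logical_message must take the new long-form branch.
    let message_len = 1024 * 8; // "a".repeat(1024 * 8) in the test binary
    assert!(message_len > 255);
}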

@@ -702,15 +702,17 @@ where
             );
             self.decoder = WalStreamDecoder::new(msg.h.begin_lsn);
         }
-        self.decoder.feed_bytes(&msg.wal_data);
-        loop {
-            match self.decoder.poll_decode()? {
-                None => break, // no full record yet
-                Some((lsn, _rec)) => {
-                    last_rec_lsn = lsn;
-                }
-            }
-        }
+        // XXX: decoder is disabled for this test
+        // self.decoder.feed_bytes(&msg.wal_data);
+        // loop {
+        //     match self.decoder.poll_decode()? {
+        //         None => break, // no full record yet
+        //         Some((lsn, _rec)) => {
+        //             last_rec_lsn = lsn;
+        //         }
+        //     }
+        // }
+        last_rec_lsn = msg.h.begin_lsn + msg.wal_data.len() as u64;
 
         // If this was the first record we ever receieved, remember LSN to help
         // find_end_of_wal skip the hole in the beginning.
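
A short note on the hunk above (editorial addition, not part of the commit): with decoding skipped, last_rec_lsn is approximated as the end of the received bytes (begin_lsn plus wal_data.len()). That is only accurate when every AppendRequest ends exactly on a record boundary, which holds in this benchmark because append_logical_message builds each request as one complete LogicalMessage record; presumably that is why the change is flagged with XXX as a test-only shortcut rather than behavior intended to be kept.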