Compare commits

...

3 Commits

Author SHA1 Message Date
Arthur Petukhovsky
165a1b7914 Merge branch 'control-persist-only-if-sync' into safekeeper-append-perf 2021-12-06 18:48:47 +03:00
Arthur Petukhovsky
8f85d69b6a Don't persist control file if sync is false 2021-12-06 18:48:11 +03:00
Arthur Petukhovsky
bc4b89013c Add append_test for measuring performance 2021-12-06 18:00:29 +03:00
5 changed files with 149 additions and 44 deletions

3
.gitignore vendored
View File

@@ -11,3 +11,6 @@ test_output/
# Coverage # Coverage
*.profraw *.profraw
*.profdata *.profdata
tmp1
tmp2

View File

@@ -0,0 +1,91 @@
//
// Performance test for append handling in safekeeper.
//
use anyhow::Result;
use clap::{App, Arg};
use const_format::formatcp;
use daemonize::Daemonize;
use log::*;
use std::path::{Path, PathBuf};
use std::thread;
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
use walkeeper::http;
use walkeeper::s3_offload;
use walkeeper::wal_service;
use walkeeper::SafeKeeperConf;
use walkeeper::timeline::{GlobalTimelines, Timeline};
use walkeeper::timeline::CreateControlFile;
use walkeeper::json_ctrl;
use zenith_utils::http::endpoint;
use zenith_utils::shutdown::exit_now;
use zenith_utils::signals;
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
use zenith_utils::zid::{self, ZTimelineId, ZTenantId};
use std::sync::Arc;
use walkeeper::safekeeper::{AcceptorProposerMessage, AppendResponse};
use walkeeper::safekeeper::{
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting,
};
use walkeeper::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
use walkeeper::send_wal::SendWalHandler;
use walkeeper::timeline::TimelineTools;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils;
use postgres_ffi::{uint32, uint64, Oid, XLogRecord};
use zenith_utils::lsn::Lsn;
use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
fn main() -> Result<()> {
zenith_metrics::set_common_metrics_prefix("safekeeper");
let log_file = logging::init("safekeeper.log", false)?;
info!("version: {}", GIT_VERSION);
let timeline_id = ZTimelineId::generate();
let tenant_id = ZTenantId::generate();
let create = CreateControlFile::True;
let conf = SafeKeeperConf::default();
let mut timeline = GlobalTimelines::get(&conf, tenant_id, timeline_id, create)?;
json_ctrl::prepare_safekeeper(tenant_id ,timeline_id, &mut timeline)?;
let term = 1;
let epoch_start_lsn = Lsn::from(0x16B9188);
let mut lsn = epoch_start_lsn;
json_ctrl::send_proposer_elected(&timeline, term, lsn)?;
let message = "a".repeat(1024 * 8);
info!("Starting test");
let now = std::time::Instant::now();
let test_duration = std::time::Duration::from_secs(10);
let timeout = now.checked_add(test_duration).unwrap();
let mut total_count = 0;
while std::time::Instant::now() < timeout {
let result = json_ctrl::append_logical_message(&timeline, json_ctrl::AppendLogicalMessage{
lm_prefix: "".to_string(),
lm_message: message.clone(),
set_commit_lsn: true,
send_proposer_elected: false,
term,
epoch_start_lsn,
begin_lsn: lsn,
truncate_lsn: lsn,
})?;
lsn = result.end_lsn;
total_count += 1;
}
info!("Total count: {}", total_count);
info!("Test duration: {}s", test_duration.as_secs_f64());
info!("Appends per second: {}", total_count as f64 / test_duration.as_secs_f64());
Ok(())
}

View File

@@ -11,6 +11,7 @@ use bytes::{BufMut, Bytes, BytesMut};
use crc32c::crc32c_append; use crc32c::crc32c_append;
use log::*; use log::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc;
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse}; use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
use crate::safekeeper::{ use crate::safekeeper::{
@@ -18,31 +19,32 @@ use crate::safekeeper::{
}; };
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry}; use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
use crate::send_wal::SendWalHandler; use crate::send_wal::SendWalHandler;
use crate::timeline::TimelineTools; use crate::timeline::{TimelineTools, Timeline};
use postgres_ffi::pg_constants; use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils; use postgres_ffi::xlog_utils;
use postgres_ffi::{uint32, uint64, Oid, XLogRecord}; use postgres_ffi::{uint32, uint64, Oid, XLogRecord};
use zenith_utils::lsn::Lsn; use zenith_utils::lsn::Lsn;
use zenith_utils::postgres_backend::PostgresBackend; use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, RowDescriptor, TEXT_OID}; use zenith_utils::pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
use zenith_utils::zid::{ZTimelineId, ZTenantId};
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
struct AppendLogicalMessage { pub struct AppendLogicalMessage {
// prefix and message to build LogicalMessage // prefix and message to build LogicalMessage
lm_prefix: String, pub lm_prefix: String,
lm_message: String, pub lm_message: String,
// if true, commit_lsn will match flush_lsn after append // if true, commit_lsn will match flush_lsn after append
set_commit_lsn: bool, pub set_commit_lsn: bool,
// if true, ProposerElected will be sent before append // if true, ProposerElected will be sent before append
send_proposer_elected: bool, pub send_proposer_elected: bool,
// fields from AppendRequestHeader // fields from AppendRequestHeader
term: Term, pub term: Term,
epoch_start_lsn: Lsn, pub epoch_start_lsn: Lsn,
begin_lsn: Lsn, pub begin_lsn: Lsn,
truncate_lsn: Lsn, pub truncate_lsn: Lsn,
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
@@ -71,14 +73,14 @@ pub fn handle_json_ctrl(
info!("JSON_CTRL request: {:?}", append_request); info!("JSON_CTRL request: {:?}", append_request);
// need to init safekeeper state before AppendRequest // need to init safekeeper state before AppendRequest
prepare_safekeeper(swh)?; prepare_safekeeper(swh.tenantid.unwrap(), swh.timelineid.unwrap(), swh.timeline.get())?;
// if send_proposer_elected is true, we need to update local history // if send_proposer_elected is true, we need to update local history
if append_request.send_proposer_elected { if append_request.send_proposer_elected {
send_proposer_elected(swh, append_request.term, append_request.epoch_start_lsn)?; send_proposer_elected(swh.timeline.get(), append_request.term, append_request.epoch_start_lsn)?;
} }
let inserted_wal = append_logical_message(swh, append_request)?; let inserted_wal = append_logical_message(swh.timeline.get(), append_request)?;
let response = AppendResult { let response = AppendResult {
state: swh.timeline.get().get_info(), state: swh.timeline.get().get_info(),
inserted_wal, inserted_wal,
@@ -98,28 +100,28 @@ pub fn handle_json_ctrl(
/// Prepare safekeeper to process append requests without crashes, /// Prepare safekeeper to process append requests without crashes,
/// by sending ProposerGreeting with default server.wal_seg_size. /// by sending ProposerGreeting with default server.wal_seg_size.
fn prepare_safekeeper(swh: &mut SendWalHandler) -> Result<()> { pub fn prepare_safekeeper(tenant_id: ZTenantId, timeline_id: ZTimelineId, timeline: &Arc<Timeline>) -> Result<()> {
let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting { let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting {
protocol_version: 1, // current protocol protocol_version: 1, // current protocol
pg_version: 0, // unknown pg_version: 0, // unknown
proposer_id: [0u8; 16], proposer_id: [0u8; 16],
system_id: 0, system_id: 0,
ztli: swh.timelineid.unwrap(), ztli: timeline_id,
tenant_id: swh.tenantid.unwrap(), tenant_id: tenant_id,
tli: 0, tli: xlog_utils::PG_TLI,
wal_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32, // 16MB, default for tests wal_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32, // 16MB, default for tests
}); });
let response = swh.timeline.get().process_msg(&greeting_request)?; let response = timeline.process_msg(&greeting_request)?;
match response { match response {
Some(AcceptorProposerMessage::Greeting(_)) => Ok(()), Some(AcceptorProposerMessage::Greeting(_)) => Ok(()),
_ => anyhow::bail!("not GreetingResponse"), _ => anyhow::bail!("not GreetingResponse"),
} }
} }
fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Result<()> { pub fn send_proposer_elected(timeline: &Arc<Timeline>, term: Term, lsn: Lsn) -> Result<()> {
// add new term to existing history // add new term to existing history
let history = swh.timeline.get().get_info().acceptor_state.term_history; let history = timeline.get_info().acceptor_state.term_history;
let history = history.up_to(lsn.checked_sub(1u64).unwrap()); let history = history.up_to(lsn.checked_sub(1u64).unwrap());
let mut history_entries = history.0; let mut history_entries = history.0;
history_entries.push(TermSwitchEntry { term, lsn }); history_entries.push(TermSwitchEntry { term, lsn });
@@ -131,25 +133,25 @@ fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Resu
term_history: history, term_history: history,
}); });
swh.timeline.get().process_msg(&proposer_elected_request)?; timeline.process_msg(&proposer_elected_request)?;
Ok(()) Ok(())
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct InsertedWAL { pub struct InsertedWAL {
begin_lsn: Lsn, pub begin_lsn: Lsn,
end_lsn: Lsn, pub end_lsn: Lsn,
append_response: AppendResponse, pub append_response: AppendResponse,
} }
/// Extend local WAL with new LogicalMessage record. To do that, /// Extend local WAL with new LogicalMessage record. To do that,
/// create AppendRequest with new WAL and pass it to safekeeper. /// create AppendRequest with new WAL and pass it to safekeeper.
fn append_logical_message( pub fn append_logical_message(
swh: &mut SendWalHandler, timeline: &Arc<Timeline>,
msg: AppendLogicalMessage, msg: AppendLogicalMessage,
) -> Result<InsertedWAL> { ) -> Result<InsertedWAL> {
let wal_data = encode_logical_message(msg.lm_prefix, msg.lm_message); let wal_data = encode_logical_message(msg.lm_prefix, msg.lm_message);
let sk_state = swh.timeline.get().get_info(); let sk_state = timeline.get_info();
let begin_lsn = msg.begin_lsn; let begin_lsn = msg.begin_lsn;
let end_lsn = begin_lsn + wal_data.len() as u64; let end_lsn = begin_lsn + wal_data.len() as u64;
@@ -173,7 +175,7 @@ fn append_logical_message(
wal_data: Bytes::from(wal_data), wal_data: Bytes::from(wal_data),
}); });
let response = swh.timeline.get().process_msg(&append_request)?; let response = timeline.process_msg(&append_request)?;
let append_response = match response { let append_response = match response {
Some(AcceptorProposerMessage::AppendResponse(resp)) => resp, Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
@@ -222,11 +224,15 @@ fn encode_logical_message(prefix: String, message: String) -> Vec<u8> {
let mainrdata = logical_message.encode(); let mainrdata = logical_message.encode();
let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len(); let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len();
// only short mainrdata is supported for now
assert!(mainrdata_len <= 255);
let mainrdata_len = mainrdata_len as u8;
let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len]; let mut data: Vec<u8> = vec![];
if mainrdata_len <= 255 {
data.put_u8(pg_constants::XLR_BLOCK_ID_DATA_SHORT);
data.put_u8(mainrdata_len as u8);
} else {
data.put_u8(pg_constants::XLR_BLOCK_ID_DATA_LONG);
data.put_u64_le(mainrdata_len as u64);
}
data.extend_from_slice(&mainrdata); data.extend_from_slice(&mainrdata);
data.extend_from_slice(&prefix_bytes); data.extend_from_slice(&prefix_bytes);
data.extend_from_slice(message_bytes); data.extend_from_slice(message_bytes);

View File

@@ -702,15 +702,17 @@ where
); );
self.decoder = WalStreamDecoder::new(msg.h.begin_lsn); self.decoder = WalStreamDecoder::new(msg.h.begin_lsn);
} }
self.decoder.feed_bytes(&msg.wal_data); // XXX: decoder is disabled for this test
loop { // self.decoder.feed_bytes(&msg.wal_data);
match self.decoder.poll_decode()? { // loop {
None => break, // no full record yet // match self.decoder.poll_decode()? {
Some((lsn, _rec)) => { // None => break, // no full record yet
last_rec_lsn = lsn; // Some((lsn, _rec)) => {
} // last_rec_lsn = lsn;
} // }
} // }
// }
last_rec_lsn = msg.h.begin_lsn + msg.wal_data.len() as u64;
// If this was the first record we ever receieved, remember LSN to help // If this was the first record we ever receieved, remember LSN to help
// find_end_of_wal skip the hole in the beginning. // find_end_of_wal skip the hole in the beginning.
@@ -753,7 +755,10 @@ where
self.s.commit_lsn = self.commit_lsn; self.s.commit_lsn = self.commit_lsn;
self.s.truncate_lsn = self.truncate_lsn; self.s.truncate_lsn = self.truncate_lsn;
} }
self.storage.persist(&self.s, sync_control_file)?;
if sync_control_file {
self.storage.persist(&self.s, true)?;
}
let resp = self.append_response(); let resp = self.append_response();
info!( info!(