From bc4b89013cfdac86cd17c43c459ccfcb8a54ba3a Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Mon, 6 Dec 2021 17:13:35 +0300 Subject: [PATCH] Add append_test for measuring performance --- .gitignore | 3 ++ vendor/postgres | 2 +- walkeeper/src/bin/append_test.rs | 91 ++++++++++++++++++++++++++++++++ walkeeper/src/json_ctrl.rs | 72 +++++++++++++------------ walkeeper/src/safekeeper.rs | 20 +++---- 5 files changed, 145 insertions(+), 43 deletions(-) create mode 100644 walkeeper/src/bin/append_test.rs diff --git a/.gitignore b/.gitignore index 2ecdaa2053..055d0b7752 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,6 @@ test_output/ # Coverage *.profraw *.profdata + +tmp1 +tmp2 \ No newline at end of file diff --git a/vendor/postgres b/vendor/postgres index be8bdba074..b1b1138d26 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit be8bdba074baf2a4c7f8fb2cc701c2b3fac9342f +Subproject commit b1b1138d26330a31af75a2bc92983047d5e313db diff --git a/walkeeper/src/bin/append_test.rs b/walkeeper/src/bin/append_test.rs new file mode 100644 index 0000000000..e55ac1c6a6 --- /dev/null +++ b/walkeeper/src/bin/append_test.rs @@ -0,0 +1,91 @@ +// +// Performance test for append handling in safekeeper. +// +use anyhow::Result; +use clap::{App, Arg}; +use const_format::formatcp; +use daemonize::Daemonize; +use log::*; +use std::path::{Path, PathBuf}; +use std::thread; +use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR}; +use walkeeper::http; +use walkeeper::s3_offload; +use walkeeper::wal_service; +use walkeeper::SafeKeeperConf; +use walkeeper::timeline::{GlobalTimelines, Timeline}; +use walkeeper::timeline::CreateControlFile; +use walkeeper::json_ctrl; +use zenith_utils::http::endpoint; +use zenith_utils::shutdown::exit_now; +use zenith_utils::signals; +use zenith_utils::{logging, tcp_listener, GIT_VERSION}; +use zenith_utils::zid::{self, ZTimelineId, ZTenantId}; +use std::sync::Arc; + +use walkeeper::safekeeper::{AcceptorProposerMessage, AppendResponse}; +use walkeeper::safekeeper::{ + AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting, +}; +use walkeeper::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry}; +use walkeeper::send_wal::SendWalHandler; +use walkeeper::timeline::TimelineTools; +use postgres_ffi::pg_constants; +use postgres_ffi::xlog_utils; +use postgres_ffi::{uint32, uint64, Oid, XLogRecord}; +use zenith_utils::lsn::Lsn; +use zenith_utils::postgres_backend::PostgresBackend; +use zenith_utils::pq_proto::{BeMessage, RowDescriptor, TEXT_OID}; + +fn main() -> Result<()> { + zenith_metrics::set_common_metrics_prefix("safekeeper"); + let log_file = logging::init("safekeeper.log", false)?; + info!("version: {}", GIT_VERSION); + + let timeline_id = ZTimelineId::generate(); + let tenant_id = ZTenantId::generate(); + let create = CreateControlFile::True; + let conf = SafeKeeperConf::default(); + + let mut timeline = GlobalTimelines::get(&conf, tenant_id, timeline_id, create)?; + + json_ctrl::prepare_safekeeper(tenant_id ,timeline_id, &mut timeline)?; + + let term = 1; + let epoch_start_lsn = Lsn::from(0x16B9188); + let mut lsn = epoch_start_lsn; + + json_ctrl::send_proposer_elected(&timeline, term, lsn)?; + + let message = "a".repeat(1024 * 8); + + info!("Starting test"); + + let now = std::time::Instant::now(); + let test_duration = std::time::Duration::from_secs(10); + let timeout = now.checked_add(test_duration).unwrap(); + + let mut total_count = 0; + + while std::time::Instant::now() < timeout { + let result = json_ctrl::append_logical_message(&timeline, json_ctrl::AppendLogicalMessage{ + lm_prefix: "".to_string(), + lm_message: message.clone(), + set_commit_lsn: true, + send_proposer_elected: false, + term, + epoch_start_lsn, + begin_lsn: lsn, + truncate_lsn: lsn, + })?; + + lsn = result.end_lsn; + total_count += 1; + } + + info!("Total count: {}", total_count); + info!("Test duration: {}s", test_duration.as_secs_f64()); + info!("Appends per second: {}", total_count as f64 / test_duration.as_secs_f64()); + + Ok(()) +} diff --git a/walkeeper/src/json_ctrl.rs b/walkeeper/src/json_ctrl.rs index 2279576722..2aab1c3727 100644 --- a/walkeeper/src/json_ctrl.rs +++ b/walkeeper/src/json_ctrl.rs @@ -11,6 +11,7 @@ use bytes::{BufMut, Bytes, BytesMut}; use crc32c::crc32c_append; use log::*; use serde::{Deserialize, Serialize}; +use std::sync::Arc; use crate::safekeeper::{AcceptorProposerMessage, AppendResponse}; use crate::safekeeper::{ @@ -18,31 +19,32 @@ use crate::safekeeper::{ }; use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry}; use crate::send_wal::SendWalHandler; -use crate::timeline::TimelineTools; +use crate::timeline::{TimelineTools, Timeline}; use postgres_ffi::pg_constants; use postgres_ffi::xlog_utils; use postgres_ffi::{uint32, uint64, Oid, XLogRecord}; use zenith_utils::lsn::Lsn; use zenith_utils::postgres_backend::PostgresBackend; use zenith_utils::pq_proto::{BeMessage, RowDescriptor, TEXT_OID}; +use zenith_utils::zid::{ZTimelineId, ZTenantId}; #[derive(Serialize, Deserialize, Debug)] -struct AppendLogicalMessage { +pub struct AppendLogicalMessage { // prefix and message to build LogicalMessage - lm_prefix: String, - lm_message: String, + pub lm_prefix: String, + pub lm_message: String, // if true, commit_lsn will match flush_lsn after append - set_commit_lsn: bool, + pub set_commit_lsn: bool, // if true, ProposerElected will be sent before append - send_proposer_elected: bool, + pub send_proposer_elected: bool, // fields from AppendRequestHeader - term: Term, - epoch_start_lsn: Lsn, - begin_lsn: Lsn, - truncate_lsn: Lsn, + pub term: Term, + pub epoch_start_lsn: Lsn, + pub begin_lsn: Lsn, + pub truncate_lsn: Lsn, } #[derive(Serialize, Deserialize)] @@ -71,14 +73,14 @@ pub fn handle_json_ctrl( info!("JSON_CTRL request: {:?}", append_request); // need to init safekeeper state before AppendRequest - prepare_safekeeper(swh)?; + prepare_safekeeper(swh.tenantid.unwrap(), swh.timelineid.unwrap(), swh.timeline.get())?; // if send_proposer_elected is true, we need to update local history if append_request.send_proposer_elected { - send_proposer_elected(swh, append_request.term, append_request.epoch_start_lsn)?; + send_proposer_elected(swh.timeline.get(), append_request.term, append_request.epoch_start_lsn)?; } - let inserted_wal = append_logical_message(swh, append_request)?; + let inserted_wal = append_logical_message(swh.timeline.get(), append_request)?; let response = AppendResult { state: swh.timeline.get().get_info(), inserted_wal, @@ -98,28 +100,28 @@ pub fn handle_json_ctrl( /// Prepare safekeeper to process append requests without crashes, /// by sending ProposerGreeting with default server.wal_seg_size. -fn prepare_safekeeper(swh: &mut SendWalHandler) -> Result<()> { +pub fn prepare_safekeeper(tenant_id: ZTenantId, timeline_id: ZTimelineId, timeline: &Arc) -> Result<()> { let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting { protocol_version: 1, // current protocol pg_version: 0, // unknown proposer_id: [0u8; 16], system_id: 0, - ztli: swh.timelineid.unwrap(), - tenant_id: swh.tenantid.unwrap(), - tli: 0, + ztli: timeline_id, + tenant_id: tenant_id, + tli: xlog_utils::PG_TLI, wal_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32, // 16MB, default for tests }); - let response = swh.timeline.get().process_msg(&greeting_request)?; + let response = timeline.process_msg(&greeting_request)?; match response { Some(AcceptorProposerMessage::Greeting(_)) => Ok(()), _ => anyhow::bail!("not GreetingResponse"), } } -fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Result<()> { +pub fn send_proposer_elected(timeline: &Arc, term: Term, lsn: Lsn) -> Result<()> { // add new term to existing history - let history = swh.timeline.get().get_info().acceptor_state.term_history; + let history = timeline.get_info().acceptor_state.term_history; let history = history.up_to(lsn.checked_sub(1u64).unwrap()); let mut history_entries = history.0; history_entries.push(TermSwitchEntry { term, lsn }); @@ -131,25 +133,25 @@ fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Resu term_history: history, }); - swh.timeline.get().process_msg(&proposer_elected_request)?; + timeline.process_msg(&proposer_elected_request)?; Ok(()) } #[derive(Serialize, Deserialize)] -struct InsertedWAL { - begin_lsn: Lsn, - end_lsn: Lsn, - append_response: AppendResponse, +pub struct InsertedWAL { + pub begin_lsn: Lsn, + pub end_lsn: Lsn, + pub append_response: AppendResponse, } /// Extend local WAL with new LogicalMessage record. To do that, /// create AppendRequest with new WAL and pass it to safekeeper. -fn append_logical_message( - swh: &mut SendWalHandler, +pub fn append_logical_message( + timeline: &Arc, msg: AppendLogicalMessage, ) -> Result { let wal_data = encode_logical_message(msg.lm_prefix, msg.lm_message); - let sk_state = swh.timeline.get().get_info(); + let sk_state = timeline.get_info(); let begin_lsn = msg.begin_lsn; let end_lsn = begin_lsn + wal_data.len() as u64; @@ -173,7 +175,7 @@ fn append_logical_message( wal_data: Bytes::from(wal_data), }); - let response = swh.timeline.get().process_msg(&append_request)?; + let response = timeline.process_msg(&append_request)?; let append_response = match response { Some(AcceptorProposerMessage::AppendResponse(resp)) => resp, @@ -222,11 +224,15 @@ fn encode_logical_message(prefix: String, message: String) -> Vec { let mainrdata = logical_message.encode(); let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len(); - // only short mainrdata is supported for now - assert!(mainrdata_len <= 255); - let mainrdata_len = mainrdata_len as u8; - let mut data: Vec = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len]; + let mut data: Vec = vec![]; + if mainrdata_len <= 255 { + data.put_u8(pg_constants::XLR_BLOCK_ID_DATA_SHORT); + data.put_u8(mainrdata_len as u8); + } else { + data.put_u8(pg_constants::XLR_BLOCK_ID_DATA_LONG); + data.put_u64_le(mainrdata_len as u64); + } data.extend_from_slice(&mainrdata); data.extend_from_slice(&prefix_bytes); data.extend_from_slice(message_bytes); diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 2a15bb3fc6..1374e67d35 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -702,15 +702,17 @@ where ); self.decoder = WalStreamDecoder::new(msg.h.begin_lsn); } - self.decoder.feed_bytes(&msg.wal_data); - loop { - match self.decoder.poll_decode()? { - None => break, // no full record yet - Some((lsn, _rec)) => { - last_rec_lsn = lsn; - } - } - } + // XXX: decoder is disabled for this test + // self.decoder.feed_bytes(&msg.wal_data); + // loop { + // match self.decoder.poll_decode()? { + // None => break, // no full record yet + // Some((lsn, _rec)) => { + // last_rec_lsn = lsn; + // } + // } + // } + last_rec_lsn = msg.h.begin_lsn + msg.wal_data.len() as u64; // If this was the first record we ever receieved, remember LSN to help // find_end_of_wal skip the hole in the beginning.