mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 05:20:38 +00:00
Compare commits
3 Commits
test-proxy
...
safekeeper
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
165a1b7914 | ||
|
|
8f85d69b6a | ||
|
|
bc4b89013c |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -11,3 +11,6 @@ test_output/
|
|||||||
# Coverage
|
# Coverage
|
||||||
*.profraw
|
*.profraw
|
||||||
*.profdata
|
*.profdata
|
||||||
|
|
||||||
|
tmp1
|
||||||
|
tmp2
|
||||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: be8bdba074...b1b1138d26
91
walkeeper/src/bin/append_test.rs
Normal file
91
walkeeper/src/bin/append_test.rs
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
//
|
||||||
|
// Performance test for append handling in safekeeper.
|
||||||
|
//
|
||||||
|
use anyhow::Result;
|
||||||
|
use clap::{App, Arg};
|
||||||
|
use const_format::formatcp;
|
||||||
|
use daemonize::Daemonize;
|
||||||
|
use log::*;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::thread;
|
||||||
|
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
|
||||||
|
use walkeeper::http;
|
||||||
|
use walkeeper::s3_offload;
|
||||||
|
use walkeeper::wal_service;
|
||||||
|
use walkeeper::SafeKeeperConf;
|
||||||
|
use walkeeper::timeline::{GlobalTimelines, Timeline};
|
||||||
|
use walkeeper::timeline::CreateControlFile;
|
||||||
|
use walkeeper::json_ctrl;
|
||||||
|
use zenith_utils::http::endpoint;
|
||||||
|
use zenith_utils::shutdown::exit_now;
|
||||||
|
use zenith_utils::signals;
|
||||||
|
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
|
||||||
|
use zenith_utils::zid::{self, ZTimelineId, ZTenantId};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use walkeeper::safekeeper::{AcceptorProposerMessage, AppendResponse};
|
||||||
|
use walkeeper::safekeeper::{
|
||||||
|
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting,
|
||||||
|
};
|
||||||
|
use walkeeper::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
|
||||||
|
use walkeeper::send_wal::SendWalHandler;
|
||||||
|
use walkeeper::timeline::TimelineTools;
|
||||||
|
use postgres_ffi::pg_constants;
|
||||||
|
use postgres_ffi::xlog_utils;
|
||||||
|
use postgres_ffi::{uint32, uint64, Oid, XLogRecord};
|
||||||
|
use zenith_utils::lsn::Lsn;
|
||||||
|
use zenith_utils::postgres_backend::PostgresBackend;
|
||||||
|
use zenith_utils::pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
|
||||||
|
|
||||||
|
fn main() -> Result<()> {
|
||||||
|
zenith_metrics::set_common_metrics_prefix("safekeeper");
|
||||||
|
let log_file = logging::init("safekeeper.log", false)?;
|
||||||
|
info!("version: {}", GIT_VERSION);
|
||||||
|
|
||||||
|
let timeline_id = ZTimelineId::generate();
|
||||||
|
let tenant_id = ZTenantId::generate();
|
||||||
|
let create = CreateControlFile::True;
|
||||||
|
let conf = SafeKeeperConf::default();
|
||||||
|
|
||||||
|
let mut timeline = GlobalTimelines::get(&conf, tenant_id, timeline_id, create)?;
|
||||||
|
|
||||||
|
json_ctrl::prepare_safekeeper(tenant_id ,timeline_id, &mut timeline)?;
|
||||||
|
|
||||||
|
let term = 1;
|
||||||
|
let epoch_start_lsn = Lsn::from(0x16B9188);
|
||||||
|
let mut lsn = epoch_start_lsn;
|
||||||
|
|
||||||
|
json_ctrl::send_proposer_elected(&timeline, term, lsn)?;
|
||||||
|
|
||||||
|
let message = "a".repeat(1024 * 8);
|
||||||
|
|
||||||
|
info!("Starting test");
|
||||||
|
|
||||||
|
let now = std::time::Instant::now();
|
||||||
|
let test_duration = std::time::Duration::from_secs(10);
|
||||||
|
let timeout = now.checked_add(test_duration).unwrap();
|
||||||
|
|
||||||
|
let mut total_count = 0;
|
||||||
|
|
||||||
|
while std::time::Instant::now() < timeout {
|
||||||
|
let result = json_ctrl::append_logical_message(&timeline, json_ctrl::AppendLogicalMessage{
|
||||||
|
lm_prefix: "".to_string(),
|
||||||
|
lm_message: message.clone(),
|
||||||
|
set_commit_lsn: true,
|
||||||
|
send_proposer_elected: false,
|
||||||
|
term,
|
||||||
|
epoch_start_lsn,
|
||||||
|
begin_lsn: lsn,
|
||||||
|
truncate_lsn: lsn,
|
||||||
|
})?;
|
||||||
|
|
||||||
|
lsn = result.end_lsn;
|
||||||
|
total_count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Total count: {}", total_count);
|
||||||
|
info!("Test duration: {}s", test_duration.as_secs_f64());
|
||||||
|
info!("Appends per second: {}", total_count as f64 / test_duration.as_secs_f64());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@@ -11,6 +11,7 @@ use bytes::{BufMut, Bytes, BytesMut};
|
|||||||
use crc32c::crc32c_append;
|
use crc32c::crc32c_append;
|
||||||
use log::*;
|
use log::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
|
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
|
||||||
use crate::safekeeper::{
|
use crate::safekeeper::{
|
||||||
@@ -18,31 +19,32 @@ use crate::safekeeper::{
|
|||||||
};
|
};
|
||||||
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
|
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
|
||||||
use crate::send_wal::SendWalHandler;
|
use crate::send_wal::SendWalHandler;
|
||||||
use crate::timeline::TimelineTools;
|
use crate::timeline::{TimelineTools, Timeline};
|
||||||
use postgres_ffi::pg_constants;
|
use postgres_ffi::pg_constants;
|
||||||
use postgres_ffi::xlog_utils;
|
use postgres_ffi::xlog_utils;
|
||||||
use postgres_ffi::{uint32, uint64, Oid, XLogRecord};
|
use postgres_ffi::{uint32, uint64, Oid, XLogRecord};
|
||||||
use zenith_utils::lsn::Lsn;
|
use zenith_utils::lsn::Lsn;
|
||||||
use zenith_utils::postgres_backend::PostgresBackend;
|
use zenith_utils::postgres_backend::PostgresBackend;
|
||||||
use zenith_utils::pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
|
use zenith_utils::pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
|
||||||
|
use zenith_utils::zid::{ZTimelineId, ZTenantId};
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
struct AppendLogicalMessage {
|
pub struct AppendLogicalMessage {
|
||||||
// prefix and message to build LogicalMessage
|
// prefix and message to build LogicalMessage
|
||||||
lm_prefix: String,
|
pub lm_prefix: String,
|
||||||
lm_message: String,
|
pub lm_message: String,
|
||||||
|
|
||||||
// if true, commit_lsn will match flush_lsn after append
|
// if true, commit_lsn will match flush_lsn after append
|
||||||
set_commit_lsn: bool,
|
pub set_commit_lsn: bool,
|
||||||
|
|
||||||
// if true, ProposerElected will be sent before append
|
// if true, ProposerElected will be sent before append
|
||||||
send_proposer_elected: bool,
|
pub send_proposer_elected: bool,
|
||||||
|
|
||||||
// fields from AppendRequestHeader
|
// fields from AppendRequestHeader
|
||||||
term: Term,
|
pub term: Term,
|
||||||
epoch_start_lsn: Lsn,
|
pub epoch_start_lsn: Lsn,
|
||||||
begin_lsn: Lsn,
|
pub begin_lsn: Lsn,
|
||||||
truncate_lsn: Lsn,
|
pub truncate_lsn: Lsn,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
@@ -71,14 +73,14 @@ pub fn handle_json_ctrl(
|
|||||||
info!("JSON_CTRL request: {:?}", append_request);
|
info!("JSON_CTRL request: {:?}", append_request);
|
||||||
|
|
||||||
// need to init safekeeper state before AppendRequest
|
// need to init safekeeper state before AppendRequest
|
||||||
prepare_safekeeper(swh)?;
|
prepare_safekeeper(swh.tenantid.unwrap(), swh.timelineid.unwrap(), swh.timeline.get())?;
|
||||||
|
|
||||||
// if send_proposer_elected is true, we need to update local history
|
// if send_proposer_elected is true, we need to update local history
|
||||||
if append_request.send_proposer_elected {
|
if append_request.send_proposer_elected {
|
||||||
send_proposer_elected(swh, append_request.term, append_request.epoch_start_lsn)?;
|
send_proposer_elected(swh.timeline.get(), append_request.term, append_request.epoch_start_lsn)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let inserted_wal = append_logical_message(swh, append_request)?;
|
let inserted_wal = append_logical_message(swh.timeline.get(), append_request)?;
|
||||||
let response = AppendResult {
|
let response = AppendResult {
|
||||||
state: swh.timeline.get().get_info(),
|
state: swh.timeline.get().get_info(),
|
||||||
inserted_wal,
|
inserted_wal,
|
||||||
@@ -98,28 +100,28 @@ pub fn handle_json_ctrl(
|
|||||||
|
|
||||||
/// Prepare safekeeper to process append requests without crashes,
|
/// Prepare safekeeper to process append requests without crashes,
|
||||||
/// by sending ProposerGreeting with default server.wal_seg_size.
|
/// by sending ProposerGreeting with default server.wal_seg_size.
|
||||||
fn prepare_safekeeper(swh: &mut SendWalHandler) -> Result<()> {
|
pub fn prepare_safekeeper(tenant_id: ZTenantId, timeline_id: ZTimelineId, timeline: &Arc<Timeline>) -> Result<()> {
|
||||||
let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting {
|
let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting {
|
||||||
protocol_version: 1, // current protocol
|
protocol_version: 1, // current protocol
|
||||||
pg_version: 0, // unknown
|
pg_version: 0, // unknown
|
||||||
proposer_id: [0u8; 16],
|
proposer_id: [0u8; 16],
|
||||||
system_id: 0,
|
system_id: 0,
|
||||||
ztli: swh.timelineid.unwrap(),
|
ztli: timeline_id,
|
||||||
tenant_id: swh.tenantid.unwrap(),
|
tenant_id: tenant_id,
|
||||||
tli: 0,
|
tli: xlog_utils::PG_TLI,
|
||||||
wal_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32, // 16MB, default for tests
|
wal_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32, // 16MB, default for tests
|
||||||
});
|
});
|
||||||
|
|
||||||
let response = swh.timeline.get().process_msg(&greeting_request)?;
|
let response = timeline.process_msg(&greeting_request)?;
|
||||||
match response {
|
match response {
|
||||||
Some(AcceptorProposerMessage::Greeting(_)) => Ok(()),
|
Some(AcceptorProposerMessage::Greeting(_)) => Ok(()),
|
||||||
_ => anyhow::bail!("not GreetingResponse"),
|
_ => anyhow::bail!("not GreetingResponse"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Result<()> {
|
pub fn send_proposer_elected(timeline: &Arc<Timeline>, term: Term, lsn: Lsn) -> Result<()> {
|
||||||
// add new term to existing history
|
// add new term to existing history
|
||||||
let history = swh.timeline.get().get_info().acceptor_state.term_history;
|
let history = timeline.get_info().acceptor_state.term_history;
|
||||||
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
|
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
|
||||||
let mut history_entries = history.0;
|
let mut history_entries = history.0;
|
||||||
history_entries.push(TermSwitchEntry { term, lsn });
|
history_entries.push(TermSwitchEntry { term, lsn });
|
||||||
@@ -131,25 +133,25 @@ fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Resu
|
|||||||
term_history: history,
|
term_history: history,
|
||||||
});
|
});
|
||||||
|
|
||||||
swh.timeline.get().process_msg(&proposer_elected_request)?;
|
timeline.process_msg(&proposer_elected_request)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
struct InsertedWAL {
|
pub struct InsertedWAL {
|
||||||
begin_lsn: Lsn,
|
pub begin_lsn: Lsn,
|
||||||
end_lsn: Lsn,
|
pub end_lsn: Lsn,
|
||||||
append_response: AppendResponse,
|
pub append_response: AppendResponse,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extend local WAL with new LogicalMessage record. To do that,
|
/// Extend local WAL with new LogicalMessage record. To do that,
|
||||||
/// create AppendRequest with new WAL and pass it to safekeeper.
|
/// create AppendRequest with new WAL and pass it to safekeeper.
|
||||||
fn append_logical_message(
|
pub fn append_logical_message(
|
||||||
swh: &mut SendWalHandler,
|
timeline: &Arc<Timeline>,
|
||||||
msg: AppendLogicalMessage,
|
msg: AppendLogicalMessage,
|
||||||
) -> Result<InsertedWAL> {
|
) -> Result<InsertedWAL> {
|
||||||
let wal_data = encode_logical_message(msg.lm_prefix, msg.lm_message);
|
let wal_data = encode_logical_message(msg.lm_prefix, msg.lm_message);
|
||||||
let sk_state = swh.timeline.get().get_info();
|
let sk_state = timeline.get_info();
|
||||||
|
|
||||||
let begin_lsn = msg.begin_lsn;
|
let begin_lsn = msg.begin_lsn;
|
||||||
let end_lsn = begin_lsn + wal_data.len() as u64;
|
let end_lsn = begin_lsn + wal_data.len() as u64;
|
||||||
@@ -173,7 +175,7 @@ fn append_logical_message(
|
|||||||
wal_data: Bytes::from(wal_data),
|
wal_data: Bytes::from(wal_data),
|
||||||
});
|
});
|
||||||
|
|
||||||
let response = swh.timeline.get().process_msg(&append_request)?;
|
let response = timeline.process_msg(&append_request)?;
|
||||||
|
|
||||||
let append_response = match response {
|
let append_response = match response {
|
||||||
Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
|
Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
|
||||||
@@ -222,11 +224,15 @@ fn encode_logical_message(prefix: String, message: String) -> Vec<u8> {
|
|||||||
|
|
||||||
let mainrdata = logical_message.encode();
|
let mainrdata = logical_message.encode();
|
||||||
let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len();
|
let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len();
|
||||||
// only short mainrdata is supported for now
|
|
||||||
assert!(mainrdata_len <= 255);
|
|
||||||
let mainrdata_len = mainrdata_len as u8;
|
|
||||||
|
|
||||||
let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len];
|
let mut data: Vec<u8> = vec![];
|
||||||
|
if mainrdata_len <= 255 {
|
||||||
|
data.put_u8(pg_constants::XLR_BLOCK_ID_DATA_SHORT);
|
||||||
|
data.put_u8(mainrdata_len as u8);
|
||||||
|
} else {
|
||||||
|
data.put_u8(pg_constants::XLR_BLOCK_ID_DATA_LONG);
|
||||||
|
data.put_u64_le(mainrdata_len as u64);
|
||||||
|
}
|
||||||
data.extend_from_slice(&mainrdata);
|
data.extend_from_slice(&mainrdata);
|
||||||
data.extend_from_slice(&prefix_bytes);
|
data.extend_from_slice(&prefix_bytes);
|
||||||
data.extend_from_slice(message_bytes);
|
data.extend_from_slice(message_bytes);
|
||||||
|
|||||||
@@ -702,15 +702,17 @@ where
|
|||||||
);
|
);
|
||||||
self.decoder = WalStreamDecoder::new(msg.h.begin_lsn);
|
self.decoder = WalStreamDecoder::new(msg.h.begin_lsn);
|
||||||
}
|
}
|
||||||
self.decoder.feed_bytes(&msg.wal_data);
|
// XXX: decoder is disabled for this test
|
||||||
loop {
|
// self.decoder.feed_bytes(&msg.wal_data);
|
||||||
match self.decoder.poll_decode()? {
|
// loop {
|
||||||
None => break, // no full record yet
|
// match self.decoder.poll_decode()? {
|
||||||
Some((lsn, _rec)) => {
|
// None => break, // no full record yet
|
||||||
last_rec_lsn = lsn;
|
// Some((lsn, _rec)) => {
|
||||||
}
|
// last_rec_lsn = lsn;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
// }
|
||||||
|
last_rec_lsn = msg.h.begin_lsn + msg.wal_data.len() as u64;
|
||||||
|
|
||||||
// If this was the first record we ever received, remember LSN to help
|
// If this was the first record we ever received, remember LSN to help
|
||||||
// find_end_of_wal skip the hole in the beginning.
|
// find_end_of_wal skip the hole in the beginning.
|
||||||
@@ -753,7 +755,10 @@ where
|
|||||||
self.s.commit_lsn = self.commit_lsn;
|
self.s.commit_lsn = self.commit_lsn;
|
||||||
self.s.truncate_lsn = self.truncate_lsn;
|
self.s.truncate_lsn = self.truncate_lsn;
|
||||||
}
|
}
|
||||||
self.storage.persist(&self.s, sync_control_file)?;
|
|
||||||
|
if sync_control_file {
|
||||||
|
self.storage.persist(&self.s, true)?;
|
||||||
|
}
|
||||||
|
|
||||||
let resp = self.append_response();
|
let resp = self.append_response();
|
||||||
info!(
|
info!(
|
||||||
|
|||||||
Reference in New Issue
Block a user