From f49ad33f1b2613e5185afaf518278ba59b8b2fbf Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 30 Nov 2021 22:23:59 +0200 Subject: [PATCH 01/12] Initialize 'loaded' correctly in DeltaLayer. While we're at it, reuse the Book and the VirtualFile that's backing it even over unload() calls. Previously, we would keep the Book open, but on load(), we would re-open it anyway, which didn't make much sense. Now we reuse it it. Alternatively, perhaps we should close it on unload() to save some memory, but this I'm not going to think too hard about it right now as the whole load/unload thing is a bit of a hack and needs to be rewritten. This is hard to reproduce ATM, because the incorrect state would get fixed by an unload(). A checkpoint creates the DeltaLayer, and it also calls unload() afterwards, so the window is not very large. I hit it occasionally with a scale 1000 pgbench test, after I had modified InMemoryLayer::write_to_disk() to not write an image layer every time, which made the DeltaLayers be accessed more often. --- .../src/layered_repository/delta_layer.rs | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 43db88f74e..f335ddfbdb 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -297,6 +297,11 @@ impl Layer for DeltaLayer { inner.page_version_metas = VecMap::default(); inner.relsizes = VecMap::default(); inner.loaded = false; + + // Note: we keep the Book open. Is that a good idea? The virtual file + // machinery has its own rules for closing the file descriptor if it's not + // needed, but the Book struct uses up some memory, too. + Ok(()) } @@ -410,7 +415,7 @@ impl DeltaLayer { end_lsn, dropped, inner: Mutex::new(DeltaLayerInner { - loaded: true, + loaded: false, book: None, page_version_metas: VecMap::default(), relsizes, @@ -495,8 +500,13 @@ impl DeltaLayer { } let path = self.path(); - let file = VirtualFile::open(&path)?; - let book = Book::new(file)?; + + // Open the file if it's not open already. + if inner.book.is_none() { + let file = VirtualFile::open(&path)?; + inner.book = Some(Book::new(file)?); + } + let book = inner.book.as_ref().unwrap(); match &self.path_or_conf { PathOrConf::Conf(_) => { @@ -531,12 +541,9 @@ impl DeltaLayer { debug!("loaded from {}", &path.display()); - *inner = DeltaLayerInner { - loaded: true, - book: Some(book), - page_version_metas, - relsizes, - }; + inner.page_version_metas = page_version_metas; + inner.relsizes = relsizes; + inner.loaded = true; Ok(inner) } From 2669d140f8b3757c42442100bd73f1d0c3eac612 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Wed, 1 Dec 2021 15:27:44 +0300 Subject: [PATCH 02/12] use full commit sha for version info for builds in docker this is not needed, since environment variable with commit sha already contains full version --- zenith_utils/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs index 8730656eab..eb9948ed64 100644 --- a/zenith_utils/src/lib.rs +++ b/zenith_utils/src/lib.rs @@ -72,5 +72,6 @@ pub mod nonblock; use git_version::git_version; pub const GIT_VERSION: &str = git_version!( prefix = "git:", - fallback = concat!("git-env:", env!("GIT_VERSION")) + fallback = concat!("git-env:", env!("GIT_VERSION")), + args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha ); From cba4da3f4d90e0bae68cd4a6d5498fba7eb410bc Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 5 Nov 2021 13:39:41 +0300 Subject: [PATCH 03/12] Add term history to safekeepers. Persist full history of term switches on safekeepers instead of storing only the single term of the highest entry (called epoch). This allows easily and correctly find the divergence point of two logs and truncate the obsolete part before overwriting it with entries of the newer proposer(s). Full history of the proposer is transferred in separate message before proposer starts streaming; it is immediately persisted by safekeeper, though he might not yet have entries for some older terms there. That's because we can't atomically append to WAL and update the control file anyway, so locally available WAL must be taken into account when looking at the history. We should sometimes purge term history entries beyond truncate_lsn; this is not done here. Per https://github.com/zenithdb/rfcs/pull/12 Closes #296. Bumps vendor/postgres. --- test_runner/batch_others/test_wal_acceptor.py | 1 + vendor/postgres | 2 +- walkeeper/src/http/routes.rs | 22 +- walkeeper/src/json_ctrl.rs | 34 +- walkeeper/src/receive_wal.rs | 7 +- walkeeper/src/safekeeper.rs | 387 +++++++++++++----- walkeeper/src/timeline.rs | 85 +++- 7 files changed, 417 insertions(+), 121 deletions(-) diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 57cf379a96..120e6c769b 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -392,6 +392,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, pg_bin: PgBin): "lm_prefix": "prefix", "lm_message": "message", "set_commit_lsn": True, + "send_proposer_elected": True, "term": 2, "begin_lsn": begin_lsn, "epoch_start_lsn": epoch_start_lsn, diff --git a/vendor/postgres b/vendor/postgres index 08878b19d3..a70d892bb9 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 08878b19d3cae5a1bd765bf7396187b6b806c6ac +Subproject commit a70d892bb93e0a8a6cda8a8fccd4e4fbf408ea79 diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs index 159ec17a9b..dc1905a5a7 100644 --- a/walkeeper/src/http/routes.rs +++ b/walkeeper/src/http/routes.rs @@ -7,7 +7,8 @@ use std::fmt::Display; use std::sync::Arc; use zenith_utils::lsn::Lsn; -use crate::safekeeper::AcceptorState; +use crate::safekeeper::Term; +use crate::safekeeper::TermHistory; use crate::timeline::CreateControlFile; use crate::timeline::GlobalTimelines; use crate::SafeKeeperConf; @@ -29,6 +30,7 @@ fn get_conf(request: &Request) -> &SafeKeeperConf { .as_ref() } +/// Serialize through Display trait. fn display_serialize(z: &F, s: S) -> Result where S: Serializer, @@ -37,6 +39,14 @@ where s.serialize_str(&format!("{}", z)) } +/// Augment AcceptorState with epoch for convenience +#[derive(Debug, Serialize)] +struct AcceptorStateStatus { + term: Term, + epoch: Term, + term_history: TermHistory, +} + /// Info about timeline on safekeeper ready for reporting. #[derive(Debug, Serialize)] struct TimelineStatus { @@ -44,7 +54,7 @@ struct TimelineStatus { tenant_id: ZTenantId, #[serde(serialize_with = "display_serialize")] timeline_id: ZTimelineId, - acceptor_state: AcceptorState, + acceptor_state: AcceptorStateStatus, #[serde(serialize_with = "display_serialize")] commit_lsn: Lsn, #[serde(serialize_with = "display_serialize")] @@ -68,10 +78,16 @@ async fn timeline_status_handler(request: Request) -> Result Result<()> { let response = swh.timeline.get().process_msg(&greeting_request)?; match response { - AcceptorProposerMessage::Greeting(_) => Ok(()), + Some(AcceptorProposerMessage::Greeting(_)) => Ok(()), _ => anyhow::bail!("not GreetingResponse"), } } +fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Result<()> { + // add new term to existing history + let history = swh.timeline.get().get_info().acceptor_state.term_history; + let history = history.up_to(lsn.checked_sub(1u64).unwrap()); + let mut history_entries = history.0; + history_entries.push(TermSwitchEntry { term, lsn }); + let history = TermHistory(history_entries); + + let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected { + term, + start_streaming_at: lsn, + term_history: history, + }); + + swh.timeline.get().process_msg(&proposer_elected_request)?; + Ok(()) +} + #[derive(Serialize, Deserialize)] struct InsertedWAL { begin_lsn: Lsn, @@ -150,7 +176,7 @@ fn append_logical_message( let response = swh.timeline.get().process_msg(&append_request)?; let append_response = match response { - AcceptorProposerMessage::AppendResponse(resp) => resp, + Some(AcceptorProposerMessage::AppendResponse(resp)) => resp, _ => anyhow::bail!("not AppendResponse"), }; diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 9498980802..a653c41922 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -4,6 +4,7 @@ use anyhow::{bail, Context, Result}; use bytes::Bytes; +use bytes::BytesMut; use log::*; use postgres::{Client, Config, NoTls}; @@ -98,7 +99,7 @@ impl<'pg> ReceiveWalConn<'pg> { // Send message to the postgres fn write_msg(&mut self, msg: &AcceptorProposerMessage) -> Result<()> { - let mut buf = Vec::new(); + let mut buf = BytesMut::with_capacity(128); msg.serialize(&mut buf)?; self.pg_backend.write_message(&BeMessage::CopyData(&buf))?; Ok(()) @@ -147,7 +148,9 @@ impl<'pg> ReceiveWalConn<'pg> { .get() .process_msg(&msg) .with_context(|| "failed to process ProposerAcceptorMessage")?; - self.write_msg(&reply)?; + if let Some(reply) = reply { + self.write_msg(&reply)?; + } msg = self.read_msg()?; } } diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 0b25241165..2a15bb3fc6 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -4,16 +4,16 @@ use anyhow::Context; use anyhow::{anyhow, bail, Result}; use byteorder::LittleEndian; use byteorder::ReadBytesExt; -use byteorder::WriteBytesExt; use bytes::Buf; +use bytes::BufMut; use bytes::Bytes; +use bytes::BytesMut; use log::*; use pageserver::waldecoder::WalStreamDecoder; use postgres_ffi::xlog_utils::TimeLineID; use serde::{Deserialize, Serialize}; -use std::cmp::max; use std::cmp::min; -use std::io; +use std::fmt; use std::io::Read; use lazy_static::lazy_static; @@ -37,6 +37,70 @@ const UNKNOWN_SERVER_VERSION: u32 = 0; /// Consensus logical timestamp. pub type Term = u64; +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct TermSwitchEntry { + pub term: Term, + pub lsn: Lsn, +} +#[derive(Clone, Serialize, Deserialize)] +pub struct TermHistory(pub Vec); + +impl TermHistory { + pub fn empty() -> TermHistory { + TermHistory(Vec::new()) + } + + // Parse TermHistory as n_entries followed by TermSwitchEntry pairs + pub fn from_bytes(mut bytes: Bytes) -> Result { + if bytes.remaining() < 4 { + bail!("TermHistory misses len"); + } + let n_entries = bytes.get_u32_le(); + let mut res = Vec::with_capacity(n_entries as usize); + for _ in 0..n_entries { + if bytes.remaining() < 16 { + bail!("TermHistory is incomplete"); + } + res.push(TermSwitchEntry { + term: bytes.get_u64_le(), + lsn: bytes.get_u64_le().into(), + }) + } + Ok(TermHistory(res)) + } + + /// Return copy of self with switches happening strictly after up_to + /// truncated. + pub fn up_to(&self, up_to: Lsn) -> TermHistory { + let mut res = Vec::with_capacity(self.0.len()); + for e in &self.0 { + if e.lsn > up_to { + break; + } + res.push(*e); + } + TermHistory(res) + } +} + +/// Display only latest entries for Debug. +impl fmt::Debug for TermHistory { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let n_printed = 20; + write!( + fmt, + "{}{:?}", + if self.0.len() > n_printed { "... " } else { "" }, + self.0 + .iter() + .rev() + .take(n_printed) + .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry + .collect::>() + ) + } +} + /// Unique id of proposer. Not needed for correctness, used for monitoring. type PgUuid = [u8; 16]; @@ -45,8 +109,21 @@ type PgUuid = [u8; 16]; pub struct AcceptorState { /// acceptor's last term it voted for (advanced in 1 phase) pub term: Term, - /// acceptor's epoch (advanced, i.e. bumped to 'term' when VCL is reached). - pub epoch: Term, + /// History of term switches for safekeeper's WAL. + /// Actually it often goes *beyond* WAL contents as we adopt term history + /// from the proposer before recovery. + pub term_history: TermHistory, +} + +impl AcceptorState { + /// acceptor's epoch is the term of the highest entry in the log + pub fn get_epoch(&self, flush_lsn: Lsn) -> Term { + let th = self.term_history.up_to(flush_lsn); + match th.0.last() { + Some(e) => e.term, + None => 0, + } + } } /// Information about Postgres. Safekeeper gets it once and then verifies @@ -91,7 +168,10 @@ impl SafeKeeperState { SafeKeeperState { magic: SK_MAGIC, format_version: SK_FORMAT_VERSION, - acceptor_state: AcceptorState { term: 0, epoch: 0 }, + acceptor_state: AcceptorState { + term: 0, + term_history: TermHistory::empty(), + }, server: ServerInfo { pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ system_id: 0, /* Postgres system identifier */ @@ -147,16 +227,28 @@ pub struct VoteRequest { /// Vote itself, sent from safekeeper to proposer #[derive(Debug, Serialize)] pub struct VoteResponse { - term: Term, // not really needed, just a sanity check + term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date. vote_given: u64, // fixme u64 due to padding - /// Safekeeper's log position, to let proposer choose the most advanced one - epoch: Term, + // Safekeeper flush_lsn (end of WAL) + history of term switches allow + // proposer to choose the most advanced one. flush_lsn: Lsn, truncate_lsn: Lsn, + term_history: TermHistory, +} + +/* + * Proposer -> Acceptor message announcing proposer is elected and communicating + * term history to it. + */ +#[derive(Debug)] +pub struct ProposerElected { + pub term: Term, + pub start_streaming_at: Lsn, + pub term_history: TermHistory, } /// Request with WAL message sent from proposer to safekeeper. Along the way it -/// announces 1) successful election (with epoch_start_lsn); 2) commit_lsn. +/// communicates commit_lsn. #[derive(Debug)] pub struct AppendRequest { pub h: AppendRequestHeader, @@ -164,6 +256,7 @@ pub struct AppendRequest { } #[derive(Debug, Clone, Deserialize)] pub struct AppendRequestHeader { + // safekeeper's current term; if it is higher than proposer's, the compute is out of date. pub term: Term, // LSN since the proposer appends WAL; determines epoch switch point. pub epoch_start_lsn: Lsn, @@ -185,7 +278,6 @@ pub struct AppendResponse { // Current term of the safekeeper; if it is higher than proposer's, the // compute is out of date. pub term: Term, - pub epoch: Term, // NOTE: this is physical end of wal on safekeeper; currently it doesn't // make much sense without taking epoch into account, as history can be // diverged. @@ -198,19 +290,32 @@ pub struct AppendResponse { pub hs_feedback: HotStandbyFeedback, } +impl AppendResponse { + fn term_only(term: Term) -> AppendResponse { + AppendResponse { + term, + flush_lsn: Lsn(0), + commit_lsn: Lsn(0), + disk_consistent_lsn: Lsn(0), + hs_feedback: HotStandbyFeedback::empty(), + } + } +} + /// Proposer -> Acceptor messages #[derive(Debug)] pub enum ProposerAcceptorMessage { Greeting(ProposerGreeting), VoteRequest(VoteRequest), + Elected(ProposerElected), AppendRequest(AppendRequest), } impl ProposerAcceptorMessage { /// Parse proposer message. - pub fn parse(msg: Bytes) -> Result { + pub fn parse(msg_bytes: Bytes) -> Result { // xxx using Reader is inefficient but easy to work with bincode - let mut stream = msg.reader(); + let mut stream = msg_bytes.reader(); // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is let tag = stream.read_u64::()? as u8 as char; match tag { @@ -222,6 +327,21 @@ impl ProposerAcceptorMessage { let msg = VoteRequest::des_from(&mut stream)?; Ok(ProposerAcceptorMessage::VoteRequest(msg)) } + 'e' => { + let mut msg_bytes = stream.into_inner(); + if msg_bytes.remaining() < 16 { + bail!("ProposerElected message is not complete"); + } + let term = msg_bytes.get_u64_le(); + let start_streaming_at = msg_bytes.get_u64_le().into(); + let term_history = TermHistory::from_bytes(msg_bytes)?; + let msg = ProposerElected { + term, + start_streaming_at, + term_history, + }; + Ok(ProposerAcceptorMessage::Elected(msg)) + } 'a' => { // read header followed by wal data let hdr = AppendRequestHeader::des_from(&mut stream)?; @@ -259,19 +379,33 @@ pub enum AcceptorProposerMessage { impl AcceptorProposerMessage { /// Serialize acceptor -> proposer message. - pub fn serialize(&self, stream: &mut impl io::Write) -> Result<()> { + pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> { match self { AcceptorProposerMessage::Greeting(msg) => { - stream.write_u64::('g' as u64)?; - msg.ser_into(stream)?; + buf.put_u64_le('g' as u64); + buf.put_u64_le(msg.term); } AcceptorProposerMessage::VoteResponse(msg) => { - stream.write_u64::('v' as u64)?; - msg.ser_into(stream)?; + buf.put_u64_le('v' as u64); + buf.put_u64_le(msg.term); + buf.put_u64_le(msg.vote_given); + buf.put_u64_le(msg.flush_lsn.into()); + buf.put_u64_le(msg.truncate_lsn.into()); + buf.put_u32_le(msg.term_history.0.len() as u32); + for e in &msg.term_history.0 { + buf.put_u64_le(e.term); + buf.put_u64_le(e.lsn.into()); + } } AcceptorProposerMessage::AppendResponse(msg) => { - stream.write_u64::('a' as u64)?; - msg.ser_into(stream)?; + buf.put_u64_le('a' as u64); + buf.put_u64_le(msg.term); + buf.put_u64_le(msg.flush_lsn.into()); + buf.put_u64_le(msg.commit_lsn.into()); + buf.put_u64_le(msg.disk_consistent_lsn.into()); + buf.put_i64_le(msg.hs_feedback.ts); + buf.put_u64_le(msg.hs_feedback.xmin); + buf.put_u64_le(msg.hs_feedback.catalog_xmin); } } @@ -284,6 +418,8 @@ pub trait Storage { fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()>; /// Write piece of wal in buf to disk and sync it. fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>; + // Truncate WAL at specified LSN + fn truncate_wal(&mut self, s: &ServerInfo, endpos: Lsn) -> Result<()>; } lazy_static! { @@ -357,8 +493,7 @@ pub struct SafeKeeper { pub commit_lsn: Lsn, pub truncate_lsn: Lsn, pub storage: ST, - pub s: SafeKeeperState, // persistent part - pub elected_proposer_term: Term, // for monitoring/debugging + pub s: SafeKeeperState, // persistent part decoder: WalStreamDecoder, } @@ -375,27 +510,40 @@ where truncate_lsn: state.truncate_lsn, storage, s: state, - elected_proposer_term: 0, decoder: WalStreamDecoder::new(Lsn(0)), } } + /// Get history of term switches for the available WAL + fn get_term_history(&self) -> TermHistory { + self.s.acceptor_state.term_history.up_to(self.flush_lsn) + } + + #[cfg(test)] + fn get_epoch(&self) -> Term { + self.s.acceptor_state.get_epoch(self.flush_lsn) + } + /// Process message from proposer and possibly form reply. Concurrent /// callers must exclude each other. pub fn process_msg( &mut self, msg: &ProposerAcceptorMessage, - ) -> Result { + ) -> Result> { match msg { ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg), ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg), + ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg), ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg), } } /// Handle initial message from proposer: check its sanity and send my /// current term. - fn handle_greeting(&mut self, msg: &ProposerGreeting) -> Result { + fn handle_greeting( + &mut self, + msg: &ProposerGreeting, + ) -> Result> { /* Check protocol compatibility */ if msg.protocol_version != SK_PROTOCOL_VERSION { bail!( @@ -429,64 +577,106 @@ where "processed greeting from proposer {:?}, sending term {:?}", msg.proposer_id, self.s.acceptor_state.term ); - Ok(AcceptorProposerMessage::Greeting(AcceptorGreeting { + Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting { term: self.s.acceptor_state.term, - })) + }))) } /// Give vote for the given term, if we haven't done that previously. - fn handle_vote_request(&mut self, msg: &VoteRequest) -> Result { + fn handle_vote_request( + &mut self, + msg: &VoteRequest, + ) -> Result> { // initialize with refusal let mut resp = VoteResponse { - term: msg.term, + term: self.s.acceptor_state.term, vote_given: false as u64, - epoch: 0, - flush_lsn: Lsn(0), - truncate_lsn: Lsn(0), + flush_lsn: self.flush_lsn, + truncate_lsn: self.s.truncate_lsn, + term_history: self.get_term_history(), }; if self.s.acceptor_state.term < msg.term { self.s.acceptor_state.term = msg.term; // persist vote before sending it out self.storage.persist(&self.s, true)?; + resp.term = self.s.acceptor_state.term; resp.vote_given = true as u64; - resp.epoch = self.s.acceptor_state.epoch; - resp.flush_lsn = self.flush_lsn; - resp.truncate_lsn = self.s.truncate_lsn; } info!("processed VoteRequest for term {}: {:?}", msg.term, &resp); - Ok(AcceptorProposerMessage::VoteResponse(resp)) + Ok(Some(AcceptorProposerMessage::VoteResponse(resp))) + } + + /// Bump our term if received a note from elected proposer with higher one + fn bump_if_higher(&mut self, term: Term) -> Result<()> { + if self.s.acceptor_state.term < term { + self.s.acceptor_state.term = term; + self.storage.persist(&self.s, true)?; + } + Ok(()) + } + + /// Form AppendResponse from current state. + fn append_response(&self) -> AppendResponse { + AppendResponse { + term: self.s.acceptor_state.term, + flush_lsn: self.flush_lsn, + commit_lsn: self.s.commit_lsn, + disk_consistent_lsn: Lsn(0), + // will be filled by the upper code to avoid bothering safekeeper + hs_feedback: HotStandbyFeedback::empty(), + } + } + + fn handle_elected(&mut self, msg: &ProposerElected) -> Result> { + info!("received ProposerElected {:?}", msg); + self.bump_if_higher(msg.term)?; + // If our term is higher, ignore the message (next feedback will inform the compute) + if self.s.acceptor_state.term > msg.term { + return Ok(None); + } + + // TODO: cross check divergence point + + // streaming must not create a hole + assert!(self.flush_lsn == Lsn(0) || self.flush_lsn >= msg.start_streaming_at); + + // truncate obsolete part of WAL + if self.flush_lsn != Lsn(0) { + self.storage + .truncate_wal(&self.s.server, msg.start_streaming_at)?; + } + // update our end of WAL pointer + self.flush_lsn = msg.start_streaming_at; + // and now adopt term history from proposer + self.s.acceptor_state.term_history = msg.term_history.clone(); + self.storage.persist(&self.s, true)?; + + info!("start receiving WAL since {:?}", msg.start_streaming_at); + + Ok(None) } /// Handle request to append WAL. #[allow(clippy::comparison_chain)] - fn handle_append_request(&mut self, msg: &AppendRequest) -> Result { - // log first AppendRequest from this proposer - if self.elected_proposer_term < msg.h.term { - info!( - "start accepting WAL from timeline {}, tenant {}, term {}, epochStartLsn {:?}", - self.s.server.ztli, self.s.server.tenant_id, msg.h.term, msg.h.epoch_start_lsn, - ); - self.elected_proposer_term = msg.h.term; + fn handle_append_request( + &mut self, + msg: &AppendRequest, + ) -> Result> { + if self.s.acceptor_state.term < msg.h.term { + bail!("got AppendRequest before ProposerElected"); } - // If our term is lower than elected proposer one, bump it. - if self.s.acceptor_state.term < msg.h.term { - self.s.acceptor_state.term = msg.h.term; - self.storage.persist(&self.s, true)?; - } - // OTOH, if it is higher, immediately refuse the message. - else if self.s.acceptor_state.term > msg.h.term { - let resp = AppendResponse { - term: self.s.acceptor_state.term, - epoch: self.s.acceptor_state.epoch, - commit_lsn: Lsn(0), - flush_lsn: Lsn(0), - disk_consistent_lsn: Lsn(0), - hs_feedback: HotStandbyFeedback::empty(), - }; - return Ok(AcceptorProposerMessage::AppendResponse(resp)); + // If our term is higher, immediately refuse the message. + if self.s.acceptor_state.term > msg.h.term { + let resp = AppendResponse::term_only(self.s.acceptor_state.term); + return Ok(Some(AcceptorProposerMessage::AppendResponse(resp))); } + // After ProposerElected, which performs truncation, we should get only + // indeed append requests (but flush_lsn is advanced only on record + // boundary, so might be less). + assert!(self.flush_lsn <= msg.h.begin_lsn); + self.s.proposer_uuid = msg.h.proposer_uuid; let mut sync_control_file = false; @@ -530,48 +720,21 @@ where } } - /* - * Epoch switch happen when written WAL record cross the boundary. - * The boundary is maximum of last WAL position at this node (FlushLSN) and global - * maximum (vcl) determined by WAL proposer during handshake. - * Switching epoch means that node completes recovery and start writing in the WAL new data. - * XXX: this is wrong, we must actively truncate not matching part of log. - * - * The non-strict inequality is important for us, as proposer in --sync mode doesn't - * generate new records, but to advance commit_lsn epoch switch must happen on majority. - * We can regard this as commit of empty entry in new epoch, this should be safe. - */ - if self.s.acceptor_state.epoch < msg.h.term - && msg.h.end_lsn >= max(self.flush_lsn, msg.h.epoch_start_lsn) - { - info!( - "switched to new epoch {} on receival of request end_lsn={:?}, len={:?}", - msg.h.term, - msg.h.end_lsn, - msg.wal_data.len(), - ); - self.s.acceptor_state.epoch = msg.h.term; /* bump epoch */ - sync_control_file = true; - } if last_rec_lsn > self.flush_lsn { self.flush_lsn = last_rec_lsn; self.metrics.flush_lsn.set(u64::from(self.flush_lsn) as f64); } - // Advance commit_lsn taking into account what we have locally. xxx this - // is wrapped into epoch check because we overwrite wal instead of - // truncating it, so without it commit_lsn might include wrong part. - // Anyway, nobody is much interested in our commit_lsn while epoch - // switch hasn't happened, right? - // + // Advance commit_lsn taking into account what we have locally. // commit_lsn can be 0, being unknown to new walproposer while he hasn't // collected majority of its epoch acks yet, ignore it in this case. - if self.s.acceptor_state.epoch == msg.h.term && msg.h.commit_lsn != Lsn(0) { + if msg.h.commit_lsn != Lsn(0) { let commit_lsn = min(msg.h.commit_lsn, self.flush_lsn); - // If new commit_lsn reached epoch switch, force sync of control file: - // walproposer in sync mode is very interested when this happens. - sync_control_file |= - commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn; + // If new commit_lsn reached epoch switch, force sync of control + // file: walproposer in sync mode is very interested when this + // happens. Note: this is for sync-safekeepers mode only, as + // otherwise commit_lsn might jump over epoch_start_lsn. + sync_control_file |= commit_lsn == msg.h.epoch_start_lsn; self.commit_lsn = commit_lsn; self.metrics .commit_lsn @@ -592,15 +755,7 @@ where } self.storage.persist(&self.s, sync_control_file)?; - let resp = AppendResponse { - term: self.s.acceptor_state.term, - epoch: self.s.acceptor_state.epoch, - flush_lsn: self.flush_lsn, - commit_lsn: self.s.commit_lsn, - disk_consistent_lsn: Lsn(0), - // will be filled by caller code to avoid bothering safekeeper - hs_feedback: HotStandbyFeedback::empty(), - }; + let resp = self.append_response(); info!( "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, resp {:?}", msg.wal_data.len(), @@ -609,7 +764,7 @@ where msg.h.truncate_lsn, &resp, ); - Ok(AcceptorProposerMessage::AppendResponse(resp)) + Ok(Some(AcceptorProposerMessage::AppendResponse(resp))) } } @@ -631,6 +786,10 @@ mod tests { fn write_wal(&mut self, _server: &ServerInfo, _startpos: Lsn, _buf: &[u8]) -> Result<()> { Ok(()) } + + fn truncate_wal(&mut self, _server: &ServerInfo, _end_pos: Lsn) -> Result<()> { + Ok(()) + } } #[test] @@ -644,7 +803,7 @@ mod tests { let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 }); let mut vote_resp = sk.process_msg(&vote_request); match vote_resp.unwrap() { - AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given != 0), + Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0), r => panic!("unexpected response: {:?}", r), } @@ -658,7 +817,7 @@ mod tests { // and ensure voting second time for 1 is not ok vote_resp = sk.process_msg(&vote_request); match vote_resp.unwrap() { - AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given == 0), + Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0), r => panic!("unexpected response: {:?}", r), } } @@ -684,10 +843,21 @@ mod tests { wal_data: Bytes::from_static(b"b"), }; + let pem = ProposerElected { + term: 1, + start_streaming_at: Lsn(1), + term_history: TermHistory(vec![TermSwitchEntry { + term: 1, + lsn: Lsn(3), + }]), + }; + sk.process_msg(&ProposerAcceptorMessage::Elected(pem)) + .unwrap(); + // check that AppendRequest before epochStartLsn doesn't switch epoch let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); assert!(resp.is_ok()); - assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 0); + assert_eq!(sk.get_epoch(), 0); // but record at epochStartLsn does the switch ar_hdr.begin_lsn = Lsn(2); @@ -698,6 +868,7 @@ mod tests { }; let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); assert!(resp.is_ok()); - assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 1); + sk.flush_lsn = Lsn(3); // imitate the complete record at 3 %) + assert_eq!(sk.get_epoch(), 1); } } diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index b2698faa82..9e48a833d4 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -317,8 +317,11 @@ impl Timeline { } /// Pass arrived message to the safekeeper. - pub fn process_msg(&self, msg: &ProposerAcceptorMessage) -> Result { - let mut rmsg: AcceptorProposerMessage; + pub fn process_msg( + &self, + msg: &ProposerAcceptorMessage, + ) -> Result> { + let mut rmsg: Option; let commit_lsn: Lsn; { let mut shared_state = self.mutex.lock().unwrap(); @@ -328,7 +331,7 @@ impl Timeline { commit_lsn = shared_state.sk.commit_lsn; // if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn - if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg { + if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg { let state = shared_state.get_replicas_state(); resp.hs_feedback = state.hs_feedback; resp.disk_consistent_lsn = state.disk_consistent_lsn; @@ -596,6 +599,82 @@ impl Storage for FileStorage { } Ok(()) } + + fn truncate_wal(&mut self, server: &ServerInfo, end_pos: Lsn) -> Result<()> { + let partial; + const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; + let wal_seg_size = server.wal_seg_size as usize; + let ztli = server.ztli; + + /* Extract WAL location for this block */ + let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize; + + /* Open file */ + let mut segno = end_pos.segment_number(wal_seg_size); + // note: we basically don't support changing pg timeline + let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); + let wal_file_path = self + .conf + .workdir + .join(ztli.to_string()) + .join(wal_file_name.clone()); + let wal_file_partial_path = self + .conf + .workdir + .join(ztli.to_string()) + .join(wal_file_name + ".partial"); + { + let mut wal_file: File; + /* Try to open already completed segment */ + if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { + wal_file = file; + partial = false; + } else { + wal_file = OpenOptions::new() + .write(true) + .open(&wal_file_partial_path)?; + partial = true; + } + wal_file.seek(SeekFrom::Start(xlogoff as u64))?; + while xlogoff < wal_seg_size { + let bytes_to_write = min(XLOG_BLCKSZ, wal_seg_size - xlogoff); + wal_file.write_all(&ZERO_BLOCK[0..bytes_to_write])?; + xlogoff += bytes_to_write; + } + // Flush file, if not said otherwise + if !self.conf.no_sync { + wal_file.sync_all()?; + } + } + if !partial { + // Make segment partial once again + fs::rename(&wal_file_path, &wal_file_partial_path)?; + } + // Remove all subsequent segments + loop { + segno += 1; + let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); + let wal_file_path = self + .conf + .workdir + .join(ztli.to_string()) + .join(wal_file_name.clone()); + let wal_file_partial_path = self + .conf + .workdir + .join(ztli.to_string()) + .join(wal_file_name.clone() + ".partial"); + // TODO: better use fs::try_exists which is currenty avaialble only in nightly build + if wal_file_path.exists() { + fs::remove_file(&wal_file_path)?; + } else if wal_file_partial_path.exists() { + fs::remove_file(&wal_file_partial_path)?; + } else { + break; + } + } + Ok(()) + } } #[cfg(test)] From d39608c3679a70911514500f5899913e6857b0b9 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Thu, 2 Dec 2021 15:48:44 +0300 Subject: [PATCH 04/12] Fix passing start_offset to find_end_of_wal_segment. --- postgres_ffi/src/xlog_utils.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index 7d21b2fc51..7fe4f40158 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -187,8 +187,13 @@ fn find_end_of_wal_segment( let xl_tot_len = LittleEndian::read_u32(&buf[page_offs..page_offs + 4]) as usize; if xl_tot_len == 0 { info!( - "find_end_of_wal_segment reached zeros at {:?}", - Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size)) + "find_end_of_wal_segment reached zeros at {:?}, last records ends at {:?}", + Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size)), + Lsn(XLogSegNoOffsetToRecPtr( + segno, + last_valid_rec_pos as u32, + wal_seg_size + )) ); break; // zeros, reached the end } @@ -303,12 +308,17 @@ pub fn find_end_of_wal( high_segno, ); } + let start_offset = if start_lsn.segment_number(wal_seg_size) == high_segno { + start_lsn.segment_offset(wal_seg_size) + } else { + 0 + }; high_offs = find_end_of_wal_segment( data_dir, high_segno, high_tli, wal_seg_size, - start_lsn.segment_offset(wal_seg_size), + start_offset, )?; } let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size); From 5bad2deff894d1c44d811e8365297d3422f40680 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 3 Dec 2021 07:42:10 -0500 Subject: [PATCH 05/12] Don't hold 'timelines' lock over checkpoint. It was very noticeable that you while the checkpointer was busy, you could not e.g. open a new connection. --- pageserver/src/layered_repository.rs | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index af046536e0..3124cb3488 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -238,16 +238,22 @@ impl Repository for LayeredRepository { } fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()> { - { - let timelines = self.timelines.lock().unwrap(); + // Scan through the hashmap and collect a list of all the timelines, + // while holding the lock. Then drop the lock and actually perform the + // checkpoints. We don't want to block everything else while the + // checkpoint runs. + let timelines = self.timelines.lock().unwrap(); + let timelines_to_checkpoint: Vec<(ZTimelineId, Arc)> = timelines + .iter() + .map(|(timelineid, timeline)| (*timelineid, timeline.clone())) + .collect(); + drop(timelines); - for (timelineid, timeline) in timelines.iter() { - let _entered = - info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid) - .entered(); + for (timelineid, timeline) in timelines_to_checkpoint.iter() { + let _entered = + info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid).entered(); - timeline.checkpoint(cconf)?; - } + timeline.checkpoint(cconf)?; } Ok(()) From c7f3b4e62c34138c1d61355b8c7f1f247d8c79ab Mon Sep 17 00:00:00 2001 From: anastasia Date: Wed, 10 Nov 2021 00:12:08 +0300 Subject: [PATCH 06/12] Clarify the meaning of StandbyReply LSNs: write_lsn - The last LSN received and processed by pageserver's walreceiver. flush_lsn - same as write_lsn. At pageserver it doesn't guarantees data persistence, but it's fine. We rely on safekeepers. apply_lsn - The LSN at which pageserver guaranteed persistence of all received data (disk_consistent_lsn). --- pageserver/src/walreceiver.rs | 24 +++++++++++++++++------- vendor/postgres | 2 +- walkeeper/src/replication.rs | 8 ++++---- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 1f84ed8507..ba4f1aa1e5 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -321,14 +321,24 @@ fn walreceiver_main( }; if let Some(last_lsn) = status_update { - // TODO: More thought should go into what values are sent here. let last_lsn = PgLsn::from(u64::from(last_lsn)); - // We are using disk consistent LSN as `write_lsn`, i.e. LSN at which page server - // may guarantee persistence of all received data. Safekeeper is not free to remove - // WAL preceding `write_lsn`: it should not be requested by this page server. - let write_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn())); - let flush_lsn = last_lsn; - let apply_lsn = PgLsn::from(0); + + // The last LSN we processed. It is not guaranteed to survive pageserver crash. + let write_lsn = last_lsn; + // This value doesn't guarantee data durability, but it's ok. + // In setup with WAL service, pageserver durability is guaranteed by safekeepers. + // In setup without WAL service, we just don't care. + let flush_lsn = write_lsn; + // `disk_consistent_lsn` is the LSN at which page server guarantees persistence of all received data + // Depending on the setup we recieve WAL directly from Compute Node or + // from a WAL service. + // + // Senders use the feedback to determine if we are caught up: + // - Safekeepers are free to remove WAL preceding `apply_lsn`, + // as it will never be requested by this page server. + // - Compute Node uses 'apply_lsn' to calculate a lag for back pressure mechanism + // (delay WAL inserts to avoid lagging pageserver responses and WAL overflow). + let apply_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn())); let ts = SystemTime::now(); const NO_REPLY: u8 = 0; physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?; diff --git a/vendor/postgres b/vendor/postgres index a70d892bb9..da7459982c 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit a70d892bb93e0a8a6cda8a8fccd4e4fbf408ea79 +Subproject commit da7459982caf933db266109c363b655b5c1be56d diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 9f9e974336..f3486e2885 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -55,9 +55,9 @@ impl HotStandbyFeedback { /// Standby status update #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StandbyReply { - pub write_lsn: Lsn, // disk consistent lSN - pub flush_lsn: Lsn, // LSN committedby quorum - pub apply_lsn: Lsn, // not used + pub write_lsn: Lsn, // not used + pub flush_lsn: Lsn, // not used + pub apply_lsn: Lsn, // pageserver's disk consistent lSN pub reply_ts: TimestampTz, pub reply_requested: bool, } @@ -115,7 +115,7 @@ impl ReplicationConn { Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => { let reply = StandbyReply::des(&m[1..]) .context("failed to deserialize StandbyReply")?; - state.disk_consistent_lsn = reply.write_lsn; + state.disk_consistent_lsn = reply.apply_lsn; timeline.update_replica_state(replica, Some(state)); } _ => warn!("unexpected message {:?}", msg), From b7685eb6bae3994ae8d6066df66bd6fe3e48d7e3 Mon Sep 17 00:00:00 2001 From: anastasia Date: Thu, 11 Nov 2021 16:21:05 +0300 Subject: [PATCH 07/12] Enable backpressure --- control_plane/src/compute.rs | 6 +++++- vendor/postgres | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 282e94aed5..af919ea05f 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -289,8 +289,12 @@ impl PostgresNode { conf.append("shared_buffers", "1MB"); conf.append("fsync", "off"); conf.append("max_connections", "100"); - conf.append("wal_sender_timeout", "0"); conf.append("wal_level", "replica"); + // wal_sender_timeout is the maximum time to wait for WAL replication. + // It also defines how often the walreciever will send a feedback message to the wal sender. + conf.append("wal_sender_timeout", "5s"); + conf.append("max_replication_flush_lag", "160MB"); + conf.append("max_replication_apply_lag", "1500MB"); conf.append("listen_addresses", &self.address.ip().to_string()); conf.append("port", &self.address.port().to_string()); diff --git a/vendor/postgres b/vendor/postgres index da7459982c..be8bdba074 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit da7459982caf933db266109c363b655b5c1be56d +Subproject commit be8bdba074baf2a4c7f8fb2cc701c2b3fac9342f From 7cec13d1dfdddab50074c197c2b7ef53118383a8 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Tue, 30 Nov 2021 15:36:00 +0300 Subject: [PATCH 08/12] Improve shutdown story for code coverage This patch introduces fixes for several problems affecting LLVM-based code coverage: * Daemonizing parent processes should call _exit() to prevent coverage data file corruption (*.profraw) due to concurrent writes. * Implement proper shutdown handlers in safekeeper. --- Cargo.lock | 3 +- control_plane/src/safekeeper.rs | 12 ++- control_plane/src/storage.rs | 29 +++--- pageserver/Cargo.toml | 2 +- pageserver/src/bin/pageserver.rs | 121 ++++++++++-------------- test_runner/fixtures/zenith_fixtures.py | 6 ++ walkeeper/Cargo.toml | 1 + walkeeper/src/bin/safekeeper.rs | 90 ++++++++++-------- zenith_utils/Cargo.toml | 9 +- zenith_utils/src/http/endpoint.rs | 2 +- zenith_utils/src/lib.rs | 4 + zenith_utils/src/shutdown.rs | 6 ++ zenith_utils/src/signals.rs | 59 ++++++++++++ 13 files changed, 210 insertions(+), 134 deletions(-) create mode 100644 zenith_utils/src/shutdown.rs create mode 100644 zenith_utils/src/signals.rs diff --git a/Cargo.lock b/Cargo.lock index a7b90e18b7..59facab172 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1888,7 +1888,6 @@ version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c98891d737e271a2954825ef19e46bd16bdb98e2746f2eec4f7a4ef7946efd1" dependencies = [ - "cc", "libc", "signal-hook-registry", ] @@ -2360,6 +2359,7 @@ dependencies = [ "rust-s3", "serde", "serde_json", + "signal-hook", "tempfile", "tokio", "tokio-stream", @@ -2611,6 +2611,7 @@ dependencies = [ "rustls-split", "serde", "serde_json", + "signal-hook", "tempfile", "thiserror", "tokio", diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index ca8dbf38dd..fcbf840397 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -130,9 +130,8 @@ impl SafekeeperNode { let listen_pg = format!("localhost:{}", self.conf.pg_port); let listen_http = format!("localhost:{}", self.conf.http_port); - let mut cmd: &mut Command = &mut Command::new(self.env.safekeeper_bin()?); - cmd = cmd - .args(&["-D", self.datadir_path().to_str().unwrap()]) + let mut cmd = Command::new(self.env.safekeeper_bin()?); + cmd.args(&["-D", self.datadir_path().to_str().unwrap()]) .args(&["--listen-pg", &listen_pg]) .args(&["--listen-http", &listen_http]) .args(&["--pageserver", &pageserver_conn]) @@ -141,13 +140,18 @@ impl SafekeeperNode { .env_clear() .env("RUST_BACKTRACE", "1"); if !self.conf.sync { - cmd = cmd.arg("--no-sync"); + cmd.arg("--no-sync"); } if self.env.pageserver.auth_type == AuthType::ZenithJWT { cmd.env("PAGESERVER_AUTH_TOKEN", &self.env.pageserver.auth_token); } + let var = "LLVM_PROFILE_FILE"; + if let Some(val) = std::env::var_os(var) { + cmd.env(var, val); + } + if !cmd.status()?.success() { bail!( "Safekeeper failed to start. See '{}' for details.", diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index f4cef0a72b..b8118d7f4b 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -5,7 +5,7 @@ use std::process::Command; use std::time::Duration; use std::{io, result, thread}; -use anyhow::{anyhow, bail}; +use anyhow::bail; use nix::errno::Errno; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; @@ -97,7 +97,6 @@ impl PageServerNode { } pub fn init(&self, create_tenant: Option<&str>) -> anyhow::Result<()> { - let mut cmd = Command::new(self.env.pageserver_bin()?); let listen_pg = format!("localhost:{}", self.env.pageserver.pg_port); let listen_http = format!("localhost:{}", self.env.pageserver.http_port); let mut args = vec![ @@ -122,18 +121,19 @@ impl PageServerNode { args.extend(&["--create-tenant", tenantid]) } - let status = cmd - .args(args) - .env_clear() - .env("RUST_BACKTRACE", "1") - .status() - .expect("pageserver init failed"); + let mut cmd = Command::new(self.env.pageserver_bin()?); + cmd.args(args).env_clear().env("RUST_BACKTRACE", "1"); - if status.success() { - Ok(()) - } else { - Err(anyhow!("pageserver init failed")) + let var = "LLVM_PROFILE_FILE"; + if let Some(val) = std::env::var_os(var) { + cmd.env(var, val); } + + if !cmd.status()?.success() { + bail!("pageserver init failed"); + } + + Ok(()) } pub fn repo_path(&self) -> PathBuf { @@ -158,6 +158,11 @@ impl PageServerNode { .env_clear() .env("RUST_BACKTRACE", "1"); + let var = "LLVM_PROFILE_FILE"; + if let Some(val) = std::env::var_os(var) { + cmd.env(var, val); + } + if !cmd.status()?.success() { bail!( "Pageserver failed to start. See '{}' for details.", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 6cb26404c5..7bb6d1c945 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -35,7 +35,7 @@ scopeguard = "1.1.0" async-trait = "0.1" const_format = "0.2.21" tracing = "0.1.27" -signal-hook = {version = "0.3.10", features = ["extended-siginfo"] } +signal-hook = "0.3.10" url = "2" nix = "0.23" once_cell = "1.8.0" diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 63de235003..e237afbd16 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -14,14 +14,6 @@ use tracing::*; use zenith_utils::{auth::JwtAuth, logging, postgres_backend::AuthType, tcp_listener, GIT_VERSION}; use anyhow::{bail, ensure, Context, Result}; -use signal_hook::consts::signal::*; -use signal_hook::consts::TERM_SIGNALS; -use signal_hook::flag; -use signal_hook::iterator::exfiltrator::WithOrigin; -use signal_hook::iterator::SignalsInfo; -use std::process::exit; -use std::sync::atomic::AtomicBool; -use std::sync::Arc; use clap::{App, Arg, ArgMatches}; use daemonize::Daemonize; @@ -32,6 +24,8 @@ use pageserver::{ }; use zenith_utils::http::endpoint; use zenith_utils::postgres_backend; +use zenith_utils::shutdown::exit_now; +use zenith_utils::signals::{self, Signal}; use const_format::formatcp; @@ -524,17 +518,6 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { info!("version: {}", GIT_VERSION); - let term_now = Arc::new(AtomicBool::new(false)); - for sig in TERM_SIGNALS { - // When terminated by a second term signal, exit with exit code 1. - // This will do nothing the first time (because term_now is false). - flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now))?; - // But this will "arm" the above for the second time, by setting it to true. - // The order of registering these is important, if you put this one first, it will - // first arm and then terminate ‒ all in the first round. - flag::register(*sig, Arc::clone(&term_now))?; - } - // TODO: Check that it looks like a valid repository before going further // bind sockets before daemonizing so we report errors early and do not return until we are listening @@ -550,6 +533,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { ); let pageserver_listener = tcp_listener::bind(conf.listen_pg_addr.clone())?; + // XXX: Don't spawn any threads before daemonizing! if conf.daemonize { info!("daemonizing..."); @@ -564,18 +548,21 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { .stdout(stdout) .stderr(stderr); - match daemonize.start() { + // XXX: The parent process should exit abruptly right after + // it has spawned a child to prevent coverage machinery from + // dumping stats into a `profraw` file now owned by the child. + // Otherwise, the coverage data will be damaged. + match daemonize.exit_action(|| exit_now(0)).start() { Ok(_) => info!("Success, daemonized"), Err(err) => error!(%err, "could not daemonize"), } } - // keep join handles for spawned threads - // don't spawn threads before daemonizing - let mut join_handles = Vec::new(); + let signals = signals::install_shutdown_handlers()?; + let mut threads = vec![]; if let Some(handle) = remote_storage::run_storage_sync_thread(conf)? { - join_handles.push(handle); + threads.push(handle); } // Initialize tenant manager. tenant_mgr::init(conf); @@ -594,61 +581,55 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { // Spawn a new thread for the http endpoint // bind before launching separate thread so the error reported before startup exits let cloned = auth.clone(); - let http_endpoint_thread = thread::Builder::new() - .name("http_endpoint_thread".into()) - .spawn(move || { - let router = http::make_router(conf, cloned); - endpoint::serve_thread_main(router, http_listener) - })?; - - join_handles.push(http_endpoint_thread); + threads.push( + thread::Builder::new() + .name("http_endpoint_thread".into()) + .spawn(move || { + let router = http::make_router(conf, cloned); + endpoint::serve_thread_main(router, http_listener) + })?, + ); // Spawn a thread to listen for connections. It will spawn further threads // for each connection. - let page_service_thread = thread::Builder::new() - .name("Page Service thread".into()) - .spawn(move || { - page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type) - })?; + threads.push( + thread::Builder::new() + .name("Page Service thread".into()) + .spawn(move || { + page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type) + })?, + ); - for info in SignalsInfo::::new(TERM_SIGNALS)?.into_iter() { - match info.signal { - SIGQUIT => { - info!("Got SIGQUIT. Terminate pageserver in immediate shutdown mode"); - exit(111); - } - SIGINT | SIGTERM => { - info!("Got SIGINT/SIGTERM. Terminate gracefully in fast shutdown mode"); - // Terminate postgres backends - postgres_backend::set_pgbackend_shutdown_requested(); - // Stop all tenants and flush their data - tenant_mgr::shutdown_all_tenants()?; - // Wait for pageservice thread to complete the job - page_service_thread + signals.handle(|signal| match signal { + Signal::Quit => { + info!( + "Got {}. Terminating in immediate shutdown mode", + signal.name() + ); + std::process::exit(111); + } + + Signal::Interrupt | Signal::Terminate => { + info!( + "Got {}. Terminating gracefully in fast shutdown mode", + signal.name() + ); + + postgres_backend::set_pgbackend_shutdown_requested(); + tenant_mgr::shutdown_all_tenants()?; + endpoint::shutdown(); + + for handle in std::mem::take(&mut threads) { + handle .join() .expect("thread panicked") .expect("thread exited with an error"); - - // Shut down http router - endpoint::shutdown(); - - // Wait for all threads - for handle in join_handles.into_iter() { - handle - .join() - .expect("thread panicked") - .expect("thread exited with an error"); - } - info!("Pageserver shut down successfully completed"); - exit(0); - } - unknown_signal => { - debug!("Unknown signal {}", unknown_signal); } + + info!("Shut down successfully completed"); + std::process::exit(0); } - } - - Ok(()) + }) } #[cfg(test)] diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 326cb060d1..954e9d75ff 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -509,6 +509,12 @@ sync = false # Disable fsyncs to make the tests go faster env_vars['ZENITH_REPO_DIR'] = str(self.repo_dir) env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir) + # Pass coverage settings + var = 'LLVM_PROFILE_FILE' + val = os.environ.get(var) + if val: + env_vars[var] = val + # Intercept CalledProcessError and print more info try: res = subprocess.run(args, diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index 539a925ebd..69d5f681c5 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -28,6 +28,7 @@ anyhow = "1.0" crc32c = "0.6.0" humantime = "2.1.0" walkdir = "2" +signal-hook = "0.3.10" serde = { version = "1.0", features = ["derive"] } hex = "0.4.3" const_format = "0.2.21" diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs index e85d49a8c6..0f06983574 100644 --- a/walkeeper/src/bin/safekeeper.rs +++ b/walkeeper/src/bin/safekeeper.rs @@ -8,14 +8,15 @@ use daemonize::Daemonize; use log::*; use std::path::{Path, PathBuf}; use std::thread; -use zenith_utils::http::endpoint; -use zenith_utils::{logging, tcp_listener, GIT_VERSION}; - use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR}; use walkeeper::http; use walkeeper::s3_offload; use walkeeper::wal_service; use walkeeper::SafeKeeperConf; +use zenith_utils::http::endpoint; +use zenith_utils::shutdown::exit_now; +use zenith_utils::signals; +use zenith_utils::{logging, tcp_listener, GIT_VERSION}; fn main() -> Result<()> { zenith_metrics::set_common_metrics_prefix("safekeeper"); @@ -131,6 +132,7 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { e })?; + // XXX: Don't spawn any threads before daemonizing! if conf.daemonize { info!("daemonizing..."); @@ -145,51 +147,59 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { .stdout(stdout) .stderr(stderr); - match daemonize.start() { + // XXX: The parent process should exit abruptly right after + // it has spawned a child to prevent coverage machinery from + // dumping stats into a `profraw` file now owned by the child. + // Otherwise, the coverage data will be damaged. + match daemonize.exit_action(|| exit_now(0)).start() { Ok(_) => info!("Success, daemonized"), Err(e) => error!("Error, {}", e), } } - let mut threads = Vec::new(); + let signals = signals::install_shutdown_handlers()?; + let mut threads = vec![]; - let conf_cloned = conf.clone(); - let http_endpoint_thread = thread::Builder::new() - .name("http_endpoint_thread".into()) - .spawn(|| { - // TODO authentication - let router = http::make_router(conf_cloned); - endpoint::serve_thread_main(router, http_listener).unwrap(); - }) - .unwrap(); - threads.push(http_endpoint_thread); + let conf_ = conf.clone(); + threads.push( + thread::Builder::new() + .name("http_endpoint_thread".into()) + .spawn(|| { + // TODO authentication + let router = http::make_router(conf_); + endpoint::serve_thread_main(router, http_listener).unwrap(); + })?, + ); if conf.ttl.is_some() { - let s3_conf = conf.clone(); - let s3_offload_thread = thread::Builder::new() - .name("S3 offload thread".into()) + let conf_ = conf.clone(); + threads.push( + thread::Builder::new() + .name("S3 offload thread".into()) + .spawn(|| { + s3_offload::thread_main(conf_); + })?, + ); + } + + threads.push( + thread::Builder::new() + .name("WAL acceptor thread".into()) .spawn(|| { - // thread code - s3_offload::thread_main(s3_conf); - }) - .unwrap(); - threads.push(s3_offload_thread); - } + let thread_result = wal_service::thread_main(conf, pg_listener); + if let Err(e) = thread_result { + info!("wal_service thread terminated: {}", e); + } + })?, + ); - let wal_acceptor_thread = thread::Builder::new() - .name("WAL acceptor thread".into()) - .spawn(|| { - // thread code - let thread_result = wal_service::thread_main(conf, pg_listener); - if let Err(e) = thread_result { - info!("wal_service thread terminated: {}", e); - } - }) - .unwrap(); - threads.push(wal_acceptor_thread); - - for t in threads { - t.join().unwrap() - } - Ok(()) + // NOTE: we still have to handle signals like SIGQUIT to prevent coredumps + signals.handle(|signal| { + // TODO: implement graceful shutdown with joining threads etc + info!( + "Got {}. Terminating in immediate shutdown mode", + signal.name() + ); + std::process::exit(111); + }) } diff --git a/zenith_utils/Cargo.toml b/zenith_utils/Cargo.toml index 6292971c21..3a81e9bd38 100644 --- a/zenith_utils/Cargo.toml +++ b/zenith_utils/Cargo.toml @@ -20,18 +20,17 @@ tokio = "1.11" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } nix = "0.23.0" - -zenith_metrics = { path = "../zenith_metrics" } -workspace_hack = { path = "../workspace_hack" } +signal-hook = "0.3.10" rand = "0.8.3" jsonwebtoken = "7" hex = { version = "0.4.3", features = ["serde"] } - rustls = "0.19.1" rustls-split = "0.2.1" - git-version = "0.3.5" +zenith_metrics = { path = "../zenith_metrics" } +workspace_hack = { path = "../workspace_hack" } + [dev-dependencies] hex-literal = "0.3" bytes = "1.0" diff --git a/zenith_utils/src/http/endpoint.rs b/zenith_utils/src/http/endpoint.rs index 9c35f77328..ffb798fe83 100644 --- a/zenith_utils/src/http/endpoint.rs +++ b/zenith_utils/src/http/endpoint.rs @@ -153,7 +153,7 @@ pub fn check_permission(req: &Request, tenantid: Option) -> Res } } -// Send shutdown signal +/// Initiate graceful shutdown of the http endpoint pub fn shutdown() { if let Some(tx) = SHUTDOWN_SENDER.lock().unwrap().take() { let _ = tx.send(()); diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs index eb9948ed64..b0e5131a11 100644 --- a/zenith_utils/src/lib.rs +++ b/zenith_utils/src/lib.rs @@ -40,6 +40,7 @@ pub mod logging; // Misc pub mod accum; +pub mod shutdown; // Utility for binding TcpListeners with proper socket options. pub mod tcp_listener; @@ -47,6 +48,9 @@ pub mod tcp_listener; // Utility for putting a raw file descriptor into non-blocking mode pub mod nonblock; +// Default signal handling +pub mod signals; + // This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages // // we have several cases: diff --git a/zenith_utils/src/shutdown.rs b/zenith_utils/src/shutdown.rs new file mode 100644 index 0000000000..7eba905997 --- /dev/null +++ b/zenith_utils/src/shutdown.rs @@ -0,0 +1,6 @@ +/// Immediately terminate the calling process without calling +/// atexit callbacks, C runtime destructors etc. We mainly use +/// this to protect coverage data from concurrent writes. +pub fn exit_now(code: u8) { + unsafe { nix::libc::_exit(code as _) }; +} diff --git a/zenith_utils/src/signals.rs b/zenith_utils/src/signals.rs new file mode 100644 index 0000000000..6586da2339 --- /dev/null +++ b/zenith_utils/src/signals.rs @@ -0,0 +1,59 @@ +use signal_hook::flag; +use signal_hook::iterator::Signals; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; + +pub use signal_hook::consts::{signal::*, TERM_SIGNALS}; + +pub fn install_shutdown_handlers() -> anyhow::Result { + let term_now = Arc::new(AtomicBool::new(false)); + for sig in TERM_SIGNALS { + // When terminated by a second term signal, exit with exit code 1. + // This will do nothing the first time (because term_now is false). + flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now))?; + // But this will "arm" the above for the second time, by setting it to true. + // The order of registering these is important, if you put this one first, it will + // first arm and then terminate ‒ all in the first round. + flag::register(*sig, Arc::clone(&term_now))?; + } + + Ok(ShutdownSignals) +} + +pub enum Signal { + Quit, + Interrupt, + Terminate, +} + +impl Signal { + pub fn name(&self) -> &'static str { + match self { + Signal::Quit => "SIGQUIT", + Signal::Interrupt => "SIGINT", + Signal::Terminate => "SIGTERM", + } + } +} + +pub struct ShutdownSignals; + +impl ShutdownSignals { + pub fn handle( + self, + mut handler: impl FnMut(Signal) -> anyhow::Result<()>, + ) -> anyhow::Result<()> { + for raw_signal in Signals::new(TERM_SIGNALS)?.into_iter() { + let signal = match raw_signal { + SIGINT => Signal::Interrupt, + SIGTERM => Signal::Terminate, + SIGQUIT => Signal::Quit, + other => panic!("unknown signal: {}", other), + }; + + handler(signal)?; + } + + Ok(()) + } +} From 5d37560308912d2dd41c6ed5672e61ae149a8369 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Mon, 6 Dec 2021 03:40:44 +0300 Subject: [PATCH 09/12] Add bespoke glue script leveraging LLVM coverage tools --- .gitignore | 4 + scripts/coverage | 510 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 514 insertions(+) create mode 100755 scripts/coverage diff --git a/.gitignore b/.gitignore index afcc79fd20..2ecdaa2053 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,7 @@ test_output/ .vscode /.zenith /integration_tests/.zenith + +# Coverage +*.profraw +*.profdata diff --git a/scripts/coverage b/scripts/coverage new file mode 100755 index 0000000000..9ad282a7b2 --- /dev/null +++ b/scripts/coverage @@ -0,0 +1,510 @@ +#!/usr/bin/env python3 + +# Here'a good link in case you're interested in learning more +# about current deficiencies of rust code coverage story: +# https://github.com/rust-lang/rust/issues?q=is%3Aissue+is%3Aopen+instrument-coverage+label%3AA-code-coverage +# +# Also a couple of inspirational tools which I deliberately ended up not using: +# * https://github.com/mozilla/grcov +# * https://github.com/taiki-e/cargo-llvm-cov +# * https://github.com/llvm/llvm-project/tree/main/llvm/test/tools/llvm-cov + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from pathlib import Path +from tempfile import TemporaryDirectory +from textwrap import dedent +from typing import Any, Iterable, List, Optional + +import argparse +import json +import os +import shutil +import subprocess +import sys + + +def intersperse(sep: Any, iterable: Iterable[Any]): + fst = True + for item in iterable: + if not fst: + yield sep + fst = False + yield item + + +def find_demangler(demangler=None): + known_tools = ['c++filt', 'rustfilt', 'llvm-cxxfilt'] + + if demangler: + # Explicit argument has precedence over `known_tools` + demanglers = [demangler] + else: + demanglers = known_tools + + for demangler in demanglers: + if shutil.which(demangler): + return demangler + + raise Exception(' '.join([ + 'Failed to find symbol demangler.', + 'Please install it or provide another tool', + f"(e.g. {', '.join(known_tools)})", + ])) + + +class Cargo: + def __init__(self, cwd: Path): + self.cwd = cwd + self.target_dir = Path(os.environ.get('CARGO_TARGET_DIR', cwd / 'target')).resolve() + self._rustlib_dir = None + + @property + def rustlib_dir(self): + if not self._rustlib_dir: + cmd = [ + 'cargo', + '-Zunstable-options', + 'rustc', + '--print=target-libdir', + ] + self._rustlib_dir = Path(subprocess.check_output(cmd, cwd=self.cwd, text=True)).parent + + return self._rustlib_dir + + def binaries(self, profile: str) -> List[str]: + executables = [] + + # This will emit json messages containing test binaries names + cmd = [ + 'cargo', + 'test', + '--no-run', + '--message-format=json', + ] + env = dict(os.environ, PROFILE=profile) + output = subprocess.check_output(cmd, cwd=self.cwd, env=env, text=True) + + for line in output.splitlines(keepends=False): + meta = json.loads(line) + exe = meta.get('executable') + if exe: + executables.append(exe) + + # Metadata contains crate names, which can be used + # to recover names of executables, e.g. `pageserver` + cmd = [ + 'cargo', + 'metadata', + '--format-version=1', + '--no-deps', + ] + meta = json.loads(subprocess.check_output(cmd, cwd=self.cwd)) + + for pkg in meta.get('packages', []): + for target in pkg.get('targets', []): + if 'bin' in target['kind']: + exe = self.target_dir / profile / target['name'] + if exe.exists(): + executables.append(str(exe)) + + return executables + + +@dataclass +class LLVM: + cargo: Cargo + + def resolve_tool(self, name: str) -> str: + exe = self.cargo.rustlib_dir / 'bin' / name + if exe.exists(): + return str(exe) + + if not shutil.which(name): + # Show a user-friendly warning + raise Exception(' '.join([ + f"It appears that you don't have `{name}` installed.", + "Please execute `rustup component add llvm-tools-preview`,", + "or install it via your package manager of choice.", + "LLVM tools should be the same version as LLVM in `rustc --version --verbose`.", + ])) + + return name + + def profdata(self, input_dir: Path, output_profdata: Path): + profraws = [f for f in input_dir.iterdir() if f.suffix == '.profraw'] + if not profraws: + raise Exception(f'No profraw files found at {input_dir}') + + with open(input_dir / 'profraw.list', 'w') as input_files: + profraw_mtime = 0 + for profraw in profraws: + profraw_mtime = max(profraw_mtime, profraw.stat().st_mtime_ns) + print(profraw, file=input_files) + input_files.flush() + + try: + profdata_mtime = output_profdata.stat().st_mtime_ns + except FileNotFoundError: + profdata_mtime = 0 + + # An obvious make-ish optimization + if profraw_mtime >= profdata_mtime: + subprocess.check_call([ + self.resolve_tool('llvm-profdata'), + 'merge', + '-sparse', + f'-input-files={input_files.name}', + f'-output={output_profdata}', + ]) + + def _cov(self, + *extras, + subcommand: str, + profdata: Path, + objects: List[str], + sources: List[str], + demangler: Optional[str] = None) -> None: + + cwd = self.cargo.cwd + objects = list(intersperse('-object', objects)) + extras = list(extras) + + # For some reason `rustc` produces relative paths to src files, + # so we force it to cut the $PWD prefix. + # see: https://github.com/rust-lang/rust/issues/34701#issuecomment-739809584 + if sources: + extras.append(f'-path-equivalence=.,{cwd.resolve()}') + + if demangler: + extras.append(f'-Xdemangler={demangler}') + + cmd = [ + self.resolve_tool('llvm-cov'), + subcommand, # '-dump-collected-paths', # classified debug flag + '-instr-profile', + str(profdata), + *extras, + *objects, + *sources, + ] + subprocess.check_call(cmd, cwd=cwd) + + def cov_report(self, **kwargs) -> None: + self._cov(subcommand='report', **kwargs) + + def cov_export(self, *, kind: str, **kwargs) -> None: + extras = [f'-format={kind}'] + self._cov(subcommand='export', *extras, **kwargs) + + def cov_show(self, *, kind: str, output_dir: Optional[Path] = None, **kwargs) -> None: + extras = [f'-format={kind}'] + if output_dir: + extras.append(f'-output-dir={output_dir}') + + self._cov(subcommand='show', *extras, **kwargs) + + +@dataclass +class Report(ABC): + """ Common properties of a coverage report """ + + llvm: LLVM + demangler: str + profdata: Path + objects: List[str] + sources: List[str] + + def _common_kwargs(self): + return dict(profdata=self.profdata, + objects=self.objects, + sources=self.sources, + demangler=self.demangler) + + @abstractmethod + def generate(self): + pass + + def open(self): + # Do nothing by default + pass + + +class SummaryReport(Report): + def generate(self): + self.llvm.cov_report(**self._common_kwargs()) + + +class TextReport(Report): + def generate(self): + self.llvm.cov_show(kind='text', **self._common_kwargs()) + + +class LcovReport(Report): + def generate(self): + self.llvm.cov_export(kind='lcov', **self._common_kwargs()) + + +@dataclass +class HtmlReport(Report): + output_dir: Path + + def generate(self): + self.llvm.cov_show(kind='html', output_dir=self.output_dir, **self._common_kwargs()) + print(f'HTML report is located at `{self.output_dir}`') + + def open(self): + tool = dict(linux='xdg-open', darwin='open').get(sys.platform) + if not tool: + raise Exception(f'Unknown platform {sys.platform}') + + subprocess.check_call([tool, self.output_dir / 'index.html'], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + + +@dataclass +class GithubPagesReport(HtmlReport): + output_dir: Path + commit_url: str + + def generate(self): + def index_path(path): + return path / 'index.html' + + common = self._common_kwargs() + # Provide default sources if there's none + common.setdefault('sources', ['.']) + + self.llvm.cov_show(kind='html', output_dir=self.output_dir, **common) + shutil.copy(index_path(self.output_dir), self.output_dir / 'local.html') + + with TemporaryDirectory() as tmp: + output_dir = Path(tmp) + args = dict(common, sources=[]) + self.llvm.cov_show(kind='html', output_dir=output_dir, **args) + shutil.copy(index_path(output_dir), self.output_dir / 'all.html') + + with open(index_path(self.output_dir), 'w') as index: + commit_sha = self.commit_url.rsplit('/', maxsplit=1)[-1][:10] + + html = f""" + + + + Coverage ({commit_sha}) + + +

+ Coverage report for commit + + {commit_sha} + +

+ +

+ + Show only local sources + +

+ +

+ + Show all sources (including dependencies) + +

+ + + """ + index.write(dedent(html)) + + print(f'HTML report is located at `{self.output_dir}`') + + +class State: + def __init__(self, cwd: Path, top_dir: Optional[Path], profraw_prefix: Optional[str]): + # Use hostname by default + profraw_prefix = profraw_prefix or '%h' + + self.cwd = cwd + self.cargo = Cargo(self.cwd) + self.llvm = LLVM(self.cargo) + + self.top_dir = top_dir or self.cargo.target_dir / 'coverage' + self.report_dir = self.top_dir / 'report' + + # Directory for raw coverage data emitted by executables + self.profraw_dir = self.top_dir / 'profraw' + self.profraw_dir.mkdir(parents=True, exist_ok=True) + + # Aggregated coverage data + self.profdata_file = self.top_dir / 'coverage.profdata' + + # Dump all coverage data files into a dedicated directory. + # Each filename is parameterized by PID & executable's signature. + os.environ['LLVM_PROFILE_FILE'] = str(self.profraw_dir / + f'cov-{profraw_prefix}-%p-%m.profraw') + + os.environ['RUSTFLAGS'] = ' '.join([ + os.environ.get('RUSTFLAGS', ''), + # Enable LLVM's source-based coverage + # see: https://clang.llvm.org/docs/SourceBasedCodeCoverage.html + # see: https://blog.rust-lang.org/inside-rust/2020/11/12/source-based-code-coverage.html + '-Zinstrument-coverage', + # Link every bit of code to prevent "holes" in coverage report + # see: https://doc.rust-lang.org/rustc/codegen-options/index.html#link-dead-code + '-Clink-dead-code', + # Some of the paths that `rustc` embeds into binaries are absolute, others are relative. + # The point is, we can't have both, because depending on `-path-equivalence`, `llvm-cov` + # either will cripple absolute paths or won't be able to show relative paths at all. + # There's no way to turn relative paths into absolute, so we strip $PWD prefix. + # Only source files of deps (e.g. `$HOME/.cargo`) will keep their absolute paths, + # but we won't include them in report by default (but see `--all`). + f'--remap-path-prefix {self.cwd}=', + ]) + + # XXX: God, have mercy on our souls... + # see: https://github.com/rust-lang/rust/pull/90132 + os.environ['RUSTC_BOOTSTRAP'] = '1' + + def do_run(self, args): + subprocess.check_call([*args.command, *args.args]) + + def do_report(self, args): + if args.all and args.sources: + raise Exception('--all should not be used with sources') + + # see man for `llvm-cov show [sources]` + if args.all: + sources = [] + elif not args.sources: + sources = ['.'] + else: + sources = args.sources + + print('* Merging profraw files') + self.llvm.profdata(self.profraw_dir, self.profdata_file) + + objects = [] + if args.input_objects: + print('* Collecting object files using --input-objects') + with open(args.input_objects) as f: + objects.extend(f.read().splitlines(keepends=False)) + if args.cargo_objects == 'true' or (args.cargo_objects == 'auto' + and not args.input_objects): + print('* Collecting object files using cargo') + objects.extend(self.cargo.binaries(args.profile)) + + params = dict(llvm=self.llvm, + demangler=find_demangler(args.demangler), + profdata=self.profdata_file, + objects=objects, + sources=sources) + + formats = { + 'html': + lambda: HtmlReport(**params, output_dir=self.report_dir), + 'text': + lambda: TextReport(**params), + 'lcov': + lambda: LcovReport(**params), + 'summary': + lambda: SummaryReport(**params), + 'github': + lambda: GithubPagesReport( + **params, output_dir=self.report_dir, commit_url=args.commit_url), + } + + report = formats.get(args.format)() + if not report: + raise Exception('Format `{args.format}` is not supported') + + print(f'* Rendering coverage report ({args.format})') + report.generate() + + if args.open: + print('* Opening the report') + report.open() + + def do_clean(self, args): + # Wipe everything if no filters have been provided + if not (args.report or args.prof): + shutil.rmtree(self.top_dir, ignore_errors=True) + else: + if args.report: + shutil.rmtree(self.report_dir, ignore_errors=True) + if args.prof: + self.profdata_file.unlink(missing_ok=True) + + +def main(): + app = sys.argv[0] + example = f""" +prerequisites: + # alternatively, install a system package for `llvm-tools` + rustup component add llvm-tools-preview + +self-contained example: + {app} run make + {app} run pipenv run pytest test_runner + {app} run cargo test + {app} report --open + """ + + parser = argparse.ArgumentParser(description='Coverage report builder', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=example) + parser.add_argument('--dir', type=Path, help='output directory') + parser.add_argument('--profraw-prefix', metavar='STRING', type=str) + + commands = parser.add_subparsers(title='commands', dest='subparser_name') + + p_run = commands.add_parser('run', help='run a command with magic env') + p_run.add_argument('command', nargs=1) + p_run.add_argument('args', nargs=argparse.REMAINDER) + + p_report = commands.add_parser('report', help='generate a coverage report') + p_report.add_argument('--profile', + default='debug', + choices=('debug', 'release'), + help='cargo build profile') + p_report.add_argument('--format', + default='html', + choices=('html', 'text', 'summary', 'lcov', 'github'), + help='report format') + p_report.add_argument('--input-objects', + metavar='FILE', + type=Path, + help='file containing list of binaries') + p_report.add_argument('--cargo-objects', + default='auto', + choices=('auto', 'true', 'false'), + help='use cargo for auto discovery of binaries') + p_report.add_argument('--commit-url', type=str, help='required for --format=github') + p_report.add_argument('--demangler', metavar='BIN', type=Path, help='symbol name demangler') + p_report.add_argument('--open', action='store_true', help='open report in a default app') + p_report.add_argument('--all', action='store_true', help='show everything, e.g. deps') + p_report.add_argument('sources', nargs='*', type=Path, help='source file or directory') + + p_clean = commands.add_parser('clean', help='wipe coverage artifacts') + p_clean.add_argument('--report', action='store_true', help='pick generated report') + p_clean.add_argument('--prof', action='store_true', help='pick *.profdata & *.profraw') + + args = parser.parse_args() + state = State(cwd=Path.cwd(), top_dir=args.dir, profraw_prefix=args.profraw_prefix) + + commands = { + 'run': state.do_run, + 'report': state.do_report, + 'clean': state.do_clean, + } + + action = commands.get(args.subparser_name) + if action: + action(args) + else: + parser.print_help() + + +if __name__ == '__main__': + main() From d8746759554764224b01ef3d150d8367471841c7 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Mon, 6 Dec 2021 03:41:16 +0300 Subject: [PATCH 10/12] Collect coverage in CI --- .circleci/config.yml | 203 +++++++++++++++++++++++++++++++++---------- scripts/git-upload | 136 +++++++++++++++++++++++++++++ 2 files changed, 295 insertions(+), 44 deletions(-) create mode 100755 scripts/git-upload diff --git a/.circleci/config.yml b/.circleci/config.yml index 45c0f0df57..b5aa426d0b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -14,7 +14,6 @@ jobs: executor: zenith-build-executor steps: - checkout - - run: name: rustfmt when: always @@ -81,6 +80,8 @@ jobs: build_type: type: enum enum: ["debug", "release"] + environment: + BUILD_TYPE: << parameters.build_type >> steps: - run: name: apt install dependencies @@ -116,16 +117,17 @@ jobs: - run: name: Rust build << parameters.build_type >> command: | - export CARGO_INCREMENTAL=0 - BUILD_TYPE="<< parameters.build_type >>" if [[ $BUILD_TYPE == "debug" ]]; then - echo "Build in debug mode" - cargo build --bins --tests + cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run) + CARGO_FLAGS= elif [[ $BUILD_TYPE == "release" ]]; then - echo "Build in release mode" - cargo build --release --bins --tests + cov_prefix=() + CARGO_FLAGS=--release fi + export CARGO_INCREMENTAL=0 + "${cov_prefix[@]}" cargo build $CARGO_FLAGS --bins --tests + - save_cache: name: Save rust cache key: v04-rust-cache-deps-<< parameters.build_type >>-{{ checksum "Cargo.lock" }} @@ -138,45 +140,77 @@ jobs: # has to run separately from cargo fmt section # since needs to run with dependencies - run: - name: clippy + name: cargo clippy command: | - ./run_clippy.sh + if [[ $BUILD_TYPE == "debug" ]]; then + cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run) + elif [[ $BUILD_TYPE == "release" ]]; then + cov_prefix=() + fi + + "${cov_prefix[@]}" ./run_clippy.sh # Run rust unit tests - - run: cargo test + - run: + name: cargo test + command: | + if [[ $BUILD_TYPE == "debug" ]]; then + cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run) + elif [[ $BUILD_TYPE == "release" ]]; then + cov_prefix=() + fi + + "${cov_prefix[@]}" cargo test # Install the rust binaries, for use by test jobs - # `--locked` is required; otherwise, `cargo install` will ignore Cargo.lock. - # FIXME: this is a really silly way to install; maybe we should just output - # a tarball as an artifact? Or a .deb package? - run: - name: cargo install + name: Install rust binaries command: | - export CARGO_INCREMENTAL=0 - BUILD_TYPE="<< parameters.build_type >>" if [[ $BUILD_TYPE == "debug" ]]; then - echo "Install debug mode" - CARGO_FLAGS="--debug" + cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run) elif [[ $BUILD_TYPE == "release" ]]; then - echo "Install release mode" - # The default is release mode; there is no --release flag. - CARGO_FLAGS="" + cov_prefix=() + fi + + binaries=$( + "${cov_prefix[@]}" cargo metadata --format-version=1 --no-deps | + jq -r '.packages[].targets[] | select(.kind | index("bin")) | .name' + ) + + test_exe_paths=$( + "${cov_prefix[@]}" cargo test --message-format=json --no-run | + jq -r '.executable | select(. != null)' + ) + + mkdir -p /tmp/zenith/bin + mkdir -p /tmp/zenith/test_bin + mkdir -p /tmp/zenith/etc + + # Install target binaries + for bin in $binaries; do + SRC=target/$BUILD_TYPE/$bin + DST=/tmp/zenith/bin/$bin + cp $SRC $DST + echo $DST >> /tmp/zenith/etc/binaries.list + done + + # Install test executables (for code coverage) + if [[ $BUILD_TYPE == "debug" ]]; then + for bin in $test_exe_paths; do + SRC=$bin + DST=/tmp/zenith/test_bin/$(basename $bin) + cp $SRC $DST + echo $DST >> /tmp/zenith/etc/binaries.list + done fi - cargo install $CARGO_FLAGS --locked --root /tmp/zenith --path pageserver - cargo install $CARGO_FLAGS --locked --root /tmp/zenith --path walkeeper - cargo install $CARGO_FLAGS --locked --root /tmp/zenith --path zenith # Install the postgres binaries, for use by test jobs - # FIXME: this is a silly way to do "install"; maybe just output a standard - # postgres package, whatever the favored form is (tarball? .deb package?) - # Note that pg_regress needs some build artifacts that probably aren't - # in the usual package...? - run: - name: postgres install + name: Install postgres binaries command: | cp -a tmp_install /tmp/zenith/pg_install - # Save the rust output binaries for other jobs in this workflow. + # Save the rust binaries and coverage data for other jobs in this workflow. - persist_to_workspace: root: /tmp/zenith paths: @@ -228,6 +262,8 @@ jobs: save_perf_report: type: boolean default: false + environment: + BUILD_TYPE: << parameters.build_type >> steps: - attach_workspace: at: /tmp/zenith @@ -241,21 +277,22 @@ jobs: command: pipenv --python 3.7 install - run: name: Run pytest - working_directory: test_runner # pytest doesn't output test logs in real time, so CI job may fail with # `Too long with no output` error, if a test is running for a long time. - # In that case, tests should have internal timeouts that are less than + # In that case, tests should have internal timeouts that are less than # no_output_timeout, specified here. no_output_timeout: 10m environment: - ZENITH_BIN: /tmp/zenith/bin - POSTGRES_DISTRIB_DIR: /tmp/zenith/pg_install - TEST_OUTPUT: /tmp/test_output - # this variable will be embedded in perf test report + # this variable will be embedded in perf test report # and is needed to distinguish different environments - PLATFORM: zenith-local-ci command: | - TEST_SELECTION="<< parameters.test_selection >>" + PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)" + + TEST_SELECTION="test_runner/<< parameters.test_selection >>" EXTRA_PARAMS="<< parameters.extra_params >>" if [ -z "$TEST_SELECTION" ]; then echo "test_selection must be set" @@ -263,16 +300,22 @@ jobs: fi if << parameters.run_in_parallel >>; then EXTRA_PARAMS="-n4 $EXTRA_PARAMS" - fi; + fi if << parameters.save_perf_report >>; then if [[ $CIRCLE_BRANCH == "main" ]]; then - mkdir -p perf-report-local - EXTRA_PARAMS="--out-dir perf-report-local $EXTRA_PARAMS" - fi; - fi; + mkdir -p "$PERF_REPORT_DIR" + EXTRA_PARAMS="--out-dir $PERF_REPORT_DIR $EXTRA_PARAMS" + fi + fi export GITHUB_SHA=$CIRCLE_SHA1 + if [[ $BUILD_TYPE == "debug" ]]; then + cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run) + elif [[ $BUILD_TYPE == "release" ]]; then + cov_prefix=() + fi + # Run the tests. # # The junit.xml file allows CircleCI to display more fine-grained test information @@ -283,14 +326,21 @@ jobs: # -n4 uses four processes to run tests via pytest-xdist # -s is not used to prevent pytest from capturing output, because tests are running # in parallel and logs are mixed between different tests - pipenv run pytest --junitxml=$TEST_OUTPUT/junit.xml --tb=short --verbose -m "not remote_cluster" -rA $TEST_SELECTION $EXTRA_PARAMS + "${cov_prefix[@]}" pipenv run pytest \ + --junitxml=$TEST_OUTPUT/junit.xml \ + --tb=short \ + --verbose \ + -m "not remote_cluster" \ + -rA $TEST_SELECTION $EXTRA_PARAMS if << parameters.save_perf_report >>; then if [[ $CIRCLE_BRANCH == "main" ]]; then - REPORT_FROM=$(realpath perf-report-local) REPORT_TO=local ../scripts/generate_and_push_perf_report.sh - fi; - fi; - + # TODO: reuse scripts/git-upload + export REPORT_FROM="$(PERF_REPORT_DIR)" + export REPORT_TO=local + ../scripts/generate_and_push_perf_report.sh + fi + fi - run: # CircleCI artifacts are preserved one file at a time, so skipping # this step isn't a good idea. If you want to extract the @@ -306,6 +356,65 @@ jobs: # The store_test_results step tells CircleCI where to find the junit.xml file. - store_test_results: path: /tmp/test_output + # Save coverage data (if any) + - persist_to_workspace: + root: /tmp/zenith + paths: + - "*" + + coverage-report: + executor: zenith-build-executor + steps: + - attach_workspace: + at: /tmp/zenith + - checkout + - restore_cache: + name: Restore rust cache + keys: + # Require an exact match. While an out of date cache might speed up the build, + # there's no way to clean out old packages, so the cache grows every time something + # changes. + - v04-rust-cache-deps-debug-{{ checksum "Cargo.lock" }} + - run: + name: Install llvm-tools + command: | + # TODO: install a proper symbol demangler, e.g. rustfilt + # TODO: we should embed this into a docker image + rustup component add llvm-tools-preview + - run: + name: Build coverage report + command: | + COMMIT_URL=https://github.com/zenithdb/zenith/commit/$CIRCLE_SHA1 + + scripts/coverage \ + --dir=/tmp/zenith/coverage report \ + --input-objects=/tmp/zenith/etc/binaries.list \ + --commit-url=$COMMIT_URL \ + --format=github + - run: + name: Upload coverage report + command: | + LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME + REPORT_URL=https://zenithdb.github.io/zenith-coverage-data/$CIRCLE_SHA1 + COMMIT_URL=https://github.com/zenithdb/zenith/commit/$CIRCLE_SHA1 + + scripts/git-upload \ + --repo=https://$VIP_VAP_ACCESS_TOKEN@github.com/zenithdb/zenith-coverage-data.git \ + --message="Add code coverage for $COMMIT_URL" \ + copy /tmp/zenith/coverage/report $CIRCLE_SHA1 # COPY FROM TO_RELATIVE + + # Add link to the coverage report to the commit + curl -f -X POST \ + https://api.github.com/repos/$LOCAL_REPO/statuses/$CIRCLE_SHA1 \ + -H "Accept: application/vnd.github.v3+json" \ + --user "$CI_ACCESS_TOKEN" \ + --data \ + "{ + \"state\": \"success\", + \"context\": \"zenith-coverage\", + \"description\": \"Coverage report is ready\", + \"target_url\": \"$REPORT_URL\" + }" # Build zenithdb/zenith:latest image and push it to Docker hub docker-image: @@ -410,6 +519,12 @@ workflows: save_perf_report: true requires: - build-zenith-release + - coverage-report: + # Context passes credentials for gh api + context: CI_ACCESS_TOKEN + requires: + # TODO: consider adding more + - other-tests-debug - docker-image: # Context gives an ability to login context: Docker Hub diff --git a/scripts/git-upload b/scripts/git-upload new file mode 100755 index 0000000000..5298b693af --- /dev/null +++ b/scripts/git-upload @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 + +from contextlib import contextmanager +from tempfile import TemporaryDirectory +from pathlib import Path + +import argparse +import os +import shutil +import subprocess +import sys + + +def absolute_path(path): + return Path(path).resolve() + + +def relative_path(path): + path = Path(path) + if path.is_absolute(): + raise Exception(f'path `{path}` must be relative!') + return path + + +@contextmanager +def chdir(cwd: Path): + old = os.getcwd() + os.chdir(cwd) + try: + yield cwd + finally: + os.chdir(old) + + +def run(cmd, *args, **kwargs): + print('$', ' '.join(cmd)) + subprocess.check_call(cmd, *args, **kwargs) + + +class GitRepo: + def __init__(self, url): + self.url = url + self.cwd = TemporaryDirectory() + + subprocess.check_call([ + 'git', + 'clone', + str(url), + self.cwd.name, + ]) + + def is_dirty(self): + res = subprocess.check_output(['git', 'status', '--porcelain'], text=True).strip() + return bool(res) + + def update(self, message, action, branch=None): + with chdir(self.cwd.name): + if not branch: + cmd = ['git', 'branch', '--show-current'] + branch = subprocess.check_output(cmd, text=True).strip() + + # Run action in repo's directory + action() + + run(['git', 'add', '.']) + + if not self.is_dirty(): + print('No changes detected, quitting') + return + + run([ + 'git', + '-c', + 'user.name=vipvap', + '-c', + 'user.email=vipvap@zenith.tech', + 'commit', + '--author="vipvap "', + f'--message={message}', + ]) + + for _ in range(5): + try: + run(['git', 'fetch', 'origin', branch]) + run(['git', 'rebase', f'origin/{branch}']) + run(['git', 'push', 'origin', branch]) + return + + except subprocess.CalledProcessError as e: + print(f'failed to update branch `{branch}`: {e}', file=sys.stderr) + + raise Exception(f'failed to update branch `{branch}`') + + +def do_copy(args): + src = args.src + dst = args.dst + + try: + if src.is_dir(): + shutil.copytree(src, dst) + else: + shutil.copy(src, dst) + except FileExistsError: + if args.forbid_overwrite: + raise + + +def main(): + parser = argparse.ArgumentParser(description='Git upload tool') + parser.add_argument('--repo', type=str, metavar='URL', required=True, help='git repo url') + parser.add_argument('--message', type=str, metavar='TEXT', help='commit message') + + commands = parser.add_subparsers(title='commands', dest='subparser_name') + + p_copy = commands.add_parser('copy', help='copy file into the repo') + p_copy.add_argument('src', type=absolute_path, help='source path') + p_copy.add_argument('dst', type=relative_path, help='relative dest path') + p_copy.add_argument('--forbid-overwrite', action='store_true', help='do not allow overwrites') + + args = parser.parse_args() + + commands = { + 'copy': do_copy, + } + + action = commands.get(args.subparser_name) + if action: + message = args.message or 'update' + GitRepo(args.repo).update(message, lambda: action(args)) + else: + parser.print_usage() + + +if __name__ == '__main__': + main() From b87ab17d05b2ca9776ee781fca7606aa5f618a81 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Mon, 6 Dec 2021 03:41:40 +0300 Subject: [PATCH 11/12] Bump rust version to 1.56.1 Apparently, code coverage doesn't work that well in 1.55. --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index b5aa426d0b..664db960dd 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -4,7 +4,7 @@ executors: zenith-build-executor: resource_class: xlarge docker: - - image: cimg/rust:1.55.0 + - image: cimg/rust:1.56.1 zenith-python-executor: docker: - image: cimg/python:3.7.10 # Oldest available 3.7 with Ubuntu 20.04 (for GLIBC and Rust) at CirlceCI From 0a8c67263058d8040668e8bb2e0ac1f3deedc752 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Mon, 6 Dec 2021 13:52:28 +0300 Subject: [PATCH 12/12] [CI] Fix benchmarks Too bad we don't have a --dry-run in PRs :( --- .circleci/config.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 664db960dd..63816eee56 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -336,9 +336,9 @@ jobs: if << parameters.save_perf_report >>; then if [[ $CIRCLE_BRANCH == "main" ]]; then # TODO: reuse scripts/git-upload - export REPORT_FROM="$(PERF_REPORT_DIR)" + export REPORT_FROM="$PERF_REPORT_DIR" export REPORT_TO=local - ../scripts/generate_and_push_perf_report.sh + scripts/generate_and_push_perf_report.sh fi fi - run: