mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-18 21:00:38 +00:00
Compare commits
4 Commits
ordered-bl
...
vecmap-del
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c4fb509aa2 | ||
|
|
48a6bbe4c1 | ||
|
|
9a687eae28 | ||
|
|
0579c8bbb9 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -2339,7 +2339,6 @@ dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"clap",
|
||||
"const_format",
|
||||
"crc32c",
|
||||
"daemonize",
|
||||
"fs2",
|
||||
@@ -2359,7 +2358,6 @@ dependencies = [
|
||||
"tokio-stream",
|
||||
"walkdir",
|
||||
"workspace_hack",
|
||||
"zenith_metrics",
|
||||
"zenith_utils",
|
||||
]
|
||||
|
||||
|
||||
@@ -390,7 +390,7 @@ impl DeltaLayer {
|
||||
assert!(!relsizes.is_empty());
|
||||
}
|
||||
|
||||
let delta_layer = DeltaLayer {
|
||||
let mut delta_layer = DeltaLayer {
|
||||
path_or_conf: PathOrConf::Conf(conf),
|
||||
timelineid,
|
||||
tenantid,
|
||||
@@ -404,13 +404,14 @@ impl DeltaLayer {
|
||||
relsizes,
|
||||
}),
|
||||
};
|
||||
let mut inner = delta_layer.inner.lock().unwrap();
|
||||
|
||||
// Write the in-memory btreemaps into a file
|
||||
let path = delta_layer
|
||||
.path()
|
||||
.expect("DeltaLayer is supposed to have a layer path on disk");
|
||||
|
||||
let inner = delta_layer.inner.get_mut().unwrap();
|
||||
|
||||
// Note: This overwrites any existing file. There shouldn't be any.
|
||||
// FIXME: throw an error instead?
|
||||
let file = File::create(&path)?;
|
||||
@@ -420,11 +421,9 @@ impl DeltaLayer {
|
||||
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
|
||||
|
||||
for (blknum, history) in page_versions {
|
||||
for (lsn, page_version) in history.as_slice() {
|
||||
if lsn >= &end_lsn {
|
||||
continue;
|
||||
}
|
||||
inner.page_version_metas.reserve(history.len());
|
||||
|
||||
for (lsn, page_version) in history {
|
||||
let buf = PageVersion::ser(page_version)?;
|
||||
let blob_range = page_version_writer.write_blob(&buf)?;
|
||||
|
||||
@@ -469,8 +468,6 @@ impl DeltaLayer {
|
||||
|
||||
trace!("saved {}", &path.display());
|
||||
|
||||
drop(inner);
|
||||
|
||||
Ok(delta_layer)
|
||||
}
|
||||
|
||||
|
||||
@@ -280,8 +280,8 @@ impl Layer for InMemoryLayer {
|
||||
println!("segsizes {}: {}", k, v);
|
||||
}
|
||||
|
||||
for (blknum, history) in inner.page_versions.ordered_block_iter() {
|
||||
for (lsn, pv) in history.as_slice() {
|
||||
for (blknum, history) in inner.page_versions.ordered_block_iter(None) {
|
||||
for (lsn, pv) in history {
|
||||
println!(
|
||||
"blk {} at {}: {}/{}\n",
|
||||
blknum,
|
||||
@@ -700,7 +700,7 @@ impl InMemoryLayer {
|
||||
self.start_lsn,
|
||||
drop_lsn,
|
||||
true,
|
||||
inner.page_versions.ordered_block_iter(),
|
||||
inner.page_versions.ordered_block_iter(None),
|
||||
inner.segsizes.clone(),
|
||||
)?;
|
||||
trace!(
|
||||
@@ -714,6 +714,8 @@ impl InMemoryLayer {
|
||||
|
||||
let end_lsn = self.end_lsn.unwrap();
|
||||
|
||||
let mut before_page_versions = inner.page_versions.ordered_block_iter(Some(end_lsn));
|
||||
|
||||
let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
|
||||
|
||||
if self.start_lsn != end_lsn {
|
||||
@@ -728,7 +730,7 @@ impl InMemoryLayer {
|
||||
self.start_lsn,
|
||||
end_lsn,
|
||||
false,
|
||||
inner.page_versions.ordered_block_iter(),
|
||||
before_page_versions,
|
||||
before_segsizes,
|
||||
)?;
|
||||
frozen_layers.push(Arc::new(delta_layer));
|
||||
@@ -739,10 +741,7 @@ impl InMemoryLayer {
|
||||
end_lsn
|
||||
);
|
||||
} else {
|
||||
for (_blknum, history) in inner.page_versions.ordered_block_iter() {
|
||||
let (lsn, _pv) = history.as_slice().first().unwrap();
|
||||
assert!(lsn >= &end_lsn);
|
||||
}
|
||||
assert!(before_page_versions.next().is_none());
|
||||
}
|
||||
|
||||
drop(inner);
|
||||
|
||||
@@ -24,6 +24,14 @@ impl PageVersions {
|
||||
map.append_or_update_last(lsn, page_version).unwrap()
|
||||
}
|
||||
|
||||
/// Get all [`PageVersion`]s in a block
|
||||
pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] {
|
||||
self.0
|
||||
.get(&blknum)
|
||||
.map(VecMap::as_slice)
|
||||
.unwrap_or(EMPTY_SLICE)
|
||||
}
|
||||
|
||||
/// Get a range of [`PageVersions`] in a block
|
||||
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(
|
||||
&self,
|
||||
@@ -65,7 +73,8 @@ impl PageVersions {
|
||||
}
|
||||
|
||||
/// Iterate through block-history pairs in block order.
|
||||
pub fn ordered_block_iter(&self) -> OrderedBlockIter<'_> {
|
||||
/// If a [`cutoff_lsn`] is set, only include history with `lsn < cutoff_lsn`
|
||||
pub fn ordered_block_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedBlockIter<'_> {
|
||||
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
|
||||
ordered_blocks.sort_unstable();
|
||||
|
||||
@@ -73,6 +82,7 @@ impl PageVersions {
|
||||
page_versions: self,
|
||||
ordered_blocks,
|
||||
cur_block_idx: 0,
|
||||
cutoff_lsn,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -82,15 +92,29 @@ pub struct OrderedBlockIter<'a> {
|
||||
|
||||
ordered_blocks: Vec<u32>,
|
||||
cur_block_idx: usize,
|
||||
|
||||
cutoff_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
impl<'a> Iterator for OrderedBlockIter<'a> {
|
||||
type Item = (u32, &'a VecMap<Lsn, PageVersion>);
|
||||
type Item = (u32, &'a [(Lsn, PageVersion)]);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let blknum: u32 = *self.ordered_blocks.get(self.cur_block_idx)?;
|
||||
self.cur_block_idx += 1;
|
||||
Some((blknum, self.page_versions.0.get(&blknum).unwrap()))
|
||||
while self.cur_block_idx < self.ordered_blocks.len() {
|
||||
let blknum = self.ordered_blocks[self.cur_block_idx];
|
||||
self.cur_block_idx += 1;
|
||||
|
||||
if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
|
||||
let slice = self.page_versions.get_block_lsn_range(blknum, ..cutoff_lsn);
|
||||
if !slice.is_empty() {
|
||||
return Some((blknum, slice));
|
||||
}
|
||||
} else {
|
||||
return Some((blknum, self.page_versions.get_block_slice(blknum)));
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -116,10 +140,9 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
let mut iter = page_versions.ordered_block_iter();
|
||||
let mut iter = page_versions.ordered_block_iter(None);
|
||||
for blknum in 0..BLOCKS {
|
||||
let (actual_blknum, vec_map) = iter.next().unwrap();
|
||||
let slice = vec_map.as_slice();
|
||||
let (actual_blknum, slice) = iter.next().unwrap();
|
||||
assert_eq!(actual_blknum, blknum);
|
||||
assert_eq!(slice.len(), LSNS as usize);
|
||||
for lsn in 0..LSNS {
|
||||
@@ -128,5 +151,22 @@ mod tests {
|
||||
}
|
||||
assert!(iter.next().is_none());
|
||||
assert!(iter.next().is_none()); // should be robust against excessive next() calls
|
||||
|
||||
const CUTOFF_LSN: Lsn = Lsn(30);
|
||||
let mut iter = page_versions.ordered_block_iter(Some(CUTOFF_LSN));
|
||||
for blknum in 0..BLOCKS {
|
||||
let (actual_blknum, slice) = iter.next().unwrap();
|
||||
assert_eq!(actual_blknum, blknum);
|
||||
assert_eq!(slice.len(), CUTOFF_LSN.0 as usize);
|
||||
for lsn in 0..CUTOFF_LSN.0 {
|
||||
assert_eq!(Lsn(lsn), slice[lsn as usize].0);
|
||||
}
|
||||
}
|
||||
assert!(iter.next().is_none());
|
||||
assert!(iter.next().is_none()); // should be robust against excessive next() calls
|
||||
|
||||
let mut iter = page_versions.ordered_block_iter(Some(Lsn(0)));
|
||||
assert!(iter.next().is_none());
|
||||
assert!(iter.next().is_none()); // should be robust against excessive next() calls
|
||||
}
|
||||
}
|
||||
|
||||
@@ -794,17 +794,12 @@ def read_pid(path: Path):
|
||||
return int(path.read_text())
|
||||
|
||||
|
||||
@dataclass
|
||||
class WalAcceptorPort:
|
||||
pg: int
|
||||
http: int
|
||||
|
||||
@dataclass
|
||||
class WalAcceptor:
|
||||
""" An object representing a running wal acceptor daemon. """
|
||||
wa_bin_path: Path
|
||||
data_dir: Path
|
||||
port: WalAcceptorPort
|
||||
port: int
|
||||
num: int # identifier for logging
|
||||
pageserver_port: int
|
||||
auth_token: Optional[str] = None
|
||||
@@ -816,8 +811,7 @@ class WalAcceptor:
|
||||
|
||||
cmd = [str(self.wa_bin_path)]
|
||||
cmd.extend(["-D", str(self.data_dir)])
|
||||
cmd.extend(["--listen-pg", f"localhost:{self.port.pg}"])
|
||||
cmd.extend(["--listen-http", f"localhost:{self.port.http}"])
|
||||
cmd.extend(["-l", f"localhost:{self.port}"])
|
||||
cmd.append("--daemonize")
|
||||
cmd.append("--no-sync")
|
||||
# Tell page server it can receive WAL from this WAL safekeeper
|
||||
@@ -874,14 +868,14 @@ class WalAcceptor:
|
||||
|
||||
# "replication=0" hacks psycopg not to send additional queries
|
||||
# on startup, see https://github.com/psycopg/psycopg2/pull/482
|
||||
connstr = f"host=localhost port={self.port.pg} replication=0 options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
|
||||
connstr = f"host=localhost port={self.port} replication=0 options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
|
||||
|
||||
with closing(psycopg2.connect(connstr)) as conn:
|
||||
# server doesn't support transactions
|
||||
conn.autocommit = True
|
||||
with conn.cursor() as cur:
|
||||
request_json = json.dumps(request)
|
||||
print(f"JSON_CTRL request on port {self.port.pg}: {request_json}")
|
||||
print(f"JSON_CTRL request on port {self.port}: {request_json}")
|
||||
cur.execute("JSON_CTRL " + request_json)
|
||||
all = cur.fetchall()
|
||||
print(f"JSON_CTRL response: {all[0][0]}")
|
||||
@@ -904,10 +898,7 @@ class WalAcceptorFactory:
|
||||
wa = WalAcceptor(
|
||||
wa_bin_path=self.wa_bin_path,
|
||||
data_dir=self.data_dir / "wal_acceptor_{}".format(wa_num),
|
||||
port=WalAcceptorPort(
|
||||
pg=self.port_distributor.get_port(),
|
||||
http=self.port_distributor.get_port(),
|
||||
),
|
||||
port=self.port_distributor.get_port(),
|
||||
num=wa_num,
|
||||
pageserver_port=self.pageserver_port,
|
||||
auth_token=auth_token,
|
||||
@@ -931,7 +922,7 @@ class WalAcceptorFactory:
|
||||
|
||||
def get_connstrs(self) -> str:
|
||||
""" Get list of wal acceptor endpoints suitable for wal_acceptors GUC """
|
||||
return ','.join(["localhost:{}".format(wa.port.pg) for wa in self.instances])
|
||||
return ','.join(["localhost:{}".format(wa.port) for wa in self.instances])
|
||||
|
||||
|
||||
@zenfixture
|
||||
|
||||
@@ -28,11 +28,9 @@ humantime = "2.1.0"
|
||||
walkdir = "2"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
hex = "0.4.3"
|
||||
const_format = "0.2.21"
|
||||
|
||||
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor
|
||||
pageserver = { path = "../pageserver" }
|
||||
postgres_ffi = { path = "../postgres_ffi" }
|
||||
workspace_hack = { path = "../workspace_hack" }
|
||||
zenith_metrics = { path = "../zenith_metrics" }
|
||||
zenith_utils = { path = "../zenith_utils" }
|
||||
|
||||
@@ -3,23 +3,18 @@
|
||||
//
|
||||
use anyhow::Result;
|
||||
use clap::{App, Arg};
|
||||
use const_format::formatcp;
|
||||
use daemonize::Daemonize;
|
||||
use log::*;
|
||||
use std::env;
|
||||
use std::net::TcpListener;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::thread;
|
||||
use zenith_utils::http::endpoint;
|
||||
use zenith_utils::logging;
|
||||
|
||||
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
|
||||
use walkeeper::s3_offload;
|
||||
use walkeeper::wal_service;
|
||||
use walkeeper::WalAcceptorConf;
|
||||
|
||||
fn main() -> Result<()> {
|
||||
zenith_metrics::set_common_metrics_prefix("safekeeper");
|
||||
let arg_matches = App::new("Zenith wal_acceptor")
|
||||
.about("Store WAL stream to local file system and push it to WAL receivers")
|
||||
.arg(
|
||||
@@ -30,18 +25,11 @@ fn main() -> Result<()> {
|
||||
.help("Path to the WAL acceptor data directory"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("listen-pg")
|
||||
Arg::with_name("listen")
|
||||
.short("l")
|
||||
.long("listen-pg")
|
||||
.alias("listen") // for compatibility
|
||||
.long("listen")
|
||||
.takes_value(true)
|
||||
.help(formatcp!("listen for incoming WAL data connections on ip:port (default: {DEFAULT_PG_LISTEN_ADDR})")),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("listen-http")
|
||||
.long("listen-http")
|
||||
.takes_value(true)
|
||||
.help(formatcp!("http endpoint address for metrics on ip:port (default: {DEFAULT_HTTP_LISTEN_ADDR})")),
|
||||
.help("listen for incoming connections on ip:port (default: 127.0.0.1:5454)"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("pageserver")
|
||||
@@ -82,8 +70,7 @@ fn main() -> Result<()> {
|
||||
daemonize: false,
|
||||
no_sync: false,
|
||||
pageserver_addr: None,
|
||||
listen_pg_addr: DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
listen_http_addr: DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
listen_addr: "localhost:5454".to_string(),
|
||||
ttl: None,
|
||||
recall_period: None,
|
||||
pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(),
|
||||
@@ -104,12 +91,8 @@ fn main() -> Result<()> {
|
||||
conf.daemonize = true;
|
||||
}
|
||||
|
||||
if let Some(addr) = arg_matches.value_of("listen-pg") {
|
||||
conf.listen_pg_addr = addr.to_owned();
|
||||
}
|
||||
|
||||
if let Some(addr) = arg_matches.value_of("listen-http") {
|
||||
conf.listen_http_addr = addr.to_owned();
|
||||
if let Some(addr) = arg_matches.value_of("listen") {
|
||||
conf.listen_addr = addr.to_owned();
|
||||
}
|
||||
|
||||
if let Some(addr) = arg_matches.value_of("pageserver") {
|
||||
@@ -131,11 +114,6 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
|
||||
let log_filename = conf.data_dir.join("wal_acceptor.log");
|
||||
let (_scope_guard, log_file) = logging::init(log_filename, conf.daemonize)?;
|
||||
|
||||
let http_listener = TcpListener::bind(conf.listen_http_addr.clone()).map_err(|e| {
|
||||
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
|
||||
e
|
||||
})?;
|
||||
|
||||
if conf.daemonize {
|
||||
info!("daemonizing...");
|
||||
|
||||
@@ -158,16 +136,6 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
|
||||
|
||||
let mut threads = Vec::new();
|
||||
|
||||
let http_endpoint_thread = thread::Builder::new()
|
||||
.name("http_endpoint_thread".into())
|
||||
.spawn(move || {
|
||||
// No authentication at all: read-only metrics only, early stage.
|
||||
let router = endpoint::make_router();
|
||||
endpoint::serve_thread_main(router, http_listener).unwrap();
|
||||
})
|
||||
.unwrap();
|
||||
threads.push(http_endpoint_thread);
|
||||
|
||||
if conf.ttl.is_some() {
|
||||
let s3_conf = conf.clone();
|
||||
let s3_offload_thread = thread::Builder::new()
|
||||
|
||||
@@ -11,23 +11,12 @@ pub mod send_wal;
|
||||
pub mod timeline;
|
||||
pub mod wal_service;
|
||||
|
||||
pub mod defaults {
|
||||
use const_format::formatcp;
|
||||
|
||||
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
|
||||
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
|
||||
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct WalAcceptorConf {
|
||||
pub data_dir: PathBuf,
|
||||
pub daemonize: bool,
|
||||
pub no_sync: bool,
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub listen_addr: String,
|
||||
pub pageserver_addr: Option<String>,
|
||||
// TODO (create issue) this is temporary, until protocol between PG<->SK<->PS rework
|
||||
pub pageserver_auth_token: Option<String>,
|
||||
|
||||
@@ -42,7 +42,7 @@ fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZT
|
||||
);
|
||||
|
||||
// use Config parsing because SockAddr parsing doesnt allow to use host names instead of ip addresses
|
||||
let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_pg_addr);
|
||||
let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_addr);
|
||||
let me_conf: Config = me_connstr.parse().unwrap();
|
||||
let (host, port) = connection_host_port(&me_conf);
|
||||
let callme = format!(
|
||||
|
||||
@@ -15,11 +15,8 @@ use std::cmp::min;
|
||||
use std::io;
|
||||
use std::io::Read;
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
|
||||
use crate::replication::HotStandbyFeedback;
|
||||
use postgres_ffi::xlog_utils::MAX_SEND_SIZE;
|
||||
use zenith_metrics::{register_gauge_vec, Gauge, GaugeVec};
|
||||
use zenith_utils::bin_ser::LeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::pq_proto::SystemId;
|
||||
@@ -282,38 +279,6 @@ pub trait Storage {
|
||||
fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>;
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
// The prometheus crate does not support u64 yet, i64 only (see `IntGauge`).
|
||||
// i64 is faster than f64, so update to u64 when available.
|
||||
static ref FLUSH_LSN_GAUGE: GaugeVec = register_gauge_vec!(
|
||||
"safekeeper_flush_lsn",
|
||||
"Current flush_lsn, grouped by timeline",
|
||||
&["ztli"]
|
||||
)
|
||||
.expect("Failed to register safekeeper_flush_lsn int gauge vec");
|
||||
static ref COMMIT_LSN_GAUGE: GaugeVec = register_gauge_vec!(
|
||||
"safekeeper_commit_lsn",
|
||||
"Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
|
||||
&["ztli"]
|
||||
)
|
||||
.expect("Failed to register safekeeper_commit_lsn int gauge vec");
|
||||
}
|
||||
|
||||
struct SafeKeeperMetrics {
|
||||
flush_lsn: Gauge,
|
||||
commit_lsn: Gauge,
|
||||
}
|
||||
|
||||
impl SafeKeeperMetrics {
|
||||
fn new(ztli: ZTimelineId) -> SafeKeeperMetrics {
|
||||
let ztli_str = format!("{}", ztli);
|
||||
SafeKeeperMetrics {
|
||||
flush_lsn: FLUSH_LSN_GAUGE.with_label_values(&[&ztli_str]),
|
||||
commit_lsn: COMMIT_LSN_GAUGE.with_label_values(&[&ztli_str]),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// SafeKeeper which consumes events (messages from compute) and provides
|
||||
/// replies.
|
||||
pub struct SafeKeeper<ST: Storage> {
|
||||
@@ -321,8 +286,6 @@ pub struct SafeKeeper<ST: Storage> {
|
||||
/// Established by reading wal.
|
||||
pub flush_lsn: Lsn,
|
||||
pub tli: u32,
|
||||
// Cached metrics so we don't have to recompute labels on each update.
|
||||
metrics: Option<SafeKeeperMetrics>,
|
||||
/// not-yet-flushed pairs of same named fields in s.*
|
||||
pub commit_lsn: Lsn,
|
||||
pub truncate_lsn: Lsn,
|
||||
@@ -341,7 +304,6 @@ where
|
||||
SafeKeeper {
|
||||
flush_lsn,
|
||||
tli,
|
||||
metrics: None,
|
||||
commit_lsn: state.commit_lsn,
|
||||
truncate_lsn: state.truncate_lsn,
|
||||
storage,
|
||||
@@ -393,8 +355,6 @@ where
|
||||
self.s.server.wal_seg_size = msg.wal_seg_size;
|
||||
self.storage.persist(&self.s, true)?;
|
||||
|
||||
self.metrics = Some(SafeKeeperMetrics::new(self.s.server.ztli));
|
||||
|
||||
info!(
|
||||
"processed greeting from proposer {:?}, sending term {:?}",
|
||||
msg.proposer_id, self.s.acceptor_state.term
|
||||
@@ -518,11 +478,6 @@ where
|
||||
}
|
||||
if last_rec_lsn > self.flush_lsn {
|
||||
self.flush_lsn = last_rec_lsn;
|
||||
self.metrics
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.flush_lsn
|
||||
.set(u64::from(self.flush_lsn) as f64);
|
||||
}
|
||||
|
||||
// Advance commit_lsn taking into account what we have locally. xxx this
|
||||
@@ -540,11 +495,6 @@ where
|
||||
sync_control_file |=
|
||||
commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn;
|
||||
self.commit_lsn = commit_lsn;
|
||||
self.metrics
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.commit_lsn
|
||||
.set(u64::from(self.commit_lsn) as f64);
|
||||
}
|
||||
|
||||
self.truncate_lsn = msg.h.truncate_lsn;
|
||||
|
||||
@@ -13,9 +13,9 @@ use zenith_utils::postgres_backend::{AuthType, PostgresBackend};
|
||||
|
||||
/// Accept incoming TCP connections and spawn them into a background thread.
|
||||
pub fn thread_main(conf: WalAcceptorConf) -> Result<()> {
|
||||
info!("Starting wal acceptor on {}", conf.listen_pg_addr);
|
||||
let listener = TcpListener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
|
||||
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
|
||||
info!("Starting wal acceptor on {}", conf.listen_addr);
|
||||
let listener = TcpListener::bind(conf.listen_addr.clone()).map_err(|e| {
|
||||
error!("failed to bind to address {}: {}", conf.listen_addr, e);
|
||||
e
|
||||
})?;
|
||||
|
||||
|
||||
@@ -5,8 +5,6 @@
|
||||
use lazy_static::lazy_static;
|
||||
use once_cell::race::OnceBox;
|
||||
pub use prometheus::{exponential_buckets, linear_buckets};
|
||||
pub use prometheus::{register_gauge, Gauge};
|
||||
pub use prometheus::{register_gauge_vec, GaugeVec};
|
||||
pub use prometheus::{register_histogram, Histogram};
|
||||
pub use prometheus::{register_histogram_vec, HistogramVec};
|
||||
pub use prometheus::{register_int_counter, IntCounter};
|
||||
|
||||
@@ -124,6 +124,10 @@ impl<K: Ord, V> VecMap<K, V> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn reserve(&mut self, additional: usize) {
|
||||
self.0.reserve(additional);
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_key<K, V>(entry: &(K, V)) -> &K {
|
||||
|
||||
Reference in New Issue
Block a user