Compare commits

..

4 Commits

Author SHA1 Message Date
Patrick Insinger
c4fb509aa2 pageserver - reserve while building delta metadata 2021-10-08 12:04:01 -07:00
Patrick Insinger
48a6bbe4c1 pageserver - use VecMap for delta metadata & sizes 2021-10-08 11:28:00 -07:00
Patrick Insinger
9a687eae28 pageserver - use VecMap for in memory segsizes 2021-10-08 10:44:54 -07:00
Patrick Insinger
0579c8bbb9 pageserver - use VecMap for in-memory PageVersions 2021-10-08 01:00:20 -07:00
13 changed files with 81 additions and 149 deletions

2
Cargo.lock generated
View File

@@ -2339,7 +2339,6 @@ dependencies = [
"byteorder",
"bytes",
"clap",
"const_format",
"crc32c",
"daemonize",
"fs2",
@@ -2359,7 +2358,6 @@ dependencies = [
"tokio-stream",
"walkdir",
"workspace_hack",
"zenith_metrics",
"zenith_utils",
]

View File

@@ -390,7 +390,7 @@ impl DeltaLayer {
assert!(!relsizes.is_empty());
}
let delta_layer = DeltaLayer {
let mut delta_layer = DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
tenantid,
@@ -404,13 +404,14 @@ impl DeltaLayer {
relsizes,
}),
};
let mut inner = delta_layer.inner.lock().unwrap();
// Write the in-memory btreemaps into a file
let path = delta_layer
.path()
.expect("DeltaLayer is supposed to have a layer path on disk");
let inner = delta_layer.inner.get_mut().unwrap();
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
let file = File::create(&path)?;
@@ -420,11 +421,9 @@ impl DeltaLayer {
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
for (blknum, history) in page_versions {
for (lsn, page_version) in history.as_slice() {
if lsn >= &end_lsn {
continue;
}
inner.page_version_metas.reserve(history.len());
for (lsn, page_version) in history {
let buf = PageVersion::ser(page_version)?;
let blob_range = page_version_writer.write_blob(&buf)?;
@@ -469,8 +468,6 @@ impl DeltaLayer {
trace!("saved {}", &path.display());
drop(inner);
Ok(delta_layer)
}

View File

@@ -280,8 +280,8 @@ impl Layer for InMemoryLayer {
println!("segsizes {}: {}", k, v);
}
for (blknum, history) in inner.page_versions.ordered_block_iter() {
for (lsn, pv) in history.as_slice() {
for (blknum, history) in inner.page_versions.ordered_block_iter(None) {
for (lsn, pv) in history {
println!(
"blk {} at {}: {}/{}\n",
blknum,
@@ -700,7 +700,7 @@ impl InMemoryLayer {
self.start_lsn,
drop_lsn,
true,
inner.page_versions.ordered_block_iter(),
inner.page_versions.ordered_block_iter(None),
inner.segsizes.clone(),
)?;
trace!(
@@ -714,6 +714,8 @@ impl InMemoryLayer {
let end_lsn = self.end_lsn.unwrap();
let mut before_page_versions = inner.page_versions.ordered_block_iter(Some(end_lsn));
let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
if self.start_lsn != end_lsn {
@@ -728,7 +730,7 @@ impl InMemoryLayer {
self.start_lsn,
end_lsn,
false,
inner.page_versions.ordered_block_iter(),
before_page_versions,
before_segsizes,
)?;
frozen_layers.push(Arc::new(delta_layer));
@@ -739,10 +741,7 @@ impl InMemoryLayer {
end_lsn
);
} else {
for (_blknum, history) in inner.page_versions.ordered_block_iter() {
let (lsn, _pv) = history.as_slice().first().unwrap();
assert!(lsn >= &end_lsn);
}
assert!(before_page_versions.next().is_none());
}
drop(inner);

View File

@@ -24,6 +24,14 @@ impl PageVersions {
map.append_or_update_last(lsn, page_version).unwrap()
}
/// Get all [`PageVersion`]s in a block
pub fn get_block_slice(&self, blknum: u32) -> &[(Lsn, PageVersion)] {
self.0
.get(&blknum)
.map(VecMap::as_slice)
.unwrap_or(EMPTY_SLICE)
}
/// Get a range of [`PageVersions`] in a block
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(
&self,
@@ -65,7 +73,8 @@ impl PageVersions {
}
/// Iterate through block-history pairs in block order.
pub fn ordered_block_iter(&self) -> OrderedBlockIter<'_> {
/// If a [`cutoff_lsn`] is set, only include history with `lsn < cutoff_lsn`
pub fn ordered_block_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedBlockIter<'_> {
let mut ordered_blocks: Vec<u32> = self.0.keys().cloned().collect();
ordered_blocks.sort_unstable();
@@ -73,6 +82,7 @@ impl PageVersions {
page_versions: self,
ordered_blocks,
cur_block_idx: 0,
cutoff_lsn,
}
}
}
@@ -82,15 +92,29 @@ pub struct OrderedBlockIter<'a> {
ordered_blocks: Vec<u32>,
cur_block_idx: usize,
cutoff_lsn: Option<Lsn>,
}
impl<'a> Iterator for OrderedBlockIter<'a> {
type Item = (u32, &'a VecMap<Lsn, PageVersion>);
type Item = (u32, &'a [(Lsn, PageVersion)]);
fn next(&mut self) -> Option<Self::Item> {
let blknum: u32 = *self.ordered_blocks.get(self.cur_block_idx)?;
self.cur_block_idx += 1;
Some((blknum, self.page_versions.0.get(&blknum).unwrap()))
while self.cur_block_idx < self.ordered_blocks.len() {
let blknum = self.ordered_blocks[self.cur_block_idx];
self.cur_block_idx += 1;
if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
let slice = self.page_versions.get_block_lsn_range(blknum, ..cutoff_lsn);
if !slice.is_empty() {
return Some((blknum, slice));
}
} else {
return Some((blknum, self.page_versions.get_block_slice(blknum)));
}
}
None
}
}
@@ -116,10 +140,9 @@ mod tests {
}
}
let mut iter = page_versions.ordered_block_iter();
let mut iter = page_versions.ordered_block_iter(None);
for blknum in 0..BLOCKS {
let (actual_blknum, vec_map) = iter.next().unwrap();
let slice = vec_map.as_slice();
let (actual_blknum, slice) = iter.next().unwrap();
assert_eq!(actual_blknum, blknum);
assert_eq!(slice.len(), LSNS as usize);
for lsn in 0..LSNS {
@@ -128,5 +151,22 @@ mod tests {
}
assert!(iter.next().is_none());
assert!(iter.next().is_none()); // should be robust against excessive next() calls
const CUTOFF_LSN: Lsn = Lsn(30);
let mut iter = page_versions.ordered_block_iter(Some(CUTOFF_LSN));
for blknum in 0..BLOCKS {
let (actual_blknum, slice) = iter.next().unwrap();
assert_eq!(actual_blknum, blknum);
assert_eq!(slice.len(), CUTOFF_LSN.0 as usize);
for lsn in 0..CUTOFF_LSN.0 {
assert_eq!(Lsn(lsn), slice[lsn as usize].0);
}
}
assert!(iter.next().is_none());
assert!(iter.next().is_none()); // should be robust against excessive next() calls
let mut iter = page_versions.ordered_block_iter(Some(Lsn(0)));
assert!(iter.next().is_none());
assert!(iter.next().is_none()); // should be robust against excessive next() calls
}
}

View File

@@ -794,17 +794,12 @@ def read_pid(path: Path):
return int(path.read_text())
@dataclass
class WalAcceptorPort:
pg: int
http: int
@dataclass
class WalAcceptor:
""" An object representing a running wal acceptor daemon. """
wa_bin_path: Path
data_dir: Path
port: WalAcceptorPort
port: int
num: int # identifier for logging
pageserver_port: int
auth_token: Optional[str] = None
@@ -816,8 +811,7 @@ class WalAcceptor:
cmd = [str(self.wa_bin_path)]
cmd.extend(["-D", str(self.data_dir)])
cmd.extend(["--listen-pg", f"localhost:{self.port.pg}"])
cmd.extend(["--listen-http", f"localhost:{self.port.http}"])
cmd.extend(["-l", f"localhost:{self.port}"])
cmd.append("--daemonize")
cmd.append("--no-sync")
# Tell page server it can receive WAL from this WAL safekeeper
@@ -874,14 +868,14 @@ class WalAcceptor:
# "replication=0" hacks psycopg not to send additional queries
# on startup, see https://github.com/psycopg/psycopg2/pull/482
connstr = f"host=localhost port={self.port.pg} replication=0 options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
connstr = f"host=localhost port={self.port} replication=0 options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
with closing(psycopg2.connect(connstr)) as conn:
# server doesn't support transactions
conn.autocommit = True
with conn.cursor() as cur:
request_json = json.dumps(request)
print(f"JSON_CTRL request on port {self.port.pg}: {request_json}")
print(f"JSON_CTRL request on port {self.port}: {request_json}")
cur.execute("JSON_CTRL " + request_json)
all = cur.fetchall()
print(f"JSON_CTRL response: {all[0][0]}")
@@ -904,10 +898,7 @@ class WalAcceptorFactory:
wa = WalAcceptor(
wa_bin_path=self.wa_bin_path,
data_dir=self.data_dir / "wal_acceptor_{}".format(wa_num),
port=WalAcceptorPort(
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
),
port=self.port_distributor.get_port(),
num=wa_num,
pageserver_port=self.pageserver_port,
auth_token=auth_token,
@@ -931,7 +922,7 @@ class WalAcceptorFactory:
def get_connstrs(self) -> str:
""" Get list of wal acceptor endpoints suitable for wal_acceptors GUC """
return ','.join(["localhost:{}".format(wa.port.pg) for wa in self.instances])
return ','.join(["localhost:{}".format(wa.port) for wa in self.instances])
@zenfixture

View File

@@ -28,11 +28,9 @@ humantime = "2.1.0"
walkdir = "2"
serde = { version = "1.0", features = ["derive"] }
hex = "0.4.3"
const_format = "0.2.21"
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor
pageserver = { path = "../pageserver" }
postgres_ffi = { path = "../postgres_ffi" }
workspace_hack = { path = "../workspace_hack" }
zenith_metrics = { path = "../zenith_metrics" }
zenith_utils = { path = "../zenith_utils" }

View File

@@ -3,23 +3,18 @@
//
use anyhow::Result;
use clap::{App, Arg};
use const_format::formatcp;
use daemonize::Daemonize;
use log::*;
use std::env;
use std::net::TcpListener;
use std::path::{Path, PathBuf};
use std::thread;
use zenith_utils::http::endpoint;
use zenith_utils::logging;
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
use walkeeper::s3_offload;
use walkeeper::wal_service;
use walkeeper::WalAcceptorConf;
fn main() -> Result<()> {
zenith_metrics::set_common_metrics_prefix("safekeeper");
let arg_matches = App::new("Zenith wal_acceptor")
.about("Store WAL stream to local file system and push it to WAL receivers")
.arg(
@@ -30,18 +25,11 @@ fn main() -> Result<()> {
.help("Path to the WAL acceptor data directory"),
)
.arg(
Arg::with_name("listen-pg")
Arg::with_name("listen")
.short("l")
.long("listen-pg")
.alias("listen") // for compatibility
.long("listen")
.takes_value(true)
.help(formatcp!("listen for incoming WAL data connections on ip:port (default: {DEFAULT_PG_LISTEN_ADDR})")),
)
.arg(
Arg::with_name("listen-http")
.long("listen-http")
.takes_value(true)
.help(formatcp!("http endpoint address for metrics on ip:port (default: {DEFAULT_HTTP_LISTEN_ADDR})")),
.help("listen for incoming connections on ip:port (default: 127.0.0.1:5454)"),
)
.arg(
Arg::with_name("pageserver")
@@ -82,8 +70,7 @@ fn main() -> Result<()> {
daemonize: false,
no_sync: false,
pageserver_addr: None,
listen_pg_addr: DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: DEFAULT_HTTP_LISTEN_ADDR.to_string(),
listen_addr: "localhost:5454".to_string(),
ttl: None,
recall_period: None,
pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(),
@@ -104,12 +91,8 @@ fn main() -> Result<()> {
conf.daemonize = true;
}
if let Some(addr) = arg_matches.value_of("listen-pg") {
conf.listen_pg_addr = addr.to_owned();
}
if let Some(addr) = arg_matches.value_of("listen-http") {
conf.listen_http_addr = addr.to_owned();
if let Some(addr) = arg_matches.value_of("listen") {
conf.listen_addr = addr.to_owned();
}
if let Some(addr) = arg_matches.value_of("pageserver") {
@@ -131,11 +114,6 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
let log_filename = conf.data_dir.join("wal_acceptor.log");
let (_scope_guard, log_file) = logging::init(log_filename, conf.daemonize)?;
let http_listener = TcpListener::bind(conf.listen_http_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
e
})?;
if conf.daemonize {
info!("daemonizing...");
@@ -158,16 +136,6 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
let mut threads = Vec::new();
let http_endpoint_thread = thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(move || {
// No authentication at all: read-only metrics only, early stage.
let router = endpoint::make_router();
endpoint::serve_thread_main(router, http_listener).unwrap();
})
.unwrap();
threads.push(http_endpoint_thread);
if conf.ttl.is_some() {
let s3_conf = conf.clone();
let s3_offload_thread = thread::Builder::new()

View File

@@ -11,23 +11,12 @@ pub mod send_wal;
pub mod timeline;
pub mod wal_service;
pub mod defaults {
use const_format::formatcp;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
}
#[derive(Debug, Clone)]
pub struct WalAcceptorConf {
pub data_dir: PathBuf,
pub daemonize: bool,
pub no_sync: bool,
pub listen_pg_addr: String,
pub listen_http_addr: String,
pub listen_addr: String,
pub pageserver_addr: Option<String>,
// TODO (create issue) this is temporary, until protocol between PG<->SK<->PS rework
pub pageserver_auth_token: Option<String>,

View File

@@ -42,7 +42,7 @@ fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZT
);
// use Config parsing because SockAddr parsing doesnt allow to use host names instead of ip addresses
let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_pg_addr);
let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_addr);
let me_conf: Config = me_connstr.parse().unwrap();
let (host, port) = connection_host_port(&me_conf);
let callme = format!(

View File

@@ -15,11 +15,8 @@ use std::cmp::min;
use std::io;
use std::io::Read;
use lazy_static::lazy_static;
use crate::replication::HotStandbyFeedback;
use postgres_ffi::xlog_utils::MAX_SEND_SIZE;
use zenith_metrics::{register_gauge_vec, Gauge, GaugeVec};
use zenith_utils::bin_ser::LeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::pq_proto::SystemId;
@@ -282,38 +279,6 @@ pub trait Storage {
fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>;
}
lazy_static! {
// The prometheus crate does not support u64 yet, i64 only (see `IntGauge`).
// i64 is faster than f64, so update to u64 when available.
static ref FLUSH_LSN_GAUGE: GaugeVec = register_gauge_vec!(
"safekeeper_flush_lsn",
"Current flush_lsn, grouped by timeline",
&["ztli"]
)
.expect("Failed to register safekeeper_flush_lsn int gauge vec");
static ref COMMIT_LSN_GAUGE: GaugeVec = register_gauge_vec!(
"safekeeper_commit_lsn",
"Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
&["ztli"]
)
.expect("Failed to register safekeeper_commit_lsn int gauge vec");
}
struct SafeKeeperMetrics {
flush_lsn: Gauge,
commit_lsn: Gauge,
}
impl SafeKeeperMetrics {
fn new(ztli: ZTimelineId) -> SafeKeeperMetrics {
let ztli_str = format!("{}", ztli);
SafeKeeperMetrics {
flush_lsn: FLUSH_LSN_GAUGE.with_label_values(&[&ztli_str]),
commit_lsn: COMMIT_LSN_GAUGE.with_label_values(&[&ztli_str]),
}
}
}
/// SafeKeeper which consumes events (messages from compute) and provides
/// replies.
pub struct SafeKeeper<ST: Storage> {
@@ -321,8 +286,6 @@ pub struct SafeKeeper<ST: Storage> {
/// Established by reading wal.
pub flush_lsn: Lsn,
pub tli: u32,
// Cached metrics so we don't have to recompute labels on each update.
metrics: Option<SafeKeeperMetrics>,
/// not-yet-flushed pairs of same named fields in s.*
pub commit_lsn: Lsn,
pub truncate_lsn: Lsn,
@@ -341,7 +304,6 @@ where
SafeKeeper {
flush_lsn,
tli,
metrics: None,
commit_lsn: state.commit_lsn,
truncate_lsn: state.truncate_lsn,
storage,
@@ -393,8 +355,6 @@ where
self.s.server.wal_seg_size = msg.wal_seg_size;
self.storage.persist(&self.s, true)?;
self.metrics = Some(SafeKeeperMetrics::new(self.s.server.ztli));
info!(
"processed greeting from proposer {:?}, sending term {:?}",
msg.proposer_id, self.s.acceptor_state.term
@@ -518,11 +478,6 @@ where
}
if last_rec_lsn > self.flush_lsn {
self.flush_lsn = last_rec_lsn;
self.metrics
.as_ref()
.unwrap()
.flush_lsn
.set(u64::from(self.flush_lsn) as f64);
}
// Advance commit_lsn taking into account what we have locally. xxx this
@@ -540,11 +495,6 @@ where
sync_control_file |=
commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn;
self.commit_lsn = commit_lsn;
self.metrics
.as_ref()
.unwrap()
.commit_lsn
.set(u64::from(self.commit_lsn) as f64);
}
self.truncate_lsn = msg.h.truncate_lsn;

View File

@@ -13,9 +13,9 @@ use zenith_utils::postgres_backend::{AuthType, PostgresBackend};
/// Accept incoming TCP connections and spawn them into a background thread.
pub fn thread_main(conf: WalAcceptorConf) -> Result<()> {
info!("Starting wal acceptor on {}", conf.listen_pg_addr);
let listener = TcpListener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
info!("Starting wal acceptor on {}", conf.listen_addr);
let listener = TcpListener::bind(conf.listen_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_addr, e);
e
})?;

View File

@@ -5,8 +5,6 @@
use lazy_static::lazy_static;
use once_cell::race::OnceBox;
pub use prometheus::{exponential_buckets, linear_buckets};
pub use prometheus::{register_gauge, Gauge};
pub use prometheus::{register_gauge_vec, GaugeVec};
pub use prometheus::{register_histogram, Histogram};
pub use prometheus::{register_histogram_vec, HistogramVec};
pub use prometheus::{register_int_counter, IntCounter};

View File

@@ -124,6 +124,10 @@ impl<K: Ord, V> VecMap<K, V> {
Ok(())
}
pub fn reserve(&mut self, additional: usize) {
self.0.reserve(additional);
}
}
fn extract_key<K, V>(entry: &(K, V)) -> &K {