Use COW version of YAKV

This commit is contained in:
Konstantin Knizhnik
2021-11-22 19:22:39 +03:00
parent 8fda7a6183
commit f73d043a8b
7 changed files with 52 additions and 50 deletions

3
Cargo.lock generated
View File

@@ -2571,10 +2571,9 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "yakv"
version = "0.2.0"
version = "0.2.1"
dependencies = [
"anyhow",
"crc32c",
"fs2",
]

View File

@@ -294,7 +294,7 @@ impl PostgresNode {
conf.append("max_wal_senders", "10");
// wal_log_hints is mandatory when running against pageserver (see gh issue#192)
// TODO: is it possible to check wal_log_hints at pageserver side via XLOG_PARAMETER_CHANGE?
// conf.append("wal_log_hints", "on");
conf.append("wal_log_hints", "on");
conf.append("max_replication_slots", "10");
conf.append("hot_standby", "on");
conf.append("shared_buffers", "1MB");

View File

@@ -37,8 +37,8 @@ async-trait = "0.1"
const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
yakv = { path = "../../yakv" }
#yakv = "0.2.0"
#yakv = { path = "../../yakv" }
yakv = "0.2.1"
lz4_flex = "0.9.0"
postgres_ffi = { path = "../postgres_ffi" }

View File

@@ -42,6 +42,7 @@ struct CfgFileParams {
listen_http_addr: Option<String>,
checkpoint_distance: Option<String>,
checkpoint_period: Option<String>,
reconstruct_threshold: Option<String>,
gc_horizon: Option<String>,
gc_period: Option<String>,
pg_distrib_dir: Option<String>,
@@ -103,6 +104,7 @@ impl CfgFileParams {
listen_http_addr: get_arg("listen-http"),
checkpoint_distance: get_arg("checkpoint_distance"),
checkpoint_period: get_arg("checkpoint_period"),
reconstruct_threshold: get_arg("reconstruct_threshold"),
gc_horizon: get_arg("gc_horizon"),
gc_period: get_arg("gc_period"),
pg_distrib_dir: get_arg("postgres-distrib"),
@@ -121,6 +123,7 @@ impl CfgFileParams {
listen_http_addr: self.listen_http_addr.or(other.listen_http_addr),
checkpoint_distance: self.checkpoint_distance.or(other.checkpoint_distance),
checkpoint_period: self.checkpoint_period.or(other.checkpoint_period),
reconstruct_threshold: self.reconstruct_threshold.or(other.reconstruct_threshold),
gc_horizon: self.gc_horizon.or(other.gc_horizon),
gc_period: self.gc_period.or(other.gc_period),
pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir),
@@ -158,6 +161,11 @@ impl CfgFileParams {
None => DEFAULT_CHECKPOINT_PERIOD,
};
let reconstruct_threshold: u64 = match self.reconstruct_threshold.as_ref() {
Some(reconstruct_threshold_str) => reconstruct_threshold_str.parse()?,
None => DEFAULT_RECONSTRUCT_THRESHOLD,
};
let gc_horizon: u64 = match self.gc_horizon.as_ref() {
Some(horizon_str) => horizon_str.parse()?,
None => DEFAULT_GC_HORIZON,
@@ -236,6 +244,7 @@ impl CfgFileParams {
listen_http_addr,
checkpoint_distance,
checkpoint_period,
reconstruct_threshold,
gc_horizon,
gc_period,
@@ -296,6 +305,12 @@ fn main() -> Result<()> {
.takes_value(true)
.help("Interval between checkpoint iterations"),
)
.arg(
Arg::with_name("reconstruct_threshold")
.long("reconstruct_threshold")
.takes_value(true)
.help("Minimal size of deltas after which page reconstruction (materialization) can be performed"),
)
.arg(
Arg::with_name("gc_horizon")
.long("gc_horizon")
@@ -600,6 +615,7 @@ mod tests {
listen_http_addr: Some("listen_http_addr_VALUE".to_string()),
checkpoint_distance: Some("checkpoint_distance_VALUE".to_string()),
checkpoint_period: Some("checkpoint_period_VALUE".to_string()),
reconstruct_threshold: Some("reconstruct_threshold_VALUE".to_string()),
gc_horizon: Some("gc_horizon_VALUE".to_string()),
gc_period: Some("gc_period_VALUE".to_string()),
pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()),
@@ -623,6 +639,7 @@ mod tests {
listen_http_addr = 'listen_http_addr_VALUE'
checkpoint_distance = 'checkpoint_distance_VALUE'
checkpoint_period = 'checkpoint_period_VALUE'
reconstruct_threshold = 'reconstruct_threshold_VALUE'
gc_horizon = 'gc_horizon_VALUE'
gc_period = 'gc_period_VALUE'
pg_distrib_dir = 'pg_distrib_dir_VALUE'
@@ -657,6 +674,7 @@ local_path = 'relish_storage_local_VALUE'
listen_http_addr: Some("listen_http_addr_VALUE".to_string()),
checkpoint_distance: Some("checkpoint_distance_VALUE".to_string()),
checkpoint_period: Some("checkpoint_period_VALUE".to_string()),
reconstruct_threshold: Some("reconstruct_threshold_VALUE".to_string()),
gc_horizon: Some("gc_horizon_VALUE".to_string()),
gc_period: Some("gc_period_VALUE".to_string()),
pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()),
@@ -683,6 +701,7 @@ local_path = 'relish_storage_local_VALUE'
listen_http_addr = 'listen_http_addr_VALUE'
checkpoint_distance = 'checkpoint_distance_VALUE'
checkpoint_period = 'checkpoint_period_VALUE'
reconstruct_threshold = 'reconstruct_threshold_VALUE'
gc_horizon = 'gc_horizon_VALUE'
gc_period = 'gc_period_VALUE'
pg_distrib_dir = 'pg_distrib_dir_VALUE'

View File

@@ -400,8 +400,6 @@ impl BufferedRepository {
std::thread::sleep(conf.checkpoint_period);
info!("checkpointer thread for tenant {} waking up", self.tenantid);
// checkpoint timelines that have accumulated more than CHECKPOINT_DISTANCE
// bytes of WAL since last checkpoint.
{
let timelines: Vec<(ZTimelineId, Arc<BufferedTimeline>)> = self
.timelines
@@ -419,7 +417,7 @@ impl BufferedRepository {
STORAGE_TIME
.with_label_values(&["checkpoint_timed"])
.observe_closure_duration(|| {
timeline.checkpoint_internal(conf.checkpoint_distance, false)
timeline.checkpoint_internal(conf.reconstruct_threshold, false)
})?
}
// release lock on 'timelines'
@@ -933,7 +931,7 @@ impl Timeline for BufferedTimeline {
fn checkpoint(&self) -> Result<()> {
STORAGE_TIME
.with_label_values(&["checkpoint_force"])
//pass checkpoint_distance=0 to force checkpoint
//pass reconstruct_threshold=0 to force page materialization
.observe_closure_duration(|| self.checkpoint_internal(0, true))
}
@@ -1375,12 +1373,12 @@ impl BufferedTimeline {
/// Materialize last page versions
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL.
/// checkpoint_interval is used to measure total length of applied WAL records.
/// reconstruct_threshold is used to measure total length of applied WAL records.
/// It can be used to prevent too frequent materialization of a page. We can avoid storing a materialized page if the history of changes is not too long
/// and can be quickly replayed. Alternatively, we could measure the interval from the last version's LSN:
/// that would enforce materialization of "stabilized" pages, but there is a risk that a permanently updated page would never be materialized.
///
fn checkpoint_internal(&self, checkpoint_distance: u64, _forced: bool) -> Result<()> {
fn checkpoint_internal(&self, reconstruct_threshold: u64, _forced: bool) -> Result<()> {
// From boundary is constant and till boundary is changed at each iteration.
let from = StoreKey::Data(DataKey {
rel: RelishTag::Relation(ZERO_TAG),
@@ -1485,7 +1483,7 @@ impl BufferedTimeline {
drop(iter);
drop(store);
// See comment above. Maybe we should also enforce checkpointing of too-old versions here.
if history_len as u64 >= checkpoint_distance {
if history_len as u64 >= reconstruct_threshold {
let img = RECONSTRUCT_TIME.observe_closure_duration(|| {
self.reconstruct_page(dk.rel, dk.blknum, dk.lsn, data)
});
@@ -1898,8 +1896,9 @@ impl<'a> BufferedTimelineWriter<'a> {
}
*/
}
if store.data.committed {
self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
if store.data.commit_lsn + self.tl.conf.checkpoint_distance < lsn {
store.data.commit(lsn)?;
self.tl.disk_consistent_lsn.store(lsn);
}
Ok(())
}
@@ -1981,9 +1980,6 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
size: Some(relsize),
};
store.data.put(&mk.ser()?, &mv.ser()?)?;
if store.data.committed {
self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
}
Ok(())
}
@@ -1996,10 +1992,6 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
let mk = StoreKey::Metadata(MetadataKey { rel, lsn });
let mv = MetadataValue { size: None }; // None indicates dropped relation
store.data.put(&mk.ser()?, &mv.ser()?)?;
if store.data.committed {
self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
}
Ok(())
}
@@ -2007,11 +1999,12 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
/// Complete all delayed commits and advance disk_consistent_lsn
///
fn checkpoint(&self) -> Result<()> {
let store = self.tl.store.write().unwrap();
store.data.checkpoint()?;
let mut store = self.tl.store.write().unwrap();
let lsn = self.tl.get_last_record_lsn();
store.data.commit(lsn)?;
self.tl
.disk_consistent_lsn
.store(self.tl.get_last_record_lsn());
.store(lsn);
Ok(())
}

View File

@@ -33,11 +33,13 @@ pub mod defaults {
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
// Minimal size of WAL records chain to trigger materialization of the page
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 0;
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_RECONSTRUCT_THRESHOLD: u64 = 0;
pub const DEFAULT_GC_HORIZON: u64 = 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;
@@ -64,6 +66,7 @@ pub struct PageServerConf {
// page server crashes.
pub checkpoint_distance: u64,
pub checkpoint_period: Duration,
pub reconstruct_threshold: u64,
pub gc_horizon: u64,
pub gc_period: Duration,
@@ -149,6 +152,7 @@ impl PageServerConf {
daemonize: false,
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
checkpoint_period: Duration::from_secs(10),
reconstruct_threshold: defaults::DEFAULT_RECONSTRUCT_THRESHOLD,
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),

View File

@@ -4,13 +4,13 @@ use std::convert::TryInto;
use std::ops::{Bound, RangeBounds};
use std::path::Path;
use tracing::*;
use zenith_utils::lsn::Lsn;
use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Value};
const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024;
const CACHE_SIZE: usize = 32 * 1024; // 256Mb
const COMMIT_THRESHOLD: usize = CACHE_SIZE / 2;
const WAL_FLUSH_THRESHOLD: u32 = 128; // 1Mb
//const CACHE_SIZE: usize = 128 * 1024; // 1Gb
///
/// Toast storage consists of two KV databases: one for storing the main index
@@ -20,7 +20,7 @@ const WAL_FLUSH_THRESHOLD: u32 = 128; // 1Mb
///
pub struct ToastStore {
db: Storage, // key-value database
pub committed: bool, // last transaction was committed (not delayed)
pub commit_lsn: Lsn, // LSN of last committed transaction
}
pub struct ToastIterator<'a> {
@@ -126,14 +126,12 @@ impl ToastStore {
Ok(ToastStore {
db: Storage::open(
&path.join("pageserver.db"),
Some(&path.join("pageserver.log")),
StorageConfig {
cache_size: CACHE_SIZE,
checkpoint_interval: CHECKPOINT_INTERVAL,
wal_flush_threshold: WAL_FLUSH_THRESHOLD,
nosync: false,
},
)?,
committed: false,
commit_lsn: Lsn(0),
})
}
@@ -141,7 +139,6 @@ impl ToastStore {
let mut tx = self.db.start_transaction();
let value_len = value.len();
let mut key = key.clone();
self.committed = false;
if value_len >= TOAST_SEGMENT_SIZE {
let compressed_data = lz4_flex::compress_prepend_size(value);
let compressed_data_len = compressed_data.len();
@@ -170,18 +167,14 @@ impl ToastStore {
key.extend_from_slice(&[0u8; 4]);
tx.put(&key, value)?;
}
if tx.get_cache_info().pinned > COMMIT_THRESHOLD {
tx.commit()?;
self.committed = true;
} else {
tx.delay()?;
}
tx.delay()?;
Ok(())
}
pub fn checkpoint(&self) -> Result<()> {
pub fn commit(&mut self, commit_lsn: Lsn) -> Result<()> {
let mut tx = self.db.start_transaction();
tx.commit()?;
self.commit_lsn = commit_lsn;
Ok(())
}
@@ -228,7 +221,6 @@ impl ToastStore {
min_key.extend_from_slice(&[0u8; 4]);
max_key.extend_from_slice(&[0xFFu8; 4]);
let mut iter = tx.range(&min_key..&max_key);
self.committed = false;
if let Some(entry) = iter.next() {
let mut key = entry?.0.clone();
let key_len = key.len();
@@ -243,12 +235,7 @@ impl ToastStore {
tx.remove(&key)?;
}
}
if tx.get_cache_info().pinned > COMMIT_THRESHOLD {
tx.commit()?;
self.committed = true;
} else {
tx.delay()?;
}
tx.delay()?;
Ok(())
}
}