Compare commits

...

28 Commits

Author SHA1 Message Date
Konstantin Knizhnik
2e54dcdffe Fix deadlock in buffered repo 2021-12-28 19:20:28 +03:00
Konstantin Knizhnik
1ca9bea7c0 Reduce toast segment size to 2000 2021-12-15 18:19:45 +03:00
Konstantin Knizhnik
945e83a6ba Add nonblock.rs 2021-12-15 00:05:26 +03:00
Konstantin Knizhnik
eb2323aedd Use walredo from master 2021-12-08 12:27:27 +03:00
Konstantin Knizhnik
cb6e231ea9 Separate checkpointing and page reconstruction 2021-12-06 12:24:13 +03:00
Konstantin Knizhnik
2460f44e30 Fix Clippy warnings' 2021-12-02 14:21:36 +03:00
Konstantin Knizhnik
863c04b0a6 Use latest YAKV version 2021-12-02 13:01:27 +03:00
Konstantin Knizhnik
e4b89a1849 Use parking_lot synchronization primitives 2021-11-30 18:28:35 +03:00
Konstantin Knizhnik
5ad82418a9 Add upload thread 2021-11-24 18:04:48 +03:00
Konstantin Knizhnik
92562145c0 Change toast store API 2021-11-23 11:34:32 +03:00
Konstantin Knizhnik
915001c67e Fix clippy warnings 2021-11-22 20:05:34 +03:00
Konstantin Knizhnik
13f9565ff8 Fix indentation 2021-11-22 19:22:59 +03:00
Konstantin Knizhnik
f73d043a8b Use COW version of YAKV 2021-11-22 19:22:39 +03:00
Konstantin Knizhnik
8fda7a6183 Fix indentation 2021-11-17 12:50:46 +03:00
Konstantin Knizhnik
4acd292717 Use BRIN to optimize GC 2021-11-17 12:50:25 +03:00
Konstantin Knizhnik
b365a075f4 Save materialized pages 2021-11-17 12:16:14 +03:00
Konstantin Knizhnik
6311135d73 Save materialized pages 2021-11-17 12:16:02 +03:00
Konstantin Knizhnik
ee29446edc Add BRIN index for checkpointer 2021-11-12 17:20:17 +03:00
Konstantin Knizhnik
d2e5e0e728 Fix compression 2021-11-11 00:16:37 +03:00
Konstantin Knizhnik
3b471494ff Add import/export functions from buffered stotage to files with layeres 2021-11-10 09:48:28 +03:00
Konstantin Knizhnik
9947de4a2a Fix issues with garbage collector 2021-11-03 12:15:24 +03:00
Konstantin Knizhnik
a3e94e888a Implement garbage collector for buffered repository 2021-10-30 13:10:04 +03:00
Konstantin Knizhnik
e6f33a5cd0 Rewrite TOAST to use the same tree as main index 2021-10-29 17:00:09 +03:00
Konstantin Knizhnik
2dd35b1fbe Fix indentation 2021-10-27 19:37:50 +03:00
Konstantin Knizhnik
ce779cc754 Use delayed commit in buffered_repo 2021-10-27 19:37:23 +03:00
Konstantin Knizhnik
497258c6fe Do not produce error in get_page_at_lsn on missed page 2021-10-26 20:07:11 +03:00
Konstantin Knizhnik
0b6008012d Apply cargo fmt 2021-10-22 19:52:05 +03:00
Konstantin Knizhnik
d35fc20181 Implement buffered repository 2021-10-22 19:50:59 +03:00
26 changed files with 3171 additions and 2541 deletions

76
Cargo.lock generated
View File

@@ -180,9 +180,9 @@ dependencies = [
[[package]]
name = "bitflags"
version = "1.2.1"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitvec"
@@ -338,7 +338,7 @@ dependencies = [
"bytes",
"hex",
"lazy_static",
"nix",
"nix 0.20.0",
"pageserver",
"postgres",
"postgres_ffi",
@@ -886,9 +886,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.101"
version = "0.2.109"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cb00336871be5ed2c8ed44b60ae9959dc5b9f08539422ed43f09e34ecaeba21"
checksum = "f98a04dce437184842841303488f70d0188c5f51437d2a834dc097eafa909a01"
[[package]]
name = "libloading"
@@ -902,9 +902,9 @@ dependencies = [
[[package]]
name = "lock_api"
version = "0.4.4"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0382880606dff6d15c9476c416d18690b72742aa7b605bb6dd6ec9030fbf07eb"
checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109"
dependencies = [
"scopeguard",
]
@@ -918,6 +918,15 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "lz4_flex"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "177c079243f6867429aca5af5053747f57e329d44f0c58bebca078cd14873ec2"
dependencies = [
"twox-hash",
]
[[package]]
name = "matchers"
version = "0.0.1"
@@ -1043,6 +1052,19 @@ dependencies = [
"libc",
]
[[package]]
name = "nix"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188"
dependencies = [
"bitflags",
"cc",
"cfg-if 1.0.0",
"libc",
"memoffset",
]
[[package]]
name = "nom"
version = "6.1.2"
@@ -1180,6 +1202,9 @@ dependencies = [
"hyper",
"lazy_static",
"log",
"lz4_flex",
"nix 0.23.0",
"parking_lot",
"postgres",
"postgres-protocol",
"postgres-types",
@@ -1198,15 +1223,16 @@ dependencies = [
"toml",
"tracing",
"workspace_hack",
"yakv",
"zenith_metrics",
"zenith_utils",
]
[[package]]
name = "parking_lot"
version = "0.11.1"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
@@ -1215,9 +1241,9 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.8.3"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018"
checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
dependencies = [
"cfg-if 1.0.0",
"instant",
@@ -1908,6 +1934,12 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stringprep"
version = "0.1.2"
@@ -2228,6 +2260,16 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "twox-hash"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f559b464de2e2bdabcac6a210d12e9b5a5973c251e102c44c585c71d51bd78e"
dependencies = [
"cfg-if 1.0.0",
"static_assertions",
]
[[package]]
name = "typenum"
version = "1.13.0"
@@ -2542,6 +2584,17 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "yakv"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17eba1abb31dda774cd901a9692a47aac716050975a30993c79826a08de47a34"
dependencies = [
"anyhow",
"fs2",
"parking_lot",
]
[[package]]
name = "zenith"
version = "0.1.0"
@@ -2581,6 +2634,7 @@ dependencies = [
"jsonwebtoken",
"lazy_static",
"log",
"nix 0.23.0",
"postgres",
"rand",
"routerify",

View File

@@ -15,3 +15,7 @@ members = [
# This is useful for profiling and, to some extent, debug.
# Besides, debug info should not affect the performance.
debug = true
panic = 'abort'
[profile.dev]
panic = 'abort'

View File

@@ -298,6 +298,7 @@ impl PostgresNode {
conf.append("max_replication_slots", "10");
conf.append("hot_standby", "on");
conf.append("shared_buffers", "1MB");
conf.append("max_wal_size", "100GB");
conf.append("fsync", "off");
conf.append("max_connections", "100");
conf.append("wal_sender_timeout", "0");

View File

@@ -37,11 +37,16 @@ async-trait = "0.1"
const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
nix = "0.23"
#yakv = { path = "../../yakv" }
yakv = "0.2.7"
lz4_flex = "0.9.0"
postgres_ffi = { path = "../postgres_ffi" }
zenith_metrics = { path = "../zenith_metrics" }
zenith_utils = { path = "../zenith_utils" }
workspace_hack = { path = "../workspace_hack" }
parking_lot = "0.11.2"
[dev-dependencies]
hex-literal = "0.3"

View File

@@ -1,25 +0,0 @@
//! Main entry point for the dump_layerfile executable
//!
//! A handy tool for debugging, that's all.
use anyhow::Result;
use clap::{App, Arg};
use pageserver::layered_repository::dump_layerfile_from_path;
use std::path::PathBuf;
fn main() -> Result<()> {
let arg_matches = App::new("Zenith dump_layerfile utility")
.about("Dump contents of one layer file, for debugging")
.arg(
Arg::with_name("path")
.help("Path to file to dump")
.required(true)
.index(1),
)
.get_matches();
let path = PathBuf::from(arg_matches.value_of("path").unwrap());
dump_layerfile_from_path(&path)?;
Ok(())
}

View File

@@ -42,6 +42,9 @@ struct CfgFileParams {
listen_http_addr: Option<String>,
checkpoint_distance: Option<String>,
checkpoint_period: Option<String>,
upload_distance: Option<String>,
upload_period: Option<String>,
reconstruct_threshold: Option<String>,
gc_horizon: Option<String>,
gc_period: Option<String>,
pg_distrib_dir: Option<String>,
@@ -103,6 +106,9 @@ impl CfgFileParams {
listen_http_addr: get_arg("listen-http"),
checkpoint_distance: get_arg("checkpoint_distance"),
checkpoint_period: get_arg("checkpoint_period"),
upload_distance: get_arg("upload_distance"),
upload_period: get_arg("upload_period"),
reconstruct_threshold: get_arg("reconstruct_threshold"),
gc_horizon: get_arg("gc_horizon"),
gc_period: get_arg("gc_period"),
pg_distrib_dir: get_arg("postgres-distrib"),
@@ -121,6 +127,9 @@ impl CfgFileParams {
listen_http_addr: self.listen_http_addr.or(other.listen_http_addr),
checkpoint_distance: self.checkpoint_distance.or(other.checkpoint_distance),
checkpoint_period: self.checkpoint_period.or(other.checkpoint_period),
upload_distance: self.upload_distance.or(other.upload_distance),
upload_period: self.upload_period.or(other.upload_period),
reconstruct_threshold: self.reconstruct_threshold.or(other.reconstruct_threshold),
gc_horizon: self.gc_horizon.or(other.gc_horizon),
gc_period: self.gc_period.or(other.gc_period),
pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir),
@@ -158,6 +167,20 @@ impl CfgFileParams {
None => DEFAULT_CHECKPOINT_PERIOD,
};
let upload_distance: u64 = match self.upload_distance.as_ref() {
Some(upload_distance_str) => upload_distance_str.parse()?,
None => DEFAULT_UPLOAD_DISTANCE,
};
let upload_period = match self.upload_period.as_ref() {
Some(upload_period_str) => humantime::parse_duration(upload_period_str)?,
None => DEFAULT_UPLOAD_PERIOD,
};
let reconstruct_threshold: u64 = match self.reconstruct_threshold.as_ref() {
Some(reconstruct_threshold_str) => reconstruct_threshold_str.parse()?,
None => DEFAULT_RECONSTRUCT_THRESHOLD,
};
let gc_horizon: u64 = match self.gc_horizon.as_ref() {
Some(horizon_str) => horizon_str.parse()?,
None => DEFAULT_GC_HORIZON,
@@ -236,6 +259,9 @@ impl CfgFileParams {
listen_http_addr,
checkpoint_distance,
checkpoint_period,
upload_distance,
upload_period,
reconstruct_threshold,
gc_horizon,
gc_period,
@@ -296,6 +322,24 @@ fn main() -> Result<()> {
.takes_value(true)
.help("Interval between checkpoint iterations"),
)
.arg(
Arg::with_name("upload_distance")
.long("upload_distance")
.takes_value(true)
.help("Distance from current LSN to perform checkpoint of in-memory layers"),
)
.arg(
Arg::with_name("upload_period")
.long("upload_period")
.takes_value(true)
.help("Interval between upload iterations"),
)
.arg(
Arg::with_name("reconstruct_threshold")
.long("reconstruct_threshold")
.takes_value(true)
.help("Minimal size of deltas after which page reconstruction (materialization) can be performed"),
)
.arg(
Arg::with_name("gc_horizon")
.long("gc_horizon")
@@ -600,6 +644,9 @@ mod tests {
listen_http_addr: Some("listen_http_addr_VALUE".to_string()),
checkpoint_distance: Some("checkpoint_distance_VALUE".to_string()),
checkpoint_period: Some("checkpoint_period_VALUE".to_string()),
upload_distance: Some("upload_distance_VALUE".to_string()),
upload_period: Some("upload_period_VALUE".to_string()),
reconstruct_threshold: Some("reconstruct_threshold_VALUE".to_string()),
gc_horizon: Some("gc_horizon_VALUE".to_string()),
gc_period: Some("gc_period_VALUE".to_string()),
pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()),
@@ -623,6 +670,9 @@ mod tests {
listen_http_addr = 'listen_http_addr_VALUE'
checkpoint_distance = 'checkpoint_distance_VALUE'
checkpoint_period = 'checkpoint_period_VALUE'
upload_distance = 'upload_distance_VALUE'
upload_period = 'upload_period_VALUE'
reconstruct_threshold = 'reconstruct_threshold_VALUE'
gc_horizon = 'gc_horizon_VALUE'
gc_period = 'gc_period_VALUE'
pg_distrib_dir = 'pg_distrib_dir_VALUE'
@@ -657,6 +707,9 @@ local_path = 'relish_storage_local_VALUE'
listen_http_addr: Some("listen_http_addr_VALUE".to_string()),
checkpoint_distance: Some("checkpoint_distance_VALUE".to_string()),
checkpoint_period: Some("checkpoint_period_VALUE".to_string()),
upload_distance: Some("upload_distance_VALUE".to_string()),
upload_period: Some("upload_period_VALUE".to_string()),
reconstruct_threshold: Some("reconstruct_threshold_VALUE".to_string()),
gc_horizon: Some("gc_horizon_VALUE".to_string()),
gc_period: Some("gc_period_VALUE".to_string()),
pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()),
@@ -683,6 +736,9 @@ local_path = 'relish_storage_local_VALUE'
listen_http_addr = 'listen_http_addr_VALUE'
checkpoint_distance = 'checkpoint_distance_VALUE'
checkpoint_period = 'checkpoint_period_VALUE'
upload_distance = 'upload_distance_VALUE'
upload_period = 'upload_period_VALUE'
reconstruct_threshold = 'reconstruct_threshold_VALUE'
gc_horizon = 'gc_horizon_VALUE'
gc_period = 'gc_period_VALUE'
pg_distrib_dir = 'pg_distrib_dir_VALUE'

View File

@@ -147,7 +147,7 @@ pub fn create_repo(
let tli = create_timeline(conf, None, &tenantid)?;
let repo = Arc::new(crate::layered_repository::LayeredRepository::new(
let repo = Arc::new(crate::buffered_repository::BufferedRepository::new(
conf,
wal_redo_manager,
tenantid,
@@ -182,6 +182,7 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
let initdb_output = Command::new(initdb_path)
.args(&["-D", initdbpath.to_str().unwrap()])
.args(&["-U", &conf.superuser])
.args(&["-E", "utf8"])
.arg("--no-instructions")
// This is only used for a temporary installation that is deleted shortly after,
// so no need to fsync it

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -39,9 +39,8 @@
//!
use crate::layered_repository::blob::BlobWriter;
use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
};
use crate::layered_repository::storage_layer::{Layer, SegmentTag};
use crate::repository::{PageReconstructData, PageReconstructResult, PageVersion};
use crate::waldecoder;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
@@ -148,6 +147,10 @@ pub struct DeltaLayerInner {
}
impl Layer for DeltaLayer {
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}
@@ -201,22 +204,22 @@ impl Layer for DeltaLayer {
for ((_blknum, pv_lsn), blob_range) in iter {
let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?;
if let Some(img) = pv.page_image {
// Found a page image, return it
reconstruct_data.page_img = Some(img);
need_image = false;
break;
} else if let Some(rec) = pv.record {
let will_init = rec.will_init;
reconstruct_data.records.push((*pv_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
match pv {
PageVersion::Page(img) => {
// Found a page image, return it
reconstruct_data.page_img = Some(img);
need_image = false;
break;
}
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
PageVersion::Wal(rec) => {
let will_init = rec.will_init;
reconstruct_data.records.push((*pv_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
@@ -226,7 +229,7 @@ impl Layer for DeltaLayer {
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(PageReconstructResult::Continue(self.start_lsn))
Ok(PageReconstructResult::Continue(Lsn(self.start_lsn.0 - 1)))
} else {
Ok(PageReconstructResult::Complete)
}
@@ -307,19 +310,22 @@ impl Layer for DeltaLayer {
let buf = read_blob(&chapter, blob_range)?;
let pv = PageVersion::des(&buf)?;
if let Some(img) = pv.page_image.as_ref() {
write!(&mut desc, " img {} bytes", img.len())?;
}
if let Some(rec) = pv.record.as_ref() {
let wal_desc = waldecoder::describe_wal_record(&rec.rec);
write!(
&mut desc,
" rec {} bytes will_init: {} {}",
rec.rec.len(),
rec.will_init,
wal_desc
)?;
match pv {
PageVersion::Page(img) => {
write!(&mut desc, " img {} bytes", img.len())?;
}
PageVersion::Wal(rec) => {
let wal_desc = waldecoder::describe_wal_record(&rec.rec);
write!(
&mut desc,
" rec {} bytes will_init: {} {}",
rec.rec.len(),
rec.will_init,
wal_desc
)?;
}
}
println!(" blk {} at {}: {}", blk, lsn, desc);
}
@@ -328,6 +334,19 @@ impl Layer for DeltaLayer {
}
impl DeltaLayer {
/// debugging function to print out the contents of the layer
pub fn versions(&self) -> Result<Vec<(u32, Lsn, PageVersion)>> {
let mut versions: Vec<(u32, Lsn, PageVersion)> = Vec::new();
let inner = self.load()?;
let (_path, book) = self.open_book()?;
let chapter = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
for ((blk, lsn), blob_range) in inner.page_version_metas.as_slice() {
let buf = read_blob(&chapter, blob_range)?;
versions.push((*blk, *lsn, PageVersion::des(&buf)?));
}
Ok(versions)
}
fn path_for(
path_or_conf: &PathOrConf,
timelineid: ZTimelineId,
@@ -360,6 +379,7 @@ impl DeltaLayer {
dropped: bool,
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
relsizes: VecMap<Lsn, u32>,
nosync: bool,
) -> Result<DeltaLayer> {
if seg.rel.is_blocky() {
assert!(!relsizes.is_empty());
@@ -432,8 +452,9 @@ impl DeltaLayer {
// This flushes the underlying 'buf_writer'.
let writer = book.close()?;
writer.get_ref().sync_all()?;
if !nosync {
writer.get_ref().sync_all()?;
}
trace!("saved {}", &path.display());
drop(inner);
@@ -442,12 +463,7 @@ impl DeltaLayer {
}
fn open_book(&self) -> Result<(PathBuf, Book<File>)> {
let path = Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&self.layer_name(),
);
let path = self.path();
let file = File::open(&path)?;
let book = Book::new(file)?;

View File

@@ -13,8 +13,6 @@ use anyhow::Result;
use log::*;
use zenith_utils::lsn::Lsn;
use super::METADATA_FILE_NAME;
// Note: LayeredTimeline::load_layer_map() relies on this sort order
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct DeltaFileName {
@@ -292,7 +290,7 @@ pub fn list_files(
deltafiles.push(deltafilename);
} else if let Some(imgfilename) = ImageFileName::parse_str(fname) {
imgfiles.push(imgfilename);
} else if fname == METADATA_FILE_NAME || fname == "ancestor" || fname.ends_with(".old") {
} else if fname == "metadata" || fname == "ancestor" || fname.ends_with(".old") {
// ignore these
} else {
warn!("unrecognized filename in timeline dir: {}", fname);

View File

@@ -22,11 +22,8 @@
//! For non-blocky relishes, the image can be found in NONBLOCKY_IMAGE_CHAPTER.
//!
use crate::layered_repository::filename::{ImageFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, SegmentTag,
};
use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::RELISH_SEG_SIZE;
use crate::layered_repository::storage_layer::{Layer, SegmentTag, RELISH_SEG_SIZE};
use crate::repository::{PageReconstructData, PageReconstructResult};
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{anyhow, bail, ensure, Result};
@@ -117,6 +114,10 @@ impl Layer for ImageLayer {
PathBuf::from(self.layer_name().to_string())
}
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}
@@ -250,13 +251,14 @@ impl ImageLayer {
}
/// Create a new image file, using the given array of pages.
fn create(
pub fn create(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
seg: SegmentTag,
lsn: Lsn,
base_images: Vec<Bytes>,
nosync: bool,
) -> Result<ImageLayer> {
let image_type = if seg.rel.is_blocky() {
let num_blocks: u32 = base_images.len().try_into()?;
@@ -316,8 +318,9 @@ impl ImageLayer {
// This flushes the underlying 'buf_writer'.
let writer = book.close()?;
writer.get_ref().sync_all()?;
if !nosync {
writer.get_ref().sync_all()?;
}
trace!("saved {}", path.display());
drop(inner);
@@ -325,6 +328,7 @@ impl ImageLayer {
Ok(layer)
}
/*
// Create a new image file by materializing every page in a source layer
// at given LSN.
pub fn create_from_src(
@@ -362,6 +366,7 @@ impl ImageLayer {
Self::create(conf, timelineid, timeline.tenantid, seg, lsn, base_images)
}
*/
///
/// Load the contents of the file into memory

View File

@@ -3,10 +3,9 @@
//!
use crate::relish::RelishTag;
use crate::repository::WALRecord;
use crate::ZTimelineId;
use crate::repository::{PageReconstructData, PageReconstructResult};
use crate::{ZTenantId, ZTimelineId};
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::path::PathBuf;
@@ -45,56 +44,6 @@ impl SegmentTag {
}
}
///
/// Represents a version of a page at a specific LSN. The LSN is the key of the
/// entry in the 'page_versions' hash, it is not duplicated here.
///
/// A page version can be stored as a full page image, or as WAL record that needs
/// to be applied over the previous page version to reconstruct this version.
///
/// It's also possible to have both a WAL record and a page image in the same
/// PageVersion. That happens if page version is originally stored as a WAL record
/// but it is later reconstructed by a GetPage@LSN request by performing WAL
/// redo. The get_page_at_lsn() code will store the reconstructed pag image next to
/// the WAL record in that case. TODO: That's pretty accidental, not the result
/// of any grand design. If we want to keep reconstructed page versions around, we
/// probably should have a separate buffer cache so that we could control the
/// replacement policy globally. Or if we keep a reconstructed page image, we
/// could throw away the WAL record.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PageVersion {
/// an 8kb page image
pub page_image: Option<Bytes>,
/// WAL record to get from previous page version to this one.
pub record: Option<WALRecord>,
}
///
/// Data needed to reconstruct a page version
///
/// 'page_img' is the old base image of the page to start the WAL replay with.
/// It can be None, if the first WAL record initializes the page (will_init)
/// 'records' contains the records to apply over the base image.
///
pub struct PageReconstructData {
pub records: Vec<(Lsn, WALRecord)>,
pub page_img: Option<Bytes>,
}
/// Return value from Layer::get_page_reconstruct_data
pub enum PageReconstructResult {
/// Got all the data needed to reconstruct the requested page
Complete,
/// This layer didn't contain all the required data, the caller should look up
/// the predecessor layer at the returned LSN and collect more data from there.
Continue(Lsn),
/// This layer didn't contain data needed to reconstruct the page version at
/// the returned LSN. This is usually considered an error, but might be OK
/// in some circumstances.
Missing(Lsn),
}
///
/// A Layer corresponds to one RELISH_SEG_SIZE slice of a relish in a range of LSNs.
/// There are two kinds of layers, in-memory and on-disk layers. In-memory
@@ -104,6 +53,8 @@ pub enum PageReconstructResult {
/// in-memory and on-disk layers.
///
pub trait Layer: Send + Sync {
fn get_tenant_id(&self) -> ZTenantId;
/// Identify the timeline this relish belongs to
fn get_timeline_id(&self) -> ZTimelineId;

View File

@@ -9,6 +9,7 @@ use zenith_metrics::{register_int_gauge_vec, IntGaugeVec};
pub mod basebackup;
pub mod branches;
pub mod buffered_repository;
pub mod http;
pub mod layered_repository;
pub mod page_service;
@@ -17,6 +18,7 @@ pub mod relish_storage;
pub mod repository;
pub mod restore_local_repo;
pub mod tenant_mgr;
pub mod toast_store;
pub mod waldecoder;
pub mod walreceiver;
pub mod walredo;
@@ -30,14 +32,17 @@ pub mod defaults {
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
// Minimal size of WAL records chain to trigger materialization of the page
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
pub const DEFAULT_UPLOAD_DISTANCE: u64 = 1024 * 1024 * 1024;
pub const DEFAULT_UPLOAD_PERIOD: Duration = Duration::from_secs(3600);
pub const DEFAULT_RECONSTRUCT_THRESHOLD: u64 = 0;
pub const DEFAULT_GC_HORIZON: u64 = 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;
@@ -64,6 +69,9 @@ pub struct PageServerConf {
// page server crashes.
pub checkpoint_distance: u64,
pub checkpoint_period: Duration,
pub upload_period: Duration,
pub upload_distance: u64,
pub reconstruct_threshold: u64,
pub gc_horizon: u64,
pub gc_period: Duration,
@@ -149,6 +157,9 @@ impl PageServerConf {
daemonize: false,
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
checkpoint_period: Duration::from_secs(10),
upload_distance: defaults::DEFAULT_UPLOAD_DISTANCE,
upload_period: defaults::DEFAULT_UPLOAD_PERIOD,
reconstruct_threshold: defaults::DEFAULT_RECONSTRUCT_THRESHOLD,
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),

View File

@@ -693,67 +693,21 @@ impl postgres_backend::Handler for PageServerHandler {
let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"layer_relfiles_total"),
RowDescriptor::int8_col(b"layer_relfiles_needed_by_cutoff"),
RowDescriptor::int8_col(b"layer_relfiles_needed_by_branches"),
RowDescriptor::int8_col(b"layer_relfiles_not_updated"),
RowDescriptor::int8_col(b"layer_relfiles_needed_as_tombstone"),
RowDescriptor::int8_col(b"layer_relfiles_removed"),
RowDescriptor::int8_col(b"layer_relfiles_dropped"),
RowDescriptor::int8_col(b"layer_nonrelfiles_total"),
RowDescriptor::int8_col(b"layer_nonrelfiles_needed_by_cutoff"),
RowDescriptor::int8_col(b"layer_nonrelfiles_needed_by_branches"),
RowDescriptor::int8_col(b"layer_nonrelfiles_not_updated"),
RowDescriptor::int8_col(b"layer_nonrelfiles_needed_as_tombstone"),
RowDescriptor::int8_col(b"layer_nonrelfiles_removed"),
RowDescriptor::int8_col(b"layer_nonrelfiles_dropped"),
RowDescriptor::int8_col(b"meta_total"),
RowDescriptor::int8_col(b"meta_removed"),
RowDescriptor::int8_col(b"meta_dropped"),
RowDescriptor::int8_col(b"pages_total"),
RowDescriptor::int8_col(b"pages_removed"),
RowDescriptor::int8_col(b"pages_dropped"),
RowDescriptor::int8_col(b"elapsed"),
]))?
.write_message_noflush(&BeMessage::DataRow(&[
Some(result.ondisk_relfiles_total.to_string().as_bytes()),
Some(
result
.ondisk_relfiles_needed_by_cutoff
.to_string()
.as_bytes(),
),
Some(
result
.ondisk_relfiles_needed_by_branches
.to_string()
.as_bytes(),
),
Some(result.ondisk_relfiles_not_updated.to_string().as_bytes()),
Some(
result
.ondisk_relfiles_needed_as_tombstone
.to_string()
.as_bytes(),
),
Some(result.ondisk_relfiles_removed.to_string().as_bytes()),
Some(result.ondisk_relfiles_dropped.to_string().as_bytes()),
Some(result.ondisk_nonrelfiles_total.to_string().as_bytes()),
Some(
result
.ondisk_nonrelfiles_needed_by_cutoff
.to_string()
.as_bytes(),
),
Some(
result
.ondisk_nonrelfiles_needed_by_branches
.to_string()
.as_bytes(),
),
Some(result.ondisk_nonrelfiles_not_updated.to_string().as_bytes()),
Some(
result
.ondisk_nonrelfiles_needed_as_tombstone
.to_string()
.as_bytes(),
),
Some(result.ondisk_nonrelfiles_removed.to_string().as_bytes()),
Some(result.ondisk_nonrelfiles_dropped.to_string().as_bytes()),
Some(result.meta_total.to_string().as_bytes()),
Some(result.meta_removed.to_string().as_bytes()),
Some(result.meta_dropped.to_string().as_bytes()),
Some(result.pages_total.to_string().as_bytes()),
Some(result.pages_removed.to_string().as_bytes()),
Some(result.pages_dropped.to_string().as_bytes()),
Some(result.elapsed.as_millis().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;

View File

@@ -44,45 +44,27 @@ pub trait Repository: Send + Sync {
///
/// Result of performing GC
///
#[derive(Default)]
#[derive(Default, Debug)]
pub struct GcResult {
pub ondisk_relfiles_total: u64,
pub ondisk_relfiles_needed_by_cutoff: u64,
pub ondisk_relfiles_needed_by_branches: u64,
pub ondisk_relfiles_not_updated: u64,
pub ondisk_relfiles_needed_as_tombstone: u64,
pub ondisk_relfiles_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
pub ondisk_relfiles_dropped: u64, // # of layer files removed because the relation was dropped
pub meta_removed: u64, // removed versions beyond PITR interval for which new page image exists
pub meta_dropped: u64, // removed versions beyond PITR interval of dropped relations
pub meta_total: u64, // total number of metaobject version histories
pub ondisk_nonrelfiles_total: u64,
pub ondisk_nonrelfiles_needed_by_cutoff: u64,
pub ondisk_nonrelfiles_needed_by_branches: u64,
pub ondisk_nonrelfiles_not_updated: u64,
pub ondisk_nonrelfiles_needed_as_tombstone: u64,
pub ondisk_nonrelfiles_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
pub ondisk_nonrelfiles_dropped: u64, // # of layer files removed because the relation was dropped
pub pages_removed: u64, // removed versions beyond PITR interval for which new page image exists
pub pages_dropped: u64, // removed versions beyond PITR interval of dropped relations
pub pages_total: u64, // total number of page vaersion histories
pub elapsed: Duration,
}
impl AddAssign for GcResult {
fn add_assign(&mut self, other: Self) {
self.ondisk_relfiles_total += other.ondisk_relfiles_total;
self.ondisk_relfiles_needed_by_cutoff += other.ondisk_relfiles_needed_by_cutoff;
self.ondisk_relfiles_needed_by_branches += other.ondisk_relfiles_needed_by_branches;
self.ondisk_relfiles_not_updated += other.ondisk_relfiles_not_updated;
self.ondisk_relfiles_needed_as_tombstone += other.ondisk_relfiles_needed_as_tombstone;
self.ondisk_relfiles_removed += other.ondisk_relfiles_removed;
self.ondisk_relfiles_dropped += other.ondisk_relfiles_dropped;
self.ondisk_nonrelfiles_total += other.ondisk_nonrelfiles_total;
self.ondisk_nonrelfiles_needed_by_cutoff += other.ondisk_nonrelfiles_needed_by_cutoff;
self.ondisk_nonrelfiles_needed_by_branches += other.ondisk_nonrelfiles_needed_by_branches;
self.ondisk_nonrelfiles_not_updated += other.ondisk_nonrelfiles_not_updated;
self.ondisk_nonrelfiles_needed_as_tombstone += other.ondisk_nonrelfiles_needed_as_tombstone;
self.ondisk_nonrelfiles_removed += other.ondisk_nonrelfiles_removed;
self.ondisk_nonrelfiles_dropped += other.ondisk_nonrelfiles_dropped;
self.meta_total += other.meta_total;
self.meta_removed += other.meta_removed;
self.meta_dropped += other.meta_dropped;
self.pages_total += other.pages_total;
self.pages_removed += other.pages_removed;
self.pages_dropped += other.pages_dropped;
self.elapsed += other.elapsed;
}
}
@@ -111,14 +93,18 @@ pub trait Timeline: Send + Sync {
/// Get a list of all existing relations
/// Pass RelTag to get relation objects or None to get nonrels.
fn list_relishes(&self, tag: Option<RelTag>, lsn: Lsn) -> Result<HashSet<RelishTag>>;
/// Get a list of all existing relations in given tablespace and database.
fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelishTag>>;
/// Get a list of all existing non-relational objects
fn list_nonrels(&self, lsn: Lsn) -> Result<HashSet<RelishTag>>;
///
/// Export data as delats and image layers between 'start_lsn' to 'end_lsn'. The
/// start is inclusive, and end is exclusive.
///
fn export_timeline(&self, start_lsn: Lsn, end_lsn: Lsn) -> Result<()>;
/// Get the LSN where this branch was created
fn get_ancestor_lsn(&self) -> Lsn;
@@ -181,6 +167,16 @@ pub trait TimelineWriter: Deref<Target = dyn Timeline> {
/// Advance requires aligned LSN as an argument and would wake wait_lsn() callers.
/// Previous last record LSN is stored alongside the latest and can be read.
fn advance_last_record_lsn(&self, lsn: Lsn);
///
/// Complete all delayed commits and advance disk_consistent_lsn
///
fn checkpoint(&self) -> Result<()>;
///
/// Import data from layer files
///
fn import_timeline(&self, snapshot_lsn: Lsn) -> Result<()>;
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -213,6 +209,39 @@ impl WALRecord {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PageVersion {
/// an 8kb page image
Page(Bytes),
/// WAL record to get from previous page version to this one.
Wal(WALRecord),
}
///
/// Data needed to reconstruct a page version
///
/// 'page_img' is the old base image of the page to start the WAL replay with.
/// It can be None, if the first WAL record initializes the page (will_init)
/// 'records' contains the records to apply over the base image.
///
pub struct PageReconstructData {
pub records: Vec<(Lsn, WALRecord)>,
pub page_img: Option<Bytes>,
}
/// Return value from Layer::get_page_reconstruct_data
pub enum PageReconstructResult {
/// Got all the data needed to reconstruct the requested page
Complete,
/// This layer didn't contain all the required data, the caller should look up
/// the predecessor layer at the returned LSN and collect more data from there.
Continue(Lsn),
/// This layer didn't contain data needed to reconstruct the page version at
/// the returned LSN. This is usually considered an error, but might be OK
/// in some circumstances.
Missing(Lsn),
}
///
/// Tests that should work the same with any Repository/Timeline implementation.
///
@@ -220,7 +249,7 @@ impl WALRecord {
#[cfg(test)]
mod tests {
use super::*;
use crate::layered_repository::{LayeredRepository, METADATA_FILE_NAME};
use crate::buffered_repository::{BufferedRepository, METADATA_FILE_NAME};
use crate::walredo::{WalRedoError, WalRedoManager};
use crate::PageServerConf;
use hex_literal::hex;
@@ -296,7 +325,7 @@ mod tests {
fn load(&self) -> Box<dyn Repository> {
let walredo_mgr = Arc::new(TestRedoManager);
Box::new(LayeredRepository::new(
Box::new(BufferedRepository::new(
self.conf,
walredo_mgr,
self.tenant_id,

View File

@@ -11,7 +11,7 @@ use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use anyhow::{anyhow, bail, Result};
use bytes::{Buf, Bytes};
use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use crate::relish::*;
@@ -126,7 +126,6 @@ pub fn import_timeline_from_postgres_datadir(
import_nonrel_file(writer, lsn, RelishTag::TwoPhase { xid }, &entry.path())?;
}
// TODO: Scan pg_tblspc
writer.advance_last_record_lsn(lsn);
// Import WAL. This is needed even when starting from a shutdown checkpoint, because
@@ -140,6 +139,7 @@ pub fn import_timeline_from_postgres_datadir(
lsn,
&mut pg_control.checkPointCopy.clone(),
)?;
writer.checkpoint()?;
Ok(())
}
@@ -416,7 +416,6 @@ pub fn save_decoded_record(
if checkpoint.update_next_xid(decoded.xl_xid) {
*checkpoint_modified = true;
}
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
@@ -427,13 +426,43 @@ pub fn save_decoded_record(
forknum: blk.forknum as u8,
});
let rec = WALRecord {
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
//
// Instead of storing full-page-image WAL record,
// it is better to store extracted image: we can skip wal-redo
// in this case. Also some FPI records may contain multiple (up to 32) pages,
// so them have to be copied multiple times.
//
if blk.apply_image
&& blk.has_image
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0
{
// Extract page image from FPI record
let img_len = blk.bimg_len as usize;
let img_offs = blk.bimg_offset as usize;
let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize);
image.extend_from_slice(&recdata[img_offs..img_offs + img_len]);
timeline.put_wal_record(lsn, tag, blk.blkno, rec)?;
if blk.hole_length != 0 {
let tail = image.split_off(blk.hole_offset as usize);
image.resize(image.len() + blk.hole_length as usize, 0u8);
image.unsplit(tail);
}
image[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
image[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
assert_eq!(image.len(), pg_constants::BLCKSZ as usize);
timeline.put_page_image(tag, blk.blkno, lsn, image.freeze())?;
} else {
let rec = WALRecord {
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
timeline.put_wal_record(lsn, tag, blk.blkno, rec)?;
}
}
let mut buf = decoded.record.clone();

View File

@@ -2,7 +2,7 @@
//! page server.
use crate::branches;
use crate::layered_repository::LayeredRepository;
use crate::buffered_repository::BufferedRepository;
use crate::repository::{Repository, Timeline};
use crate::walredo::PostgresRedoManager;
use crate::PageServerConf;
@@ -62,6 +62,7 @@ fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {
struct TenantHandleEntry {
checkpointer_handle: Option<JoinHandle<()>>,
uploader_handle: Option<JoinHandle<()>>,
gc_handle: Option<JoinHandle<()>>,
}
@@ -98,20 +99,22 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
// Set up an object repository, for actual data storage.
let repo = Arc::new(LayeredRepository::new(
let repo = Arc::new(BufferedRepository::new(
conf,
Arc::new(walredo_mgr),
tenant_id,
true,
));
let checkpointer_handle = LayeredRepository::launch_checkpointer_thread(conf, repo.clone());
let gc_handle = LayeredRepository::launch_gc_thread(conf, repo.clone());
let checkpointer_handle = BufferedRepository::launch_checkpointer_thread(conf, repo.clone());
let gc_handle = BufferedRepository::launch_gc_thread(conf, repo.clone());
let uploader_handle = BufferedRepository::launch_upload_thread(conf, repo.clone());
let mut handles = TENANT_HANDLES.lock().unwrap();
let h = TenantHandleEntry {
checkpointer_handle: Some(checkpointer_handle),
gc_handle: Some(gc_handle),
uploader_handle: Some(uploader_handle),
};
handles.insert(tenant_id, h);
@@ -170,6 +173,8 @@ pub fn stop_tenant_threads(tenantid: ZTenantId) {
if let Some(h) = handles.get_mut(&tenantid) {
h.checkpointer_handle.take().map(JoinHandle::join);
debug!("checkpointer for tenant {} has stopped", tenantid);
h.uploader_handle.take().map(JoinHandle::join);
debug!("uploader for tenant {} has stopped", tenantid);
h.gc_handle.take().map(JoinHandle::join);
debug!("gc for tenant {} has stopped", tenantid);
}

View File

@@ -0,0 +1,268 @@
use anyhow::{anyhow, Result};
use lz4_flex;
use std::convert::TryInto;
use std::ops::{Bound, RangeBounds};
use std::path::Path;
use yakv::storage::{
Key,
Select,
// ReadOnlyTransaction,
Snapshot,
Storage,
StorageConfig,
StorageIterator,
Transaction,
Value,
};
const TOAST_SEGMENT_SIZE: usize = 2000;
const CACHE_SIZE: usize = 1024; // 8Mb
///
/// Toast storage consistof two KV databases: one for storing main index
/// and second for storing sliced BLOB (values larger than 2kb).
/// BLOBs and main data are stored in different databases to improve
/// data locality and reduce key size for TOAST segments.
///
pub struct ToastStore {
db: Storage, // key-value database
}
pub struct ToastIterator<'a> {
iter: StorageIterator<'a>,
}
pub struct ToastSnapshot<'a> {
// tx: ReadOnlyTransaction<'a>,
tx: Snapshot<'a>,
}
impl<'a> ToastSnapshot<'a> {
pub fn range<R: RangeBounds<Key>>(&self, range: R) -> ToastIterator<'_> {
let from = match range.start_bound() {
Bound::Included(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0u8; 4]);
Bound::Included(key)
}
Bound::Excluded(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0u8; 4]);
Bound::Excluded(key)
}
_ => Bound::Unbounded,
};
let till = match range.end_bound() {
Bound::Included(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0xFFu8; 4]);
Bound::Included(key)
}
Bound::Excluded(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0xFFu8; 4]);
Bound::Excluded(key)
}
_ => Bound::Unbounded,
};
ToastIterator {
iter: self.tx.range((from, till)),
}
}
pub fn iter(&self) -> ToastIterator<'_> {
self.range(..)
}
}
impl<'a> Iterator for ToastIterator<'a> {
type Item = Result<(Key, Value)>;
fn next(&mut self) -> Option<Self::Item> {
let mut toast: Option<Vec<u8>> = None;
let mut next_segno = 0u16;
for elem in &mut self.iter {
let res = if let Ok((key, value)) = elem {
let key_len = key.len();
let n_segments =
u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap());
let segno = u16::from_be_bytes(key[key_len - 2..].try_into().unwrap());
let key = key[..key_len - 4].to_vec();
if n_segments != 0 {
// TOAST
assert_eq!(segno, next_segno);
if next_segno == 0 {
toast = Some(Vec::with_capacity(n_segments as usize * TOAST_SEGMENT_SIZE))
}
toast.as_mut().unwrap().extend_from_slice(&value);
next_segno = segno + 1;
if next_segno != n_segments {
continue;
}
let res = lz4_flex::decompress_size_prepended(&toast.unwrap());
if let Ok(decompressed_data) = res {
Ok((key, decompressed_data))
} else {
Err(anyhow!(res.unwrap_err()))
}
} else {
Ok((key, value))
}
} else {
elem
};
return Some(res);
}
assert_eq!(next_segno, 0);
None
}
}
impl<'a> DoubleEndedIterator for ToastIterator<'a> {
fn next_back(&mut self) -> Option<Self::Item> {
let mut toast: Option<Vec<u8>> = None;
let mut next_segno = 0u16;
while let Some(elem) = self.iter.next_back() {
if let Ok((key, value)) = elem {
assert!(!value.is_empty());
let key_len = key.len();
let n_segments =
u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap());
let segno = u16::from_be_bytes(key[key_len - 2..].try_into().unwrap());
let key = key[..key_len - 4].to_vec();
if n_segments != 0 {
// TOAST
assert!(segno + 1 == next_segno || next_segno == 0);
if next_segno == 0 {
let len = (n_segments - 1) as usize * TOAST_SEGMENT_SIZE + value.len();
let mut vec = vec![0u8; len];
vec[len - value.len()..].copy_from_slice(&value);
toast = Some(vec);
} else {
toast.as_mut().unwrap()[segno as usize * TOAST_SEGMENT_SIZE
..(segno + 1) as usize * TOAST_SEGMENT_SIZE]
.copy_from_slice(&value);
}
next_segno = segno;
if next_segno == 0 {
let toast = toast.unwrap();
assert!(!toast.is_empty());
let res = lz4_flex::decompress_size_prepended(&toast);
return Some(if let Ok(decompressed_data) = res {
Ok((key, decompressed_data))
} else {
Err(anyhow!(res.unwrap_err()))
});
}
} else {
return Some(Ok((key, value)));
}
} else {
return Some(elem);
}
}
assert_eq!(next_segno, 0);
None
}
}
//
// FIXME-KK: not using WAL now. Implement asynchronous or delayed commit.
//
impl ToastStore {
pub fn new(path: &Path) -> Result<ToastStore> {
Ok(ToastStore {
db: Storage::open(
&path.join("pageserver.db"),
StorageConfig {
cache_size: CACHE_SIZE,
nosync: false,
},
)?,
})
}
pub fn put(&self, key: Key, value: Value) -> Result<()> {
let mut tx = self.db.start_transaction();
self.tx_remove(&mut tx, &key)?;
let value_len = value.len();
let mut key = key;
if value_len >= TOAST_SEGMENT_SIZE {
let compressed_data = lz4_flex::compress_prepend_size(&value);
let compressed_data_len = compressed_data.len();
let mut offs: usize = 0;
let mut segno = 0u16;
let n_segments =
((compressed_data_len + TOAST_SEGMENT_SIZE - 1) / TOAST_SEGMENT_SIZE) as u16;
assert!(n_segments != 0);
key.extend_from_slice(&n_segments.to_be_bytes());
key.extend_from_slice(&[0u8; 2]);
let key_len = key.len();
while offs + TOAST_SEGMENT_SIZE < compressed_data_len {
key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
tx.put(
&key,
&compressed_data[offs..offs + TOAST_SEGMENT_SIZE].to_vec(),
)?;
offs += TOAST_SEGMENT_SIZE;
segno += 1;
}
key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
tx.put(&key, &compressed_data[offs..].to_vec())?;
} else {
key.extend_from_slice(&[0u8; 4]);
tx.put(&key, &value)?;
}
tx.subcommit()?;
//tx.delay();
Ok(())
}
pub fn commit(&self) -> Result<()> {
let tx = self.db.start_transaction();
tx.commit()?;
Ok(())
}
pub fn take_snapshot(&self) -> ToastSnapshot<'_> {
ToastSnapshot {
//tx: self.db.read_only_transaction(),
tx: self.db.take_snapshot(),
}
}
pub fn remove(&self, key: Key) -> Result<()> {
let mut tx = self.db.start_transaction();
self.tx_remove(&mut tx, &key)?;
tx.subcommit()?;
//tx.delay();
Ok(())
}
pub fn tx_remove(&self, tx: &mut Transaction, key: &[u8]) -> Result<()> {
let mut min_key = key.to_vec();
let mut max_key = key.to_vec();
min_key.extend_from_slice(&[0u8; 4]);
max_key.extend_from_slice(&[0xFFu8; 4]);
let mut iter = tx.range(&min_key..&max_key);
if let Some(entry) = iter.next() {
let mut key = entry?.0;
let key_len = key.len();
let n_segments = u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap());
if n_segments != 0 {
// TOAST
for i in 0..n_segments {
key[key_len - 2..].copy_from_slice(&i.to_be_bytes());
tx.remove(&key)?;
}
} else {
tx.remove(&key)?;
}
}
Ok(())
}
pub fn size(&self) -> u64 {
self.db.get_database_info().db_used
}
}

View File

@@ -229,17 +229,18 @@ pub struct DecodedBkpBlock {
pub blkno: u32,
/* copy of the fork_flags field from the XLogRecordBlockHeader */
flags: u8,
pub flags: u8,
/* Information on full-page image, if any */
has_image: bool, /* has image, even for consistency checking */
pub has_image: bool, /* has image, even for consistency checking */
pub apply_image: bool, /* has image that should be restored */
pub will_init: bool, /* record doesn't need previous page version to apply */
//char *bkp_image;
hole_offset: u16,
hole_length: u16,
bimg_len: u16,
bimg_info: u8,
pub hole_offset: u16,
pub hole_length: u16,
pub bimg_offset: u32,
pub bimg_len: u16,
pub bimg_info: u8,
/* Buffer holding the rmgr-specific data associated with this block */
has_data: bool,
@@ -859,8 +860,19 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
}
// 3. Decode blocks.
let mut ptr = record.len() - buf.remaining();
for blk in blocks.iter_mut() {
if blk.has_image {
blk.bimg_offset = ptr as u32;
ptr += blk.bimg_len as usize;
}
if blk.has_data {
ptr += blk.data_len as usize;
}
}
// We don't need them, so just skip blocks_total_len bytes
buf.advance(blocks_total_len as usize);
assert_eq!(ptr, record.len() - buf.remaining());
let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;

View File

@@ -22,23 +22,26 @@ use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use log::*;
use nix::poll::*;
use serde::Serialize;
use std::fs;
use std::fs::OpenOptions;
use std::io::prelude::*;
use std::io::Error;
use std::io::{Error, ErrorKind};
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender, SyncSender};
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use tokio::io::AsyncBufReadExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::{ChildStdin, ChildStdout, Command};
use tokio::time::timeout;
use zenith_metrics::{register_histogram, register_int_counter, Histogram, IntCounter};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::nonblock::set_nonblock;
use zenith_utils::zid::ZTenantId;
use crate::relish::*;
@@ -53,6 +56,10 @@ use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::XLogRecord;
const N_CHANNELS: usize = 16;
const CHANNEL_SIZE: usize = 1024 * 1024;
type ChannelId = usize;
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
///
@@ -130,17 +137,22 @@ lazy_static! {
///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the processs at a time,
/// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
/// perform WAL replay. I multiplex requests from multiple threads
/// using `sender` channel and send them to the postgres wal-redo process
/// pipe by separate thread. Responses are returned through set of `receivers`
/// channels, used in round robin manner. Receiver thread is protected by mutex
/// to prevent it's usage by more than one thread
/// In the future, we might want to launch a pool of processes to allow concurrent
/// replay of multiple records.
///
pub struct PostgresRedoManager {
tenantid: ZTenantId,
conf: &'static PageServerConf,
runtime: tokio::runtime::Runtime,
process: Mutex<Option<PostgresRedoProcess>>,
// mutiplexor pipe: use sync_channel to allow sharing sender by multiple threads
// and limit size of buffer
sender: SyncSender<(ChannelId, Vec<u8>)>,
// set of receiver channels
receivers: Vec<Mutex<Receiver<Bytes>>>,
// atomicly incremented counter for choosing receiver
round_robin: AtomicUsize,
}
#[derive(Debug)]
@@ -153,6 +165,13 @@ struct WalRedoRequest {
records: Vec<(Lsn, WALRecord)>,
}
impl WalRedoRequest {
// Can this request be served by zenith redo funcitons
// or we need to pass it to wal-redo postgres process?
fn can_apply_in_zenith(&self) -> bool {
!matches!(self.rel, RelishTag::Relation(_))
}
}
/// An error happened in WAL redo
#[derive(Debug, thiserror::Error)]
pub enum WalRedoError {
@@ -161,6 +180,8 @@ pub enum WalRedoError {
#[error("cannot perform WAL redo now")]
InvalidState,
#[error("cannot perform WAL redo for this request")]
InvalidRequest,
}
///
@@ -181,10 +202,6 @@ impl WalRedoManager for PostgresRedoManager {
base_img: Option<Bytes>,
records: Vec<(Lsn, WALRecord)>,
) -> Result<Bytes, WalRedoError> {
let start_time;
let lock_time;
let end_time;
let request = WalRedoRequest {
rel,
blknum,
@@ -192,28 +209,14 @@ impl WalRedoManager for PostgresRedoManager {
base_img,
records,
};
start_time = Instant::now();
let result = {
let mut process_guard = self.process.lock().unwrap();
lock_time = Instant::now();
// launch the WAL redo process on first use
if process_guard.is_none() {
let p = self
.runtime
.block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
self.runtime
.block_on(self.handle_apply_request(process, &request))
let start_time = Instant::now();
let result = if request.can_apply_in_zenith() {
self.handle_apply_request_zenith(&request)
} else {
self.handle_apply_request_postgres(&request)
};
end_time = Instant::now();
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
WAL_REDO_TIME.observe(end_time.duration_since(lock_time).as_secs_f64());
let end_time = Instant::now();
WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64());
result
}
@@ -223,32 +226,109 @@ impl PostgresRedoManager {
///
/// Create a new PostgresRedoManager.
///
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
// We block on waiting for requests on the walredo request channel, but
// use async I/O to communicate with the child process. Initialize the
// runtime for the async part.
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
pub fn new(conf: &PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
let (tx, rx): (
SyncSender<(ChannelId, Vec<u8>)>,
Receiver<(ChannelId, Vec<u8>)>,
) = mpsc::sync_channel(CHANNEL_SIZE);
let mut senders: Vec<Sender<Bytes>> = Vec::with_capacity(N_CHANNELS);
let mut receivers: Vec<Mutex<Receiver<Bytes>>> = Vec::with_capacity(N_CHANNELS);
for _ in 0..N_CHANNELS {
let (tx, rx) = mpsc::channel();
senders.push(tx);
receivers.push(Mutex::new(rx));
}
if let Ok(mut proc) = PostgresRedoProcess::launch(conf, &tenantid) {
let _proxy = std::thread::spawn(move || loop {
let (id, data) = rx.recv().unwrap();
match proc.apply_wal_records(data) {
Ok(page) => senders[id as usize].send(page).unwrap(),
Err(err) => {
info!("wal-redo failed with error {:?}", err);
proc.kill();
break;
}
}
});
PostgresRedoManager {
sender: tx,
receivers,
round_robin: AtomicUsize::new(0),
}
} else {
panic!("Failed to launch wal-redo postgres");
}
}
// The actual process is launched lazily, on first request.
PostgresRedoManager {
runtime,
tenantid,
conf,
process: Mutex::new(None),
fn apply_wal_records(
&self,
tag: BufferTag,
base_img: Option<Bytes>,
records: &[(Lsn, WALRecord)],
) -> Result<Bytes, std::io::Error> {
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple.
let mut writebuf: Vec<u8> = Vec::new();
build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
build_push_page_msg(tag, &img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
build_apply_record_msg(*lsn, &rec.rec, &mut writebuf);
}
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
let id = self.round_robin.fetch_add(1, Ordering::Relaxed) % N_CHANNELS;
let rx = self.receivers[id].lock().unwrap();
self.sender.send((id, writebuf)).unwrap();
Ok(rx.recv().unwrap())
}
///
/// Process one request for WAL redo using wal-redo postgres
///
fn handle_apply_request_postgres(
&self,
request: &WalRedoRequest,
) -> Result<Bytes, WalRedoError> {
let blknum = request.blknum;
let lsn = request.lsn;
let base_img = request.base_img.clone();
let records = &request.records;
let nrecords = records.len();
let start = Instant::now();
let apply_result: Result<Bytes, Error>;
if let RelishTag::Relation(rel) = request.rel {
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
apply_result = self.apply_wal_records(buf_tag, base_img, records);
let duration = start.elapsed();
debug!(
"postgres applied {} WAL records in {} us to reconstruct page image at LSN {}",
nrecords,
duration.as_micros(),
lsn
);
apply_result.map_err(WalRedoError::IoError)
} else {
Err(WalRedoError::InvalidRequest)
}
}
///
/// Process one request for WAL redo.
/// Process one request for WAL redo using custom zenith code
///
async fn handle_apply_request(
&self,
process: &mut PostgresRedoProcess,
request: &WalRedoRequest,
) -> Result<Bytes, WalRedoError> {
fn handle_apply_request_zenith(&self, request: &WalRedoRequest) -> Result<Bytes, WalRedoError> {
let rel = request.rel;
let blknum = request.blknum;
let lsn = request.lsn;
@@ -260,178 +340,158 @@ impl PostgresRedoManager {
let start = Instant::now();
let apply_result: Result<Bytes, Error>;
if let RelishTag::Relation(rel) = rel {
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
apply_result = process.apply_wal_records(buf_tag, base_img, records).await;
// Non-relational WAL records are handled here, with custom code that has the
// same effects as the corresponding Postgres WAL redo function.
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let mut page = BytesMut::new();
if let Some(fpi) = base_img {
// If full-page image is provided, then use it...
page.extend_from_slice(&fpi[..]);
} else {
// Non-relational WAL records are handled here, with custom code that has the
// same effects as the corresponding Postgres WAL redo function.
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let mut page = BytesMut::new();
if let Some(fpi) = base_img {
// If full-page image is provided, then use it...
page.extend_from_slice(&fpi[..]);
} else {
// otherwise initialize page with zeros
page.extend_from_slice(&ZERO_PAGE);
// otherwise initialize page with zeros
page.extend_from_slice(&ZERO_PAGE);
}
// Apply all collected WAL records
for (_lsn, record) in records {
let mut buf = record.rec.clone();
WAL_REDO_RECORD_COUNTER.inc();
// 1. Parse XLogRecord struct
// FIXME: refactor to avoid code duplication.
let xlogrec = XLogRecord::from_bytes(&mut buf);
//move to main data
// TODO probably, we should store some records in our special format
// to avoid this weird parsing on replay
let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize;
if buf.remaining() > skip {
buf.advance(skip);
}
// Apply all collected WAL records
for (_lsn, record) in records {
let mut buf = record.rec.clone();
WAL_REDO_RECORD_COUNTER.inc();
// 1. Parse XLogRecord struct
// FIXME: refactor to avoid code duplication.
let xlogrec = XLogRecord::from_bytes(&mut buf);
//move to main data
// TODO probably, we should store some records in our special format
// to avoid this weird parsing on replay
let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize;
if buf.remaining() > skip {
buf.advance(skip);
}
if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
// Transaction manager stuff
let rec_segno = match rel {
RelishTag::Slru { slru, segno } => {
assert!(
slru == SlruKind::Clog,
"Not valid XACT relish tag {:?}",
rel
if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
// Transaction manager stuff
let rec_segno = match rel {
RelishTag::Slru { slru, segno } => {
assert!(
slru == SlruKind::Clog,
"Not valid XACT relish tag {:?}",
rel
);
segno
}
_ => panic!("Not valid XACT relish tag {:?}", rel),
};
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, xlogrec.xl_xid, xlogrec.xl_info);
if parsed_xact.info == pg_constants::XLOG_XACT_COMMIT
|| parsed_xact.info == pg_constants::XLOG_XACT_COMMIT_PREPARED
{
transaction_id_set_status(
parsed_xact.xid,
pg_constants::TRANSACTION_STATUS_COMMITTED,
&mut page,
);
for subxact in &parsed_xact.subxacts {
let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
// only update xids on the requested page
if rec_segno == segno && blknum == rpageno {
transaction_id_set_status(
*subxact,
pg_constants::TRANSACTION_STATUS_COMMITTED,
&mut page,
);
segno
}
_ => panic!("Not valid XACT relish tag {:?}", rel),
};
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, xlogrec.xl_xid, xlogrec.xl_info);
if parsed_xact.info == pg_constants::XLOG_XACT_COMMIT
|| parsed_xact.info == pg_constants::XLOG_XACT_COMMIT_PREPARED
{
transaction_id_set_status(
parsed_xact.xid,
pg_constants::TRANSACTION_STATUS_COMMITTED,
&mut page,
);
for subxact in &parsed_xact.subxacts {
let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
// only update xids on the requested page
if rec_segno == segno && blknum == rpageno {
transaction_id_set_status(
*subxact,
pg_constants::TRANSACTION_STATUS_COMMITTED,
&mut page,
);
}
}
} else if parsed_xact.info == pg_constants::XLOG_XACT_ABORT
|| parsed_xact.info == pg_constants::XLOG_XACT_ABORT_PREPARED
{
transaction_id_set_status(
parsed_xact.xid,
pg_constants::TRANSACTION_STATUS_ABORTED,
&mut page,
);
for subxact in &parsed_xact.subxacts {
let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
// only update xids on the requested page
if rec_segno == segno && blknum == rpageno {
transaction_id_set_status(
*subxact,
pg_constants::TRANSACTION_STATUS_ABORTED,
&mut page,
);
}
}
}
} else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
// Multixact operations
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
if let RelishTag::Slru {
slru,
segno: rec_segno,
} = rel
{
if slru == SlruKind::MultiXactMembers {
for i in 0..xlrec.nmembers {
let pageno =
i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
if segno == rec_segno && rpageno == blknum {
// update only target block
let offset = xlrec.moff + i;
let memberoff = mx_offset_to_member_offset(offset);
let flagsoff = mx_offset_to_flags_offset(offset);
let bshift = mx_offset_to_flags_bitshift(offset);
let mut flagsval =
LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
flagsval &= !(((1
<< pg_constants::MXACT_MEMBER_BITS_PER_XACT)
- 1)
} else if parsed_xact.info == pg_constants::XLOG_XACT_ABORT
|| parsed_xact.info == pg_constants::XLOG_XACT_ABORT_PREPARED
{
transaction_id_set_status(
parsed_xact.xid,
pg_constants::TRANSACTION_STATUS_ABORTED,
&mut page,
);
for subxact in &parsed_xact.subxacts {
let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
// only update xids on the requested page
if rec_segno == segno && blknum == rpageno {
transaction_id_set_status(
*subxact,
pg_constants::TRANSACTION_STATUS_ABORTED,
&mut page,
);
}
}
}
} else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
// Multixact operations
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
if let RelishTag::Slru {
slru,
segno: rec_segno,
} = rel
{
if slru == SlruKind::MultiXactMembers {
for i in 0..xlrec.nmembers {
let pageno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
if segno == rec_segno && rpageno == blknum {
// update only target block
let offset = xlrec.moff + i;
let memberoff = mx_offset_to_member_offset(offset);
let flagsoff = mx_offset_to_flags_offset(offset);
let bshift = mx_offset_to_flags_bitshift(offset);
let mut flagsval =
LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
flagsval &=
!(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1)
<< bshift);
flagsval |= xlrec.members[i as usize].status << bshift;
LittleEndian::write_u32(
&mut page[flagsoff..flagsoff + 4],
flagsval,
);
LittleEndian::write_u32(
&mut page[memberoff..memberoff + 4],
xlrec.members[i as usize].xid,
);
}
flagsval |= xlrec.members[i as usize].status << bshift;
LittleEndian::write_u32(
&mut page[flagsoff..flagsoff + 4],
flagsval,
);
LittleEndian::write_u32(
&mut page[memberoff..memberoff + 4],
xlrec.members[i as usize].xid,
);
}
} else {
// Multixact offsets SLRU
let offs = (xlrec.mid
% pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32
* 4) as usize;
LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff);
}
} else {
panic!();
// Multixact offsets SLRU
let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32
* 4) as usize;
LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff);
}
} else {
panic!();
}
} else {
panic!();
}
}
apply_result = Ok::<Bytes, Error>(page.freeze());
}
apply_result = Ok::<Bytes, Error>(page.freeze());
let duration = start.elapsed();
let result: Result<Bytes, WalRedoError>;
debug!(
"applied {} WAL records in {} ms to reconstruct page image at LSN {}",
"zenith applied {} WAL records in {} ms to reconstruct page image at LSN {}",
nrecords,
duration.as_millis(),
lsn
);
if let Err(e) = apply_result {
error!("could not apply WAL records: {:#}", e);
result = Err(WalRedoError::IoError(e));
} else {
let img = apply_result.unwrap();
result = Ok(img);
}
// The caller is responsible for sending the response
result
apply_result.map_err(WalRedoError::IoError)
}
}
@@ -439,18 +499,17 @@ impl PostgresRedoManager {
/// Handle to the Postgres WAL redo process
///
struct PostgresRedoProcess {
child: Child,
stdin: ChildStdin,
stdout: ChildStdout,
stderr: ChildStderr,
}
impl PostgresRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
async fn launch(
conf: &PageServerConf,
tenantid: &ZTenantId,
) -> Result<PostgresRedoProcess, Error> {
fn launch(conf: &PageServerConf, tenantid: &ZTenantId) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
@@ -471,7 +530,6 @@ impl PostgresRedoProcess {
.env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
.output()
.await
.expect("failed to execute initdb");
if !initdb.status.success() {
@@ -508,102 +566,114 @@ impl PostgresRedoProcess {
datadir.display()
);
let stdin = child.stdin.take().expect("failed to open child's stdin");
let stderr = child.stderr.take().expect("failed to open child's stderr");
let stdout = child.stdout.take().expect("failed to open child's stdout");
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
// This async block reads the child's stderr, and forwards it to the logger
let f_stderr = async {
let mut stderr_buffered = tokio::io::BufReader::new(stderr);
set_nonblock(stdin.as_raw_fd())?;
set_nonblock(stdout.as_raw_fd())?;
set_nonblock(stderr.as_raw_fd())?;
let mut line = String::new();
loop {
let res = stderr_buffered.read_line(&mut line).await;
if res.is_err() {
debug!("could not convert line to utf-8");
continue;
}
if res.unwrap() == 0 {
break;
}
error!("wal-redo-postgres: {}", line.trim());
line.clear();
}
Ok::<(), Error>(())
};
tokio::spawn(f_stderr);
Ok(PostgresRedoProcess {
child,
stdin,
stdout,
stderr,
})
}
Ok(PostgresRedoProcess { stdin, stdout })
fn kill(mut self) {
let _ = self.child.kill();
if let Ok(exit_status) = self.child.wait() {
error!("wal-redo-postgres exited with code {}", exit_status);
}
drop(self);
}
//
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
async fn apply_wal_records(
&mut self,
tag: BufferTag,
base_img: Option<Bytes>,
records: &[(Lsn, WALRecord)],
) -> Result<Bytes, std::io::Error> {
let stdout = &mut self.stdout;
// Buffer the writes to avoid a lot of small syscalls.
let mut stdin = tokio::io::BufWriter::new(&mut self.stdin);
fn apply_wal_records(&mut self, writebuf: Vec<u8>) -> Result<Bytes, std::io::Error> {
let mut nwrite = self.stdin.write(&writebuf)?;
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; pg_constants::BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
// Prepare for calling poll()
let mut pollfds = [
PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
];
// We do three things simultaneously: send the old base image and WAL records to
// the child process's stdin, read the result from child's stdout, and forward any logging
// information that the child writes to its stderr to the page server's log.
//
// 'f_stdin' handles writing the base image and WAL records to the child process.
// 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the
// tokio runtime in the 'launch' function already, forwards the logging.
let f_stdin = async {
// Send base image, if any. (If the record initializes the page, previous page
// version is not needed.)
timeout(
TIMEOUT,
stdin.write_all(&build_begin_redo_for_block_msg(tag)),
)
.await??;
if let Some(img) = base_img {
timeout(TIMEOUT, stdin.write_all(&build_push_page_msg(tag, &img))).await??;
while nresult < pg_constants::BLCKSZ.into() {
// If we have more data to write, wake up if 'stdin' becomes writeable or
// we have data to read. Otherwise only wake up if there's data to read.
let nfds = if nwrite < writebuf.len() { 3 } else { 2 };
let n = nix::poll::poll(&mut pollfds[0..nfds], TIMEOUT.as_millis() as i32)?;
if n == 0 {
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
}
// Send WAL records.
for (lsn, rec) in records.iter() {
WAL_REDO_RECORD_COUNTER.inc();
// If we have some messages in stderr, forward them to the log.
let err_revents = pollfds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; 16384] = [0; 16384];
let n = self.stderr.read(&mut errbuf)?;
stdin
.write_all(&build_apply_record_msg(*lsn, &rec.rec))
.await?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if n > 0 {
error!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..n])
);
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
// r.lsn >> 32, r.lsn & 0xffff_ffff);
// To make sure we capture all log from the process if it fails, keep
// reading from the stderr, before checking the stdout.
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
}
//debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
// records.len(), lsn >> 32, lsn & 0xffff_ffff);
// Send GetPage command to get the result back
timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??;
timeout(TIMEOUT, stdin.flush()).await??;
//debug!("sent GetPage for {}", tag.blknum);
Ok::<(), Error>(())
};
// If we have more data to write and 'stdin' is writeable, do write.
if nwrite < writebuf.len() {
let in_revents = pollfds[2].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += self.stdin.write(&writebuf[nwrite..])?;
} else if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdin unexpectedly",
));
}
}
// Read back new page image
let f_stdout = async {
let mut buf = [0u8; 8192];
// If we have some data in stdout, read it to the result buffer.
let out_revents = pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += self.stdout.read(&mut resultbuf[nresult..])?;
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
}
timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
//debug!("got response for {}", tag.blknum);
Ok::<[u8; 8192], Error>(buf)
};
let res = tokio::try_join!(f_stdout, f_stdin)?;
let buf = res.0;
Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
Ok(Bytes::from(resultbuf))
}
}
@@ -611,62 +681,42 @@ impl PostgresRedoProcess {
// process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for
// explanation of the protocol.
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Vec<u8> {
fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
let len = 4 + 1 + 4 * 4;
let mut buf = Vec::with_capacity(1 + len);
buf.put_u8(b'B');
buf.put_u32(len as u32);
tag.ser_into(&mut buf)
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
debug_assert!(buf.len() == 1 + len);
buf
}
fn build_push_page_msg(tag: BufferTag, base_img: &[u8]) -> Vec<u8> {
fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
assert!(base_img.len() == 8192);
let len = 4 + 1 + 4 * 4 + base_img.len();
let mut buf = Vec::with_capacity(1 + len);
buf.put_u8(b'P');
buf.put_u32(len as u32);
tag.ser_into(&mut buf)
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
buf.put(base_img);
debug_assert!(buf.len() == 1 + len);
buf
}
fn build_apply_record_msg(endlsn: Lsn, rec: &[u8]) -> Vec<u8> {
fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
let len = 4 + 8 + rec.len();
let mut buf: Vec<u8> = Vec::with_capacity(1 + len);
buf.put_u8(b'A');
buf.put_u32(len as u32);
buf.put_u64(endlsn.0);
buf.put(rec);
debug_assert!(buf.len() == 1 + len);
buf
}
fn build_get_page_msg(tag: BufferTag) -> Vec<u8> {
fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
let len = 4 + 1 + 4 * 4;
let mut buf = Vec::with_capacity(1 + len);
buf.put_u8(b'G');
buf.put_u32(len as u32);
tag.ser_into(&mut buf)
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
debug_assert!(buf.len() == 1 + len);
buf
}

View File

@@ -27,6 +27,7 @@ workspace_hack = { path = "../workspace_hack" }
rand = "0.8.3"
jsonwebtoken = "7"
hex = { version = "0.4.3", features = ["serde"] }
nix = "0.23.0"
rustls = "0.19.1"
rustls-split = "0.2.1"

View File

@@ -40,3 +40,7 @@ pub mod logging;
// Misc
pub mod accum;
// Utility for putting a raw file descriptor into non-blocking mode
pub mod nonblock;

View File

@@ -0,0 +1,17 @@
use nix::fcntl::{fcntl, OFlag, F_GETFL, F_SETFL};
use std::os::unix::io::RawFd;
/// Put a file descriptor into non-blocking mode
pub fn set_nonblock(fd: RawFd) -> Result<(), std::io::Error> {
let bits = fcntl(fd, F_GETFL)?;
// Safety: If F_GETFL returns some unknown bits, they should be valid
// for passing back to F_SETFL, too. If we left them out, the F_SETFL
// would effectively clear them, which is not what we want.
let mut flags = unsafe { OFlag::from_bits_unchecked(bits) };
flags |= OFlag::O_NONBLOCK;
fcntl(fd, F_SETFL(flags))?;
Ok(())
}

View File

@@ -87,6 +87,10 @@ impl<K: Ord, V> VecMap<K, V> {
Ok(None)
}
pub fn last(&self) -> Option<&(K, V)> {
self.0.last()
}
/// Split the map into two.
///
/// The left map contains everything before `cutoff` (exclusive).