Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-18 10:52:55 +00:00)

Compare commits: hackathon/...buffered_r (21 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 3e08ad485a | |
| | 5ad82418a9 | |
| | 92562145c0 | |
| | 915001c67e | |
| | 13f9565ff8 | |
| | f73d043a8b | |
| | 8fda7a6183 | |
| | 4acd292717 | |
| | b365a075f4 | |
| | 6311135d73 | |
| | ee29446edc | |
| | d2e5e0e728 | |
| | 3b471494ff | |
| | 9947de4a2a | |
| | a3e94e888a | |
| | e6f33a5cd0 | |
| | 2dd35b1fbe | |
| | ce779cc754 | |
| | 497258c6fe | |
| | 0b6008012d | |
| | d35fc20181 | |
Cargo.lock (generated): 35
@@ -918,6 +918,15 @@ dependencies = [
 "cfg-if 1.0.0",
]

[[package]]
name = "lz4_flex"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "177c079243f6867429aca5af5053747f57e329d44f0c58bebca078cd14873ec2"
dependencies = [
 "twox-hash",
]

[[package]]
name = "matchers"
version = "0.0.1"

@@ -1180,6 +1189,7 @@ dependencies = [
 "hyper",
 "lazy_static",
 "log",
 "lz4_flex",
 "postgres",
 "postgres-protocol",
 "postgres-types",

@@ -1198,6 +1208,7 @@ dependencies = [
 "toml",
 "tracing",
 "workspace_hack",
 "yakv",
 "zenith_metrics",
 "zenith_utils",
]

@@ -1908,6 +1919,12 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"

[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"

[[package]]
name = "stringprep"
version = "0.1.2"

@@ -2228,6 +2245,16 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"

[[package]]
name = "twox-hash"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f559b464de2e2bdabcac6a210d12e9b5a5973c251e102c44c585c71d51bd78e"
dependencies = [
 "cfg-if 1.0.0",
 "static_assertions",
]

[[package]]
name = "typenum"
version = "1.13.0"

@@ -2542,6 +2569,14 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"

[[package]]
name = "yakv"
version = "0.2.4"
dependencies = [
 "anyhow",
 "fs2",
]

[[package]]
name = "zenith"
version = "0.1.0"
@@ -15,3 +15,7 @@ members = [
# This is useful for profiling and, to some extent, debug.
# Besides, debug info should not affect the performance.
debug = true
panic = 'abort'

[profile.dev]
panic = 'abort'

@@ -298,6 +298,7 @@ impl PostgresNode {
        conf.append("max_replication_slots", "10");
        conf.append("hot_standby", "on");
        conf.append("shared_buffers", "1MB");
        conf.append("max_wal_size", "100GB");
        conf.append("fsync", "off");
        conf.append("max_connections", "100");
        conf.append("wal_sender_timeout", "0");
@@ -37,6 +37,9 @@ async-trait = "0.1"
const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
#yakv = { path = "../../yakv" }
yakv = "0.2.4"
lz4_flex = "0.9.0"

postgres_ffi = { path = "../postgres_ffi" }
zenith_metrics = { path = "../zenith_metrics" }
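The pageserver crate now depends on yakv (the key-value store backing the new TOAST store) and lz4_flex (used to compress oversized values). As a quick orientation, here is a minimal round-trip sketch of the two lz4_flex calls that the new toast_store.rs below relies on; the sample payload is invented for illustration:

```rust
// Hypothetical illustration of the lz4_flex API used by the new TOAST store.
// The compressed buffer has the original length prepended, so decompression
// does not need the size passed separately.
use lz4_flex::{compress_prepend_size, decompress_size_prepended};

fn main() {
    let value = vec![0xABu8; 16 * 1024]; // sample payload, stands in for a large KV value
    let compressed = compress_prepend_size(&value);
    assert!(compressed.len() < value.len()); // highly compressible sample data

    let restored = decompress_size_prepended(&compressed).expect("decompression failed");
    assert_eq!(restored, value);
}
```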
@@ -1,25 +0,0 @@
//! Main entry point for the dump_layerfile executable
//!
//! A handy tool for debugging, that's all.
use anyhow::Result;
use clap::{App, Arg};
use pageserver::layered_repository::dump_layerfile_from_path;
use std::path::PathBuf;

fn main() -> Result<()> {
    let arg_matches = App::new("Zenith dump_layerfile utility")
        .about("Dump contents of one layer file, for debugging")
        .arg(
            Arg::with_name("path")
                .help("Path to file to dump")
                .required(true)
                .index(1),
        )
        .get_matches();

    let path = PathBuf::from(arg_matches.value_of("path").unwrap());

    dump_layerfile_from_path(&path)?;

    Ok(())
}
@@ -42,6 +42,9 @@ struct CfgFileParams {
    listen_http_addr: Option<String>,
    checkpoint_distance: Option<String>,
    checkpoint_period: Option<String>,
    upload_distance: Option<String>,
    upload_period: Option<String>,
    reconstruct_threshold: Option<String>,
    gc_horizon: Option<String>,
    gc_period: Option<String>,
    pg_distrib_dir: Option<String>,

@@ -103,6 +106,9 @@ impl CfgFileParams {
    listen_http_addr: get_arg("listen-http"),
    checkpoint_distance: get_arg("checkpoint_distance"),
    checkpoint_period: get_arg("checkpoint_period"),
    upload_distance: get_arg("upload_distance"),
    upload_period: get_arg("upload_period"),
    reconstruct_threshold: get_arg("reconstruct_threshold"),
    gc_horizon: get_arg("gc_horizon"),
    gc_period: get_arg("gc_period"),
    pg_distrib_dir: get_arg("postgres-distrib"),

@@ -121,6 +127,9 @@ impl CfgFileParams {
    listen_http_addr: self.listen_http_addr.or(other.listen_http_addr),
    checkpoint_distance: self.checkpoint_distance.or(other.checkpoint_distance),
    checkpoint_period: self.checkpoint_period.or(other.checkpoint_period),
    upload_distance: self.upload_distance.or(other.upload_distance),
    upload_period: self.upload_period.or(other.upload_period),
    reconstruct_threshold: self.reconstruct_threshold.or(other.reconstruct_threshold),
    gc_horizon: self.gc_horizon.or(other.gc_horizon),
    gc_period: self.gc_period.or(other.gc_period),
    pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir),

@@ -158,6 +167,20 @@ impl CfgFileParams {
        None => DEFAULT_CHECKPOINT_PERIOD,
    };

    let upload_distance: u64 = match self.upload_distance.as_ref() {
        Some(upload_distance_str) => upload_distance_str.parse()?,
        None => DEFAULT_UPLOAD_DISTANCE,
    };
    let upload_period = match self.upload_period.as_ref() {
        Some(upload_period_str) => humantime::parse_duration(upload_period_str)?,
        None => DEFAULT_UPLOAD_PERIOD,
    };

    let reconstruct_threshold: u64 = match self.reconstruct_threshold.as_ref() {
        Some(reconstruct_threshold_str) => reconstruct_threshold_str.parse()?,
        None => DEFAULT_RECONSTRUCT_THRESHOLD,
    };

    let gc_horizon: u64 = match self.gc_horizon.as_ref() {
        Some(horizon_str) => horizon_str.parse()?,
        None => DEFAULT_GC_HORIZON,

@@ -236,6 +259,9 @@ impl CfgFileParams {
    listen_http_addr,
    checkpoint_distance,
    checkpoint_period,
    upload_distance,
    upload_period,
    reconstruct_threshold,
    gc_horizon,
    gc_period,

@@ -296,6 +322,24 @@ fn main() -> Result<()> {
        .takes_value(true)
        .help("Interval between checkpoint iterations"),
    )
    .arg(
        Arg::with_name("checkpoint_distance")
            .long("checkpoint_distance")
            .takes_value(true)
            .help("Distance from current LSN to perform checkpoint of in-memory layers"),
    )
    .arg(
        Arg::with_name("upload_period")
            .long("upload_period")
            .takes_value(true)
            .help("Interval between upload iterations"),
    )
    .arg(
        Arg::with_name("reconstruct_threshold")
            .long("reconstruct_threshold")
            .takes_value(true)
            .help("Minimal size of deltas after which page reconstruction (materialization) can be performed"),
    )
    .arg(
        Arg::with_name("gc_horizon")
            .long("gc_horizon")

@@ -600,6 +644,9 @@ mod tests {
    listen_http_addr: Some("listen_http_addr_VALUE".to_string()),
    checkpoint_distance: Some("checkpoint_distance_VALUE".to_string()),
    checkpoint_period: Some("checkpoint_period_VALUE".to_string()),
    upload_distance: Some("upload_distance_VALUE".to_string()),
    upload_period: Some("upload_period_VALUE".to_string()),
    reconstruct_threshold: Some("reconstruct_threshold_VALUE".to_string()),
    gc_horizon: Some("gc_horizon_VALUE".to_string()),
    gc_period: Some("gc_period_VALUE".to_string()),
    pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()),

@@ -623,6 +670,9 @@ mod tests {
listen_http_addr = 'listen_http_addr_VALUE'
checkpoint_distance = 'checkpoint_distance_VALUE'
checkpoint_period = 'checkpoint_period_VALUE'
upload_distance = 'upload_distance_VALUE'
upload_period = 'upload_period_VALUE'
reconstruct_threshold = 'reconstruct_threshold_VALUE'
gc_horizon = 'gc_horizon_VALUE'
gc_period = 'gc_period_VALUE'
pg_distrib_dir = 'pg_distrib_dir_VALUE'

@@ -657,6 +707,9 @@ local_path = 'relish_storage_local_VALUE'
    listen_http_addr: Some("listen_http_addr_VALUE".to_string()),
    checkpoint_distance: Some("checkpoint_distance_VALUE".to_string()),
    checkpoint_period: Some("checkpoint_period_VALUE".to_string()),
    upload_distance: Some("upload_distance_VALUE".to_string()),
    upload_period: Some("upload_period_VALUE".to_string()),
    reconstruct_threshold: Some("reconstruct_threshold_VALUE".to_string()),
    gc_horizon: Some("gc_horizon_VALUE".to_string()),
    gc_period: Some("gc_period_VALUE".to_string()),
    pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()),

@@ -683,6 +736,9 @@ local_path = 'relish_storage_local_VALUE'
listen_http_addr = 'listen_http_addr_VALUE'
checkpoint_distance = 'checkpoint_distance_VALUE'
checkpoint_period = 'checkpoint_period_VALUE'
upload_distance = 'upload_distance_VALUE'
upload_period = 'upload_period_VALUE'
reconstruct_threshold = 'reconstruct_threshold_VALUE'
gc_horizon = 'gc_horizon_VALUE'
gc_period = 'gc_period_VALUE'
pg_distrib_dir = 'pg_distrib_dir_VALUE'
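The new knobs (upload_distance, upload_period, reconstruct_threshold, and the revised GC settings) can come from the command line or from pageserver.toml, as the tests above show with placeholder values. For orientation only, here is an illustrative fragment with concrete values; it assumes the byte-sized settings are written as integer strings parsed as u64 and the periods use humantime syntax, as in the parsing code above, and it borrows the DEFAULT_* constants defined in pageserver/src/lib.rs later in this diff:

```toml
# Illustrative values only; taken from the DEFAULT_* constants in this diff.
checkpoint_distance = '268435456'   # 256 MiB, parsed as u64
checkpoint_period = '10s'           # humantime duration
upload_distance = '1073741824'      # 1 GiB
upload_period = '2500s'
reconstruct_threshold = '0'         # materialize as soon as any delta chain exists
gc_horizon = '1024'
gc_period = '10s'
```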
@@ -147,7 +147,7 @@ pub fn create_repo(

    let tli = create_timeline(conf, None, &tenantid)?;

    let repo = Arc::new(crate::layered_repository::LayeredRepository::new(
    let repo = Arc::new(crate::buffered_repository::BufferedRepository::new(
        conf,
        wal_redo_manager,
        tenantid,
pageserver/src/buffered_repository.rs (new file, 2129 lines): file diff suppressed because it is too large.
@@ -39,9 +39,8 @@
//!
use crate::layered_repository::blob::BlobWriter;
use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
    Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
};
use crate::layered_repository::storage_layer::{Layer, SegmentTag};
use crate::repository::{PageReconstructData, PageReconstructResult, PageVersion};
use crate::waldecoder;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};

@@ -148,6 +147,10 @@ pub struct DeltaLayerInner {
}

impl Layer for DeltaLayer {
    fn get_tenant_id(&self) -> ZTenantId {
        self.tenantid
    }

    fn get_timeline_id(&self) -> ZTimelineId {
        self.timelineid
    }

@@ -201,22 +204,22 @@ impl Layer for DeltaLayer {
        for ((_blknum, pv_lsn), blob_range) in iter {
            let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?;

            if let Some(img) = pv.page_image {
                // Found a page image, return it
                reconstruct_data.page_img = Some(img);
                need_image = false;
                break;
            } else if let Some(rec) = pv.record {
                let will_init = rec.will_init;
                reconstruct_data.records.push((*pv_lsn, rec));
                if will_init {
                    // This WAL record initializes the page, so no need to go further back
            match pv {
                PageVersion::Page(img) => {
                    // Found a page image, return it
                    reconstruct_data.page_img = Some(img);
                    need_image = false;
                    break;
                }
            } else {
                // No base image, and no WAL record. Huh?
                bail!("no page image or WAL record for requested page");
                PageVersion::Wal(rec) => {
                    let will_init = rec.will_init;
                    reconstruct_data.records.push((*pv_lsn, rec));
                    if will_init {
                        // This WAL record initializes the page, so no need to go further back
                        need_image = false;
                        break;
                    }
                }
            }
        }

@@ -226,7 +229,7 @@ impl Layer for DeltaLayer {
        // If an older page image is needed to reconstruct the page, let the
        // caller know.
        if need_image {
            Ok(PageReconstructResult::Continue(self.start_lsn))
            Ok(PageReconstructResult::Continue(Lsn(self.start_lsn.0 - 1)))
        } else {
            Ok(PageReconstructResult::Complete)
        }

@@ -307,19 +310,22 @@ impl Layer for DeltaLayer {
        let buf = read_blob(&chapter, blob_range)?;
        let pv = PageVersion::des(&buf)?;

        if let Some(img) = pv.page_image.as_ref() {
            write!(&mut desc, " img {} bytes", img.len())?;
        }
        if let Some(rec) = pv.record.as_ref() {
            let wal_desc = waldecoder::describe_wal_record(&rec.rec);
            write!(
                &mut desc,
                " rec {} bytes will_init: {} {}",
                rec.rec.len(),
                rec.will_init,
                wal_desc
            )?;
        match pv {
            PageVersion::Page(img) => {
                write!(&mut desc, " img {} bytes", img.len())?;
            }
            PageVersion::Wal(rec) => {
                let wal_desc = waldecoder::describe_wal_record(&rec.rec);
                write!(
                    &mut desc,
                    " rec {} bytes will_init: {} {}",
                    rec.rec.len(),
                    rec.will_init,
                    wal_desc
                )?;
            }
        }

        println!(" blk {} at {}: {}", blk, lsn, desc);
    }

@@ -328,6 +334,19 @@ impl Layer for DeltaLayer {
}

impl DeltaLayer {
    /// debugging function to print out the contents of the layer
    pub fn versions(&self) -> Result<Vec<(u32, Lsn, PageVersion)>> {
        let mut versions: Vec<(u32, Lsn, PageVersion)> = Vec::new();
        let inner = self.load()?;
        let (_path, book) = self.open_book()?;
        let chapter = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
        for ((blk, lsn), blob_range) in inner.page_version_metas.as_slice() {
            let buf = read_blob(&chapter, blob_range)?;
            versions.push((*blk, *lsn, PageVersion::des(&buf)?));
        }
        Ok(versions)
    }

    fn path_for(
        path_or_conf: &PathOrConf,
        timelineid: ZTimelineId,

@@ -360,6 +379,7 @@ impl DeltaLayer {
        dropped: bool,
        page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
        relsizes: VecMap<Lsn, u32>,
        nosync: bool,
    ) -> Result<DeltaLayer> {
        if seg.rel.is_blocky() {
            assert!(!relsizes.is_empty());

@@ -432,8 +452,9 @@ impl DeltaLayer {

        // This flushes the underlying 'buf_writer'.
        let writer = book.close()?;
        writer.get_ref().sync_all()?;

        if !nosync {
            writer.get_ref().sync_all()?;
        }
        trace!("saved {}", &path.display());

        drop(inner);

@@ -442,12 +463,7 @@ impl DeltaLayer {
    }

    fn open_book(&self) -> Result<(PathBuf, Book<File>)> {
        let path = Self::path_for(
            &self.path_or_conf,
            self.timelineid,
            self.tenantid,
            &self.layer_name(),
        );
        let path = self.path();

        let file = File::open(&path)?;
        let book = Book::new(file)?;
@@ -13,8 +13,6 @@ use anyhow::Result;
use log::*;
use zenith_utils::lsn::Lsn;

use super::METADATA_FILE_NAME;

// Note: LayeredTimeline::load_layer_map() relies on this sort order
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct DeltaFileName {

@@ -292,7 +290,7 @@ pub fn list_files(
        deltafiles.push(deltafilename);
    } else if let Some(imgfilename) = ImageFileName::parse_str(fname) {
        imgfiles.push(imgfilename);
    } else if fname == METADATA_FILE_NAME || fname == "ancestor" || fname.ends_with(".old") {
    } else if fname == "metadata" || fname == "ancestor" || fname.ends_with(".old") {
        // ignore these
    } else {
        warn!("unrecognized filename in timeline dir: {}", fname);
@@ -22,11 +22,8 @@
//! For non-blocky relishes, the image can be found in NONBLOCKY_IMAGE_CHAPTER.
//!
use crate::layered_repository::filename::{ImageFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
    Layer, PageReconstructData, PageReconstructResult, SegmentTag,
};
use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::RELISH_SEG_SIZE;
use crate::layered_repository::storage_layer::{Layer, SegmentTag, RELISH_SEG_SIZE};
use crate::repository::{PageReconstructData, PageReconstructResult};
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{anyhow, bail, ensure, Result};

@@ -117,6 +114,10 @@ impl Layer for ImageLayer {
        PathBuf::from(self.layer_name().to_string())
    }

    fn get_tenant_id(&self) -> ZTenantId {
        self.tenantid
    }

    fn get_timeline_id(&self) -> ZTimelineId {
        self.timelineid
    }

@@ -250,13 +251,14 @@ impl ImageLayer {
    }

    /// Create a new image file, using the given array of pages.
    fn create(
    pub fn create(
        conf: &'static PageServerConf,
        timelineid: ZTimelineId,
        tenantid: ZTenantId,
        seg: SegmentTag,
        lsn: Lsn,
        base_images: Vec<Bytes>,
        nosync: bool,
    ) -> Result<ImageLayer> {
        let image_type = if seg.rel.is_blocky() {
            let num_blocks: u32 = base_images.len().try_into()?;

@@ -316,8 +318,9 @@ impl ImageLayer {

        // This flushes the underlying 'buf_writer'.
        let writer = book.close()?;
        writer.get_ref().sync_all()?;

        if !nosync {
            writer.get_ref().sync_all()?;
        }
        trace!("saved {}", path.display());

        drop(inner);

@@ -325,6 +328,7 @@ impl ImageLayer {
        Ok(layer)
    }

    /*
    // Create a new image file by materializing every page in a source layer
    // at given LSN.
    pub fn create_from_src(

@@ -362,6 +366,7 @@ impl ImageLayer {

        Self::create(conf, timelineid, timeline.tenantid, seg, lsn, base_images)
    }
    */

    ///
    /// Load the contents of the file into memory
@@ -3,10 +3,9 @@
//!

use crate::relish::RelishTag;
use crate::repository::WALRecord;
use crate::ZTimelineId;
use crate::repository::{PageReconstructData, PageReconstructResult};
use crate::{ZTenantId, ZTimelineId};
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::path::PathBuf;

@@ -45,56 +44,6 @@ impl SegmentTag {
    }
}

///
/// Represents a version of a page at a specific LSN. The LSN is the key of the
/// entry in the 'page_versions' hash, it is not duplicated here.
///
/// A page version can be stored as a full page image, or as WAL record that needs
/// to be applied over the previous page version to reconstruct this version.
///
/// It's also possible to have both a WAL record and a page image in the same
/// PageVersion. That happens if page version is originally stored as a WAL record
/// but it is later reconstructed by a GetPage@LSN request by performing WAL
/// redo. The get_page_at_lsn() code will store the reconstructed pag image next to
/// the WAL record in that case. TODO: That's pretty accidental, not the result
/// of any grand design. If we want to keep reconstructed page versions around, we
/// probably should have a separate buffer cache so that we could control the
/// replacement policy globally. Or if we keep a reconstructed page image, we
/// could throw away the WAL record.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PageVersion {
    /// an 8kb page image
    pub page_image: Option<Bytes>,
    /// WAL record to get from previous page version to this one.
    pub record: Option<WALRecord>,
}

///
/// Data needed to reconstruct a page version
///
/// 'page_img' is the old base image of the page to start the WAL replay with.
/// It can be None, if the first WAL record initializes the page (will_init)
/// 'records' contains the records to apply over the base image.
///
pub struct PageReconstructData {
    pub records: Vec<(Lsn, WALRecord)>,
    pub page_img: Option<Bytes>,
}

/// Return value from Layer::get_page_reconstruct_data
pub enum PageReconstructResult {
    /// Got all the data needed to reconstruct the requested page
    Complete,
    /// This layer didn't contain all the required data, the caller should look up
    /// the predecessor layer at the returned LSN and collect more data from there.
    Continue(Lsn),
    /// This layer didn't contain data needed to reconstruct the page version at
    /// the returned LSN. This is usually considered an error, but might be OK
    /// in some circumstances.
    Missing(Lsn),
}

///
/// A Layer corresponds to one RELISH_SEG_SIZE slice of a relish in a range of LSNs.
/// There are two kinds of layers, in-memory and on-disk layers. In-memory

@@ -104,6 +53,8 @@ pub enum PageReconstructResult {
/// in-memory and on-disk layers.
///
pub trait Layer: Send + Sync {
    fn get_tenant_id(&self) -> ZTenantId;

    /// Identify the timeline this relish belongs to
    fn get_timeline_id(&self) -> ZTimelineId;
@@ -9,6 +9,7 @@ use zenith_metrics::{register_int_gauge_vec, IntGaugeVec};

pub mod basebackup;
pub mod branches;
pub mod buffered_repository;
pub mod http;
pub mod layered_repository;
pub mod page_service;

@@ -17,6 +18,7 @@ pub mod relish_storage;
pub mod repository;
pub mod restore_local_repo;
pub mod tenant_mgr;
pub mod toast_store;
pub mod waldecoder;
pub mod walreceiver;
pub mod walredo;

@@ -30,14 +32,17 @@ pub mod defaults {
    pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
    pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");

    // FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
    // would be more appropriate. But a low value forces the code to be exercised more,
    // which is good for now to trigger bugs.
    // Minimal size of WAL records chain to trigger materialization of the page
    pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
    pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
    pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10);

    pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
    pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
    pub const DEFAULT_UPLOAD_DISTANCE: u64 = 1024 * 1024 * 1024;
    pub const DEFAULT_UPLOAD_PERIOD: Duration = Duration::from_secs(2500);

    pub const DEFAULT_RECONSTRUCT_THRESHOLD: u64 = 0;

    pub const DEFAULT_GC_HORIZON: u64 = 1024;
    pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10);

    pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
    pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;

@@ -64,6 +69,9 @@ pub struct PageServerConf {
    // page server crashes.
    pub checkpoint_distance: u64,
    pub checkpoint_period: Duration,
    pub upload_period: Duration,
    pub upload_distance: u64,
    pub reconstruct_threshold: u64,

    pub gc_horizon: u64,
    pub gc_period: Duration,

@@ -149,6 +157,9 @@ impl PageServerConf {
    daemonize: false,
    checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
    checkpoint_period: Duration::from_secs(10),
    upload_distance: defaults::DEFAULT_UPLOAD_DISTANCE,
    upload_period: defaults::DEFAULT_UPLOAD_PERIOD,
    reconstruct_threshold: defaults::DEFAULT_RECONSTRUCT_THRESHOLD,
    gc_horizon: defaults::DEFAULT_GC_HORIZON,
    gc_period: Duration::from_secs(10),
    listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
@@ -693,67 +693,21 @@ impl postgres_backend::Handler for PageServerHandler {
        let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;

        pgb.write_message_noflush(&BeMessage::RowDescription(&[
            RowDescriptor::int8_col(b"layer_relfiles_total"),
            RowDescriptor::int8_col(b"layer_relfiles_needed_by_cutoff"),
            RowDescriptor::int8_col(b"layer_relfiles_needed_by_branches"),
            RowDescriptor::int8_col(b"layer_relfiles_not_updated"),
            RowDescriptor::int8_col(b"layer_relfiles_needed_as_tombstone"),
            RowDescriptor::int8_col(b"layer_relfiles_removed"),
            RowDescriptor::int8_col(b"layer_relfiles_dropped"),
            RowDescriptor::int8_col(b"layer_nonrelfiles_total"),
            RowDescriptor::int8_col(b"layer_nonrelfiles_needed_by_cutoff"),
            RowDescriptor::int8_col(b"layer_nonrelfiles_needed_by_branches"),
            RowDescriptor::int8_col(b"layer_nonrelfiles_not_updated"),
            RowDescriptor::int8_col(b"layer_nonrelfiles_needed_as_tombstone"),
            RowDescriptor::int8_col(b"layer_nonrelfiles_removed"),
            RowDescriptor::int8_col(b"layer_nonrelfiles_dropped"),
            RowDescriptor::int8_col(b"meta_total"),
            RowDescriptor::int8_col(b"meta_removed"),
            RowDescriptor::int8_col(b"meta_dropped"),
            RowDescriptor::int8_col(b"pages_total"),
            RowDescriptor::int8_col(b"pages_removed"),
            RowDescriptor::int8_col(b"pages_dropped"),
            RowDescriptor::int8_col(b"elapsed"),
        ]))?
        .write_message_noflush(&BeMessage::DataRow(&[
            Some(result.ondisk_relfiles_total.to_string().as_bytes()),
            Some(
                result
                    .ondisk_relfiles_needed_by_cutoff
                    .to_string()
                    .as_bytes(),
            ),
            Some(
                result
                    .ondisk_relfiles_needed_by_branches
                    .to_string()
                    .as_bytes(),
            ),
            Some(result.ondisk_relfiles_not_updated.to_string().as_bytes()),
            Some(
                result
                    .ondisk_relfiles_needed_as_tombstone
                    .to_string()
                    .as_bytes(),
            ),
            Some(result.ondisk_relfiles_removed.to_string().as_bytes()),
            Some(result.ondisk_relfiles_dropped.to_string().as_bytes()),
            Some(result.ondisk_nonrelfiles_total.to_string().as_bytes()),
            Some(
                result
                    .ondisk_nonrelfiles_needed_by_cutoff
                    .to_string()
                    .as_bytes(),
            ),
            Some(
                result
                    .ondisk_nonrelfiles_needed_by_branches
                    .to_string()
                    .as_bytes(),
            ),
            Some(result.ondisk_nonrelfiles_not_updated.to_string().as_bytes()),
            Some(
                result
                    .ondisk_nonrelfiles_needed_as_tombstone
                    .to_string()
                    .as_bytes(),
            ),
            Some(result.ondisk_nonrelfiles_removed.to_string().as_bytes()),
            Some(result.ondisk_nonrelfiles_dropped.to_string().as_bytes()),
            Some(result.meta_total.to_string().as_bytes()),
            Some(result.meta_removed.to_string().as_bytes()),
            Some(result.meta_dropped.to_string().as_bytes()),
            Some(result.pages_total.to_string().as_bytes()),
            Some(result.pages_removed.to_string().as_bytes()),
            Some(result.pages_dropped.to_string().as_bytes()),
            Some(result.elapsed.as_millis().to_string().as_bytes()),
        ]))?
        .write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
@@ -44,45 +44,27 @@ pub trait Repository: Send + Sync {
///
/// Result of performing GC
///
#[derive(Default)]
#[derive(Default, Debug)]
pub struct GcResult {
    pub ondisk_relfiles_total: u64,
    pub ondisk_relfiles_needed_by_cutoff: u64,
    pub ondisk_relfiles_needed_by_branches: u64,
    pub ondisk_relfiles_not_updated: u64,
    pub ondisk_relfiles_needed_as_tombstone: u64,
    pub ondisk_relfiles_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
    pub ondisk_relfiles_dropped: u64, // # of layer files removed because the relation was dropped
    pub meta_removed: u64, // removed versions beyond PITR interval for which new page image exists
    pub meta_dropped: u64, // removed versions beyond PITR interval of dropped relations
    pub meta_total: u64, // total number of metaobject version histories

    pub ondisk_nonrelfiles_total: u64,
    pub ondisk_nonrelfiles_needed_by_cutoff: u64,
    pub ondisk_nonrelfiles_needed_by_branches: u64,
    pub ondisk_nonrelfiles_not_updated: u64,
    pub ondisk_nonrelfiles_needed_as_tombstone: u64,
    pub ondisk_nonrelfiles_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
    pub ondisk_nonrelfiles_dropped: u64, // # of layer files removed because the relation was dropped
    pub pages_removed: u64, // removed versions beyond PITR interval for which new page image exists
    pub pages_dropped: u64, // removed versions beyond PITR interval of dropped relations
    pub pages_total: u64, // total number of page vaersion histories

    pub elapsed: Duration,
}

impl AddAssign for GcResult {
    fn add_assign(&mut self, other: Self) {
        self.ondisk_relfiles_total += other.ondisk_relfiles_total;
        self.ondisk_relfiles_needed_by_cutoff += other.ondisk_relfiles_needed_by_cutoff;
        self.ondisk_relfiles_needed_by_branches += other.ondisk_relfiles_needed_by_branches;
        self.ondisk_relfiles_not_updated += other.ondisk_relfiles_not_updated;
        self.ondisk_relfiles_needed_as_tombstone += other.ondisk_relfiles_needed_as_tombstone;
        self.ondisk_relfiles_removed += other.ondisk_relfiles_removed;
        self.ondisk_relfiles_dropped += other.ondisk_relfiles_dropped;

        self.ondisk_nonrelfiles_total += other.ondisk_nonrelfiles_total;
        self.ondisk_nonrelfiles_needed_by_cutoff += other.ondisk_nonrelfiles_needed_by_cutoff;
        self.ondisk_nonrelfiles_needed_by_branches += other.ondisk_nonrelfiles_needed_by_branches;
        self.ondisk_nonrelfiles_not_updated += other.ondisk_nonrelfiles_not_updated;
        self.ondisk_nonrelfiles_needed_as_tombstone += other.ondisk_nonrelfiles_needed_as_tombstone;
        self.ondisk_nonrelfiles_removed += other.ondisk_nonrelfiles_removed;
        self.ondisk_nonrelfiles_dropped += other.ondisk_nonrelfiles_dropped;

        self.meta_total += other.meta_total;
        self.meta_removed += other.meta_removed;
        self.meta_dropped += other.meta_dropped;
        self.pages_total += other.pages_total;
        self.pages_removed += other.pages_removed;
        self.pages_dropped += other.pages_dropped;
        self.elapsed += other.elapsed;
    }
}
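GcResult derives Default and keeps its AddAssign impl, so per-timeline results can be folded into a single tenant-wide total with `+=`. A small illustrative sketch (the per_timeline_results input is hypothetical):

```rust
// Hypothetical aggregation over per-timeline GC results; GcResult comes from
// pageserver::repository and supports `+=` via the AddAssign impl shown above.
use pageserver::repository::GcResult;

fn total_gc_result(per_timeline_results: Vec<GcResult>) -> GcResult {
    let mut total = GcResult::default();
    for r in per_timeline_results {
        total += r; // field-wise addition, including `elapsed`
    }
    total
}
```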
@@ -111,14 +93,18 @@ pub trait Timeline: Send + Sync {

    /// Get a list of all existing relations
    /// Pass RelTag to get relation objects or None to get nonrels.
    fn list_relishes(&self, tag: Option<RelTag>, lsn: Lsn) -> Result<HashSet<RelishTag>>;

    /// Get a list of all existing relations in given tablespace and database.
    fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelishTag>>;

    /// Get a list of all existing non-relational objects
    fn list_nonrels(&self, lsn: Lsn) -> Result<HashSet<RelishTag>>;

    ///
    /// Export data as delats and image layers between 'start_lsn' to 'end_lsn'. The
    /// start is inclusive, and end is exclusive.
    ///
    fn export_timeline(&self, start_lsn: Lsn, end_lsn: Lsn) -> Result<()>;

    /// Get the LSN where this branch was created
    fn get_ancestor_lsn(&self) -> Lsn;

@@ -181,6 +167,16 @@ pub trait TimelineWriter: Deref<Target = dyn Timeline> {
    /// Advance requires aligned LSN as an argument and would wake wait_lsn() callers.
    /// Previous last record LSN is stored alongside the latest and can be read.
    fn advance_last_record_lsn(&self, lsn: Lsn);

    ///
    /// Complete all delayed commits and advance disk_consistent_lsn
    ///
    fn checkpoint(&self) -> Result<()>;

    ///
    /// Import data from layer files
    ///
    fn import_timeline(&self, snapshot_lsn: Lsn) -> Result<()>;
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]

@@ -213,6 +209,39 @@ impl WALRecord {
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PageVersion {
    /// an 8kb page image
    Page(Bytes),
    /// WAL record to get from previous page version to this one.
    Wal(WALRecord),
}

///
/// Data needed to reconstruct a page version
///
/// 'page_img' is the old base image of the page to start the WAL replay with.
/// It can be None, if the first WAL record initializes the page (will_init)
/// 'records' contains the records to apply over the base image.
///
pub struct PageReconstructData {
    pub records: Vec<(Lsn, WALRecord)>,
    pub page_img: Option<Bytes>,
}

/// Return value from Layer::get_page_reconstruct_data
pub enum PageReconstructResult {
    /// Got all the data needed to reconstruct the requested page
    Complete,
    /// This layer didn't contain all the required data, the caller should look up
    /// the predecessor layer at the returned LSN and collect more data from there.
    Continue(Lsn),
    /// This layer didn't contain data needed to reconstruct the page version at
    /// the returned LSN. This is usually considered an error, but might be OK
    /// in some circumstances.
    Missing(Lsn),
}

///
/// Tests that should work the same with any Repository/Timeline implementation.
///
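These types, moved here from the layered repository, describe the page-reconstruction protocol: a caller collects WAL records (and possibly a base image) starting from the newest layer and keeps walking to older layers as long as it gets Continue(lsn) back. A hedged sketch of that caller-side loop follows; collect_from_layer_at and redo are hypothetical stand-ins for the repository's layer lookup plus Layer::get_page_reconstruct_data and for WAL redo, not actual APIs from this diff:

```rust
// Sketch of the caller-side protocol implied by PageReconstructResult, per the
// doc comments above. The two closures are illustrative stand-ins.
use anyhow::{bail, Result};
use bytes::Bytes;
use pageserver::repository::{PageReconstructData, PageReconstructResult, WALRecord};
use zenith_utils::lsn::Lsn;

fn reconstruct_page(
    request_lsn: Lsn,
    collect_from_layer_at: impl Fn(Lsn, &mut PageReconstructData) -> Result<PageReconstructResult>,
    redo: impl Fn(Option<Bytes>, &[(Lsn, WALRecord)]) -> Result<Bytes>,
) -> Result<Bytes> {
    let mut data = PageReconstructData {
        records: Vec::new(),
        page_img: None,
    };
    let mut lsn = request_lsn;
    loop {
        match collect_from_layer_at(lsn, &mut data)? {
            // A base image or a will_init record was found: replay the collected records.
            PageReconstructResult::Complete => return redo(data.page_img.take(), &data.records),
            // This layer did not have everything; continue from its predecessor.
            PageReconstructResult::Continue(prev_lsn) => lsn = prev_lsn,
            PageReconstructResult::Missing(at) => bail!("no page version found at {}", at),
        }
    }
}
```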
@@ -220,7 +249,7 @@ impl WALRecord {
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layered_repository::{LayeredRepository, METADATA_FILE_NAME};
    use crate::buffered_repository::{BufferedRepository, METADATA_FILE_NAME};
    use crate::walredo::{WalRedoError, WalRedoManager};
    use crate::PageServerConf;
    use hex_literal::hex;

@@ -296,7 +325,7 @@ mod tests {
    fn load(&self) -> Box<dyn Repository> {
        let walredo_mgr = Arc::new(TestRedoManager);

        Box::new(LayeredRepository::new(
        Box::new(BufferedRepository::new(
            self.conf,
            walredo_mgr,
            self.tenant_id,
@@ -11,7 +11,7 @@ use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};

use anyhow::{anyhow, bail, Result};
use bytes::{Buf, Bytes};
use bytes::{Buf, Bytes, BytesMut};
use tracing::*;

use crate::relish::*;

@@ -126,7 +126,6 @@ pub fn import_timeline_from_postgres_datadir(
        import_nonrel_file(writer, lsn, RelishTag::TwoPhase { xid }, &entry.path())?;
    }
    // TODO: Scan pg_tblspc

    writer.advance_last_record_lsn(lsn);

    // Import WAL. This is needed even when starting from a shutdown checkpoint, because

@@ -140,6 +139,7 @@ pub fn import_timeline_from_postgres_datadir(
        lsn,
        &mut pg_control.checkPointCopy.clone(),
    )?;
    writer.checkpoint()?;

    Ok(())
}

@@ -416,7 +416,6 @@ pub fn save_decoded_record(
    if checkpoint.update_next_xid(decoded.xl_xid) {
        *checkpoint_modified = true;
    }

    // Iterate through all the blocks that the record modifies, and
    // "put" a separate copy of the record for each block.
    for blk in decoded.blocks.iter() {

@@ -426,14 +425,38 @@ pub fn save_decoded_record(
            relnode: blk.rnode_relnode,
            forknum: blk.forknum as u8,
        });
        if blk.apply_image
            && blk.has_image
            && decoded.xl_rmid == pg_constants::RM_XLOG_ID
            && (decoded.xl_info == pg_constants::XLOG_FPI
                || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
        {
            // Extract page image from FPI record
            let img_len = blk.bimg_len as usize;
            let img_offs = blk.bimg_offset as usize;
            let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize);
            image.extend_from_slice(&recdata[img_offs..img_offs + img_len]);

        let rec = WALRecord {
            will_init: blk.will_init || blk.apply_image,
            rec: recdata.clone(),
            main_data_offset: decoded.main_data_offset as u32,
        };
            // Compression of WAL is not yet supported
            assert!((blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0);

        timeline.put_wal_record(lsn, tag, blk.blkno, rec)?;
            if blk.hole_length != 0 {
                let tail = image.split_off(blk.hole_offset as usize);
                image.resize(image.len() + blk.hole_length as usize, 0u8);
                image.unsplit(tail);
            }
            image[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
            image[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
            assert_eq!(image.len(), pg_constants::BLCKSZ as usize);
            timeline.put_page_image(tag, blk.blkno, lsn, image.freeze())?;
        } else {
            let rec = WALRecord {
                will_init: blk.will_init || blk.apply_image,
                rec: recdata.clone(),
                main_data_offset: decoded.main_data_offset as u32,
            };
            timeline.put_wal_record(lsn, tag, blk.blkno, rec)?;
        }
    }

    let mut buf = decoded.record.clone();
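The FPI branch above rebuilds a full 8 KB page from a full-page image whose unused "hole" was omitted: it splits the buffer at hole_offset, appends hole_length zero bytes, and reattaches the tail before stamping the page LSN. A standalone toy illustration of that BytesMut split_off / resize / unsplit trick (the buffer sizes are made up, not PostgreSQL's BLCKSZ):

```rust
// Toy demonstration of the hole-restoration pattern used above. The sizes here are
// illustrative; the real code works on an 8192-byte PostgreSQL block.
use bytes::BytesMut;

fn restore_hole(compact_image: &[u8], hole_offset: usize, hole_length: usize) -> BytesMut {
    let mut image = BytesMut::with_capacity(compact_image.len() + hole_length);
    image.extend_from_slice(compact_image);

    // Split at the hole, pad the front part with zeros, then glue the tail back on.
    let tail = image.split_off(hole_offset);
    image.resize(image.len() + hole_length, 0u8);
    image.unsplit(tail);
    image
}

fn main() {
    let compact = [1u8, 2, 3, 7, 8, 9]; // bytes 3..=6 were an all-zero "hole"
    let full = restore_hole(&compact, 3, 4);
    assert_eq!(&full[..], &[1, 2, 3, 0, 0, 0, 0, 7, 8, 9]);
}
```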
@@ -2,7 +2,7 @@
//! page server.

use crate::branches;
use crate::layered_repository::LayeredRepository;
use crate::buffered_repository::BufferedRepository;
use crate::repository::{Repository, Timeline};
use crate::walredo::PostgresRedoManager;
use crate::PageServerConf;

@@ -62,6 +62,7 @@ fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {

struct TenantHandleEntry {
    checkpointer_handle: Option<JoinHandle<()>>,
    uploader_handle: Option<JoinHandle<()>>,
    gc_handle: Option<JoinHandle<()>>,
}

@@ -98,20 +99,22 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
    let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);

    // Set up an object repository, for actual data storage.
    let repo = Arc::new(LayeredRepository::new(
    let repo = Arc::new(BufferedRepository::new(
        conf,
        Arc::new(walredo_mgr),
        tenant_id,
        true,
    ));

    let checkpointer_handle = LayeredRepository::launch_checkpointer_thread(conf, repo.clone());
    let gc_handle = LayeredRepository::launch_gc_thread(conf, repo.clone());
    let checkpointer_handle = BufferedRepository::launch_checkpointer_thread(conf, repo.clone());
    let gc_handle = BufferedRepository::launch_gc_thread(conf, repo.clone());
    let uploader_handle = BufferedRepository::launch_upload_thread(conf, repo.clone());

    let mut handles = TENANT_HANDLES.lock().unwrap();
    let h = TenantHandleEntry {
        checkpointer_handle: Some(checkpointer_handle),
        gc_handle: Some(gc_handle),
        uploader_handle: Some(uploader_handle),
    };

    handles.insert(tenant_id, h);

@@ -170,6 +173,8 @@ pub fn stop_tenant_threads(tenantid: ZTenantId) {
    if let Some(h) = handles.get_mut(&tenantid) {
        h.checkpointer_handle.take().map(JoinHandle::join);
        debug!("checkpointer for tenant {} has stopped", tenantid);
        h.uploader_handle.take().map(JoinHandle::join);
        debug!("uploader for tenant {} has stopped", tenantid);
        h.gc_handle.take().map(JoinHandle::join);
        debug!("gc for tenant {} has stopped", tenantid);
    }
pageserver/src/toast_store.rs (new file, 258 lines):
@@ -0,0 +1,258 @@
use anyhow::{anyhow, Result};
use lz4_flex;
use std::convert::TryInto;
use std::ops::{Bound, RangeBounds};
use std::path::Path;

use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Transaction, Value};

const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
const CACHE_SIZE: usize = 32 * 1024; // 256Mb
//const CACHE_SIZE: usize = 128 * 1024; // 1Gb

///
/// Toast storage consistof two KV databases: one for storing main index
/// and second for storing sliced BLOB (values larger than 2kb).
/// BLOBs and main data are stored in different databases to improve
/// data locality and reduce key size for TOAST segments.
///
pub struct ToastStore {
    db: Storage, // key-value database
}

pub struct ToastIterator<'a> {
    iter: StorageIterator<'a>,
}

#[derive(Clone, Copy)]
pub struct PageData {
    data: [u8; 8192],
}

impl PageData {
    pub fn find_first_zero_bit(&self, offs: usize) -> usize {
        let bytes = self.data;
        for i in offs..8192 {
            if bytes[i] != 0xFFu8 {
                return i * 8 + bytes[i].trailing_ones() as usize;
            }
        }
        usize::MAX
    }
}

impl<'a> Iterator for ToastIterator<'a> {
    type Item = Result<(Key, Value)>;
    fn next(&mut self) -> Option<Self::Item> {
        let mut toast: Option<Vec<u8>> = None;
        let mut next_segno = 0u16;
        for elem in &mut self.iter {
            if let Ok((key, value)) = elem {
                let key_len = key.len();
                let n_segments =
                    u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap());
                let segno = u16::from_be_bytes(key[key_len - 2..].try_into().unwrap());
                let key = key[..key_len - 4].to_vec();
                if n_segments != 0 {
                    // TOAST
                    assert_eq!(segno, next_segno);
                    if next_segno == 0 {
                        toast = Some(Vec::with_capacity(n_segments as usize * TOAST_SEGMENT_SIZE))
                    }
                    toast.as_mut().unwrap().extend_from_slice(&value);
                    next_segno = segno + 1;
                    if next_segno == n_segments {
                        let res = lz4_flex::decompress_size_prepended(&toast.unwrap());
                        return Some(if let Ok(decompressed_data) = res {
                            Ok((key, decompressed_data))
                        } else {
                            Err(anyhow!(res.unwrap_err()))
                        });
                    }
                } else {
                    return Some(Ok((key, value)));
                }
            } else {
                return Some(elem);
            }
        }
        assert_eq!(next_segno, 0);
        None
    }
}

impl<'a> DoubleEndedIterator for ToastIterator<'a> {
    fn next_back(&mut self) -> Option<Self::Item> {
        let mut toast: Option<Vec<u8>> = None;
        let mut next_segno = 0u16;
        while let Some(elem) = self.iter.next_back() {
            if let Ok((key, value)) = elem {
                assert!(!value.is_empty());
                let key_len = key.len();
                let n_segments =
                    u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap());
                let segno = u16::from_be_bytes(key[key_len - 2..].try_into().unwrap());
                let key = key[..key_len - 4].to_vec();
                if n_segments != 0 {
                    // TOAST
                    assert!(segno + 1 == next_segno || next_segno == 0);
                    if next_segno == 0 {
                        let len = (n_segments - 1) as usize * TOAST_SEGMENT_SIZE + value.len();
                        let mut vec = vec![0u8; len];
                        vec[len - value.len()..].copy_from_slice(&value);
                        toast = Some(vec);
                    } else {
                        toast.as_mut().unwrap()[segno as usize * TOAST_SEGMENT_SIZE
                            ..(segno + 1) as usize * TOAST_SEGMENT_SIZE]
                            .copy_from_slice(&value);
                    }
                    next_segno = segno;
                    if next_segno == 0 {
                        let toast = toast.unwrap();
                        assert!(!toast.is_empty());
                        let res = lz4_flex::decompress_size_prepended(&toast);
                        return Some(if let Ok(decompressed_data) = res {
                            Ok((key, decompressed_data))
                        } else {
                            Err(anyhow!(res.unwrap_err()))
                        });
                    }
                } else {
                    return Some(Ok((key, value)));
                }
            } else {
                return Some(elem);
            }
        }
        assert_eq!(next_segno, 0);
        None
    }
}

//
// FIXME-KK: not using WAL now. Implement asynchronous or delayed commit.
//
impl ToastStore {
    pub fn new(path: &Path) -> Result<ToastStore> {
        Ok(ToastStore {
            db: Storage::open(
                &path.join("pageserver.db"),
                StorageConfig {
                    cache_size: CACHE_SIZE,
                    nosync: false,
                    mursiw: true,
                },
            )?,
        })
    }

    pub fn put(&self, key: Key, value: Value) -> Result<()> {
        let mut tx = self.db.start_transaction();
        self.tx_remove(&mut tx, &key)?;
        let value_len = value.len();
        let mut key = key;
        if value_len >= TOAST_SEGMENT_SIZE {
            let compressed_data = lz4_flex::compress_prepend_size(&value);
            let compressed_data_len = compressed_data.len();
            let mut offs: usize = 0;
            let mut segno = 0u16;
            let n_segments =
                ((compressed_data_len + TOAST_SEGMENT_SIZE - 1) / TOAST_SEGMENT_SIZE) as u16;
            assert!(n_segments != 0);
            key.extend_from_slice(&n_segments.to_be_bytes());
            key.extend_from_slice(&[0u8; 2]);
            let key_len = key.len();
            while offs + TOAST_SEGMENT_SIZE < compressed_data_len {
                key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
                tx.put(
                    &key,
                    &compressed_data[offs..offs + TOAST_SEGMENT_SIZE].to_vec(),
                )?;
                offs += TOAST_SEGMENT_SIZE;
                segno += 1;
            }
            key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
            tx.put(&key, &compressed_data[offs..].to_vec())?;
        } else {
            key.extend_from_slice(&[0u8; 4]);
            tx.put(&key, &value)?;
        }
        tx.delay()?;
        Ok(())
    }

    pub fn commit(&mut self) -> Result<()> {
        let tx = self.db.start_transaction();
        tx.commit()?;
        Ok(())
    }

    pub fn iter(&self) -> ToastIterator<'_> {
        self.range(..)
    }

    pub fn range<R: RangeBounds<Key>>(&self, range: R) -> ToastIterator<'_> {
        let from = match range.start_bound() {
            Bound::Included(key) => {
                let mut key = key.clone();
                key.extend_from_slice(&[0u8; 4]);
                Bound::Included(key)
            }
            Bound::Excluded(key) => {
                let mut key = key.clone();
                key.extend_from_slice(&[0u8; 4]);
                Bound::Excluded(key)
            }
            _ => Bound::Unbounded,
        };
        let till = match range.end_bound() {
            Bound::Included(key) => {
                let mut key = key.clone();
                key.extend_from_slice(&[0xFFu8; 4]);
                Bound::Included(key)
            }
            Bound::Excluded(key) => {
                let mut key = key.clone();
                key.extend_from_slice(&[0xFFu8; 4]);
                Bound::Excluded(key)
            }
            _ => Bound::Unbounded,
        };
        ToastIterator {
            iter: self.db.range((from, till)),
        }
    }

    pub fn remove(&self, key: Key) -> Result<()> {
        let mut tx = self.db.start_transaction();
        self.tx_remove(&mut tx, &key)?;
        tx.delay()
    }

    pub fn tx_remove(&self, tx: &mut Transaction, key: &[u8]) -> Result<()> {
        let mut min_key = key.to_vec();
        let mut max_key = key.to_vec();
        min_key.extend_from_slice(&[0u8; 4]);
        max_key.extend_from_slice(&[0xFFu8; 4]);
        let mut iter = tx.range(&min_key..&max_key);
        if let Some(entry) = iter.next() {
            let mut key = entry?.0;
            let key_len = key.len();
            let n_segments = u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap());
            if n_segments != 0 {
                // TOAST
                for i in 0..n_segments {
                    key[key_len - 2..].copy_from_slice(&i.to_be_bytes());
                    tx.remove(&key)?;
                }
            } else {
                tx.remove(&key)?;
            }
        }
        Ok(())
    }

    pub fn size(&self) -> u64 {
        self.db.get_database_info().db_used
    }
}
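Per the doc comment above, the store keeps small values inline and slices values of 2 KB or more into lz4-compressed TOAST segments; the 4-byte key suffix (n_segments and segno as big-endian u16) is what the iterators use to stitch segments back together. A hedged usage sketch of the public API as it appears in this new file follows; the directory path and the key/value contents are invented for illustration, and the target directory is assumed to already exist:

```rust
// Illustrative use of the ToastStore API shown above; only the method names come from this diff.
use anyhow::Result;
use pageserver::toast_store::ToastStore;
use std::path::Path;

fn main() -> Result<()> {
    let mut store = ToastStore::new(Path::new("/tmp/toast-demo"))?;

    // A small value is stored inline; a large one is lz4-compressed and segmented.
    store.put(b"small-key".to_vec(), b"short value".to_vec())?;
    store.put(b"large-key".to_vec(), vec![0u8; 64 * 1024])?;

    // Iteration transparently reassembles and decompresses segmented values.
    for entry in store.iter() {
        let (key, value) = entry?;
        println!("{} -> {} bytes", String::from_utf8_lossy(&key), value.len());
    }

    store.remove(b"large-key".to_vec())?;
    store.commit()?; // flush the delayed transactions
    Ok(())
}
```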
@@ -229,17 +229,18 @@ pub struct DecodedBkpBlock {
    pub blkno: u32,

    /* copy of the fork_flags field from the XLogRecordBlockHeader */
    flags: u8,
    pub flags: u8,

    /* Information on full-page image, if any */
    has_image: bool, /* has image, even for consistency checking */
    pub has_image: bool, /* has image, even for consistency checking */
    pub apply_image: bool, /* has image that should be restored */
    pub will_init: bool, /* record doesn't need previous page version to apply */
    //char *bkp_image;
    hole_offset: u16,
    hole_length: u16,
    bimg_len: u16,
    bimg_info: u8,
    pub hole_offset: u16,
    pub hole_length: u16,
    pub bimg_offset: u32,
    pub bimg_len: u16,
    pub bimg_info: u8,

    /* Buffer holding the rmgr-specific data associated with this block */
    has_data: bool,

@@ -859,8 +860,19 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
    }

    // 3. Decode blocks.
    let mut ptr = record.len() - buf.remaining();
    for blk in blocks.iter_mut() {
        if blk.has_image {
            blk.bimg_offset = ptr as u32;
            ptr += blk.bimg_len as usize;
        }
        if blk.has_data {
            ptr += blk.data_len as usize;
        }
    }
    // We don't need them, so just skip blocks_total_len bytes
    buf.advance(blocks_total_len as usize);
    assert_eq!(ptr, record.len() - buf.remaining());

    let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;
@@ -22,6 +22,7 @@ use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use log::*;
use rand::Rng;
use serde::Serialize;
use std::fs;
use std::fs::OpenOptions;

@@ -53,6 +54,8 @@ use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::XLogRecord;

const WAL_REDO_WORKERS: usize = 1;

///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
///

@@ -140,7 +143,7 @@ pub struct PostgresRedoManager {
    conf: &'static PageServerConf,

    runtime: tokio::runtime::Runtime,
    process: Mutex<Option<PostgresRedoProcess>>,
    workers: [Mutex<Option<PostgresRedoProcess>>; WAL_REDO_WORKERS],
}

#[derive(Debug)]

@@ -195,7 +198,9 @@ impl WalRedoManager for PostgresRedoManager {

        start_time = Instant::now();
        let result = {
            let mut process_guard = self.process.lock().unwrap();
            let mut process_guard = self.workers[rand::thread_rng().gen_range(0..WAL_REDO_WORKERS)]
                .lock()
                .unwrap();
            lock_time = Instant::now();

            // launch the WAL redo process on first use

@@ -237,7 +242,7 @@ impl PostgresRedoManager {
            runtime,
            tenantid,
            conf,
            process: Mutex::new(None),
            workers: [(); WAL_REDO_WORKERS].map(|_| Mutex::new(None)),
        }
    }
vendor/postgres (vendored): submodule updated 9160deb05a...6b58de66ec
@@ -87,6 +87,10 @@ impl<K: Ord, V> VecMap<K, V> {
        Ok(None)
    }

    pub fn last(&self) -> Option<&(K, V)> {
        self.0.last()
    }

    /// Split the map into two.
    ///
    /// The left map contains everything before `cutoff` (exclusive).