Compare commits

...

21 Commits

Author SHA1 Message Date
Konstantin Knizhnik
3e08ad485a Fix bug in using brin index in GC 2021-11-30 16:33:50 +03:00
Konstantin Knizhnik
5ad82418a9 Add upload thread 2021-11-24 18:04:48 +03:00
Konstantin Knizhnik
92562145c0 Change toast store API 2021-11-23 11:34:32 +03:00
Konstantin Knizhnik
915001c67e Fix clippy warnings 2021-11-22 20:05:34 +03:00
Konstantin Knizhnik
13f9565ff8 Fix indentation 2021-11-22 19:22:59 +03:00
Konstantin Knizhnik
f73d043a8b Use COW version of YAKV 2021-11-22 19:22:39 +03:00
Konstantin Knizhnik
8fda7a6183 Fix indentation 2021-11-17 12:50:46 +03:00
Konstantin Knizhnik
4acd292717 Use BRIN to optimize GC 2021-11-17 12:50:25 +03:00
Konstantin Knizhnik
b365a075f4 Save materialized pages 2021-11-17 12:16:14 +03:00
Konstantin Knizhnik
6311135d73 Save materialized pages 2021-11-17 12:16:02 +03:00
Konstantin Knizhnik
ee29446edc Add BRIN index for checkpointer 2021-11-12 17:20:17 +03:00
Konstantin Knizhnik
d2e5e0e728 Fix compression 2021-11-11 00:16:37 +03:00
Konstantin Knizhnik
3b471494ff Add import/export functions from buffered stotage to files with layeres 2021-11-10 09:48:28 +03:00
Konstantin Knizhnik
9947de4a2a Fix issues with garbage collector 2021-11-03 12:15:24 +03:00
Konstantin Knizhnik
a3e94e888a Implement garbage collector for buffered repository 2021-10-30 13:10:04 +03:00
Konstantin Knizhnik
e6f33a5cd0 Rewrite TOAST to use the same tree as main index 2021-10-29 17:00:09 +03:00
Konstantin Knizhnik
2dd35b1fbe Fix indentation 2021-10-27 19:37:50 +03:00
Konstantin Knizhnik
ce779cc754 Use delayed commit in buffered_repo 2021-10-27 19:37:23 +03:00
Konstantin Knizhnik
497258c6fe Do not produce error in get_page_at_lsn on missed page 2021-10-26 20:07:11 +03:00
Konstantin Knizhnik
0b6008012d Apply cargo fmt 2021-10-22 19:52:05 +03:00
Konstantin Knizhnik
d35fc20181 Implement buffered repository 2021-10-22 19:50:59 +03:00
23 changed files with 2727 additions and 2221 deletions

35
Cargo.lock generated
View File

@@ -918,6 +918,15 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "lz4_flex"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "177c079243f6867429aca5af5053747f57e329d44f0c58bebca078cd14873ec2"
dependencies = [
"twox-hash",
]
[[package]]
name = "matchers"
version = "0.0.1"
@@ -1180,6 +1189,7 @@ dependencies = [
"hyper",
"lazy_static",
"log",
"lz4_flex",
"postgres",
"postgres-protocol",
"postgres-types",
@@ -1198,6 +1208,7 @@ dependencies = [
"toml",
"tracing",
"workspace_hack",
"yakv",
"zenith_metrics",
"zenith_utils",
]
@@ -1908,6 +1919,12 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stringprep"
version = "0.1.2"
@@ -2228,6 +2245,16 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "twox-hash"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f559b464de2e2bdabcac6a210d12e9b5a5973c251e102c44c585c71d51bd78e"
dependencies = [
"cfg-if 1.0.0",
"static_assertions",
]
[[package]]
name = "typenum"
version = "1.13.0"
@@ -2542,6 +2569,14 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "yakv"
version = "0.2.4"
dependencies = [
"anyhow",
"fs2",
]
[[package]]
name = "zenith"
version = "0.1.0"

View File

@@ -15,3 +15,7 @@ members = [
# This is useful for profiling and, to some extent, debug.
# Besides, debug info should not affect the performance.
debug = true
panic = 'abort'
[profile.dev]
panic = 'abort'

View File

@@ -298,6 +298,7 @@ impl PostgresNode {
conf.append("max_replication_slots", "10");
conf.append("hot_standby", "on");
conf.append("shared_buffers", "1MB");
conf.append("max_wal_size", "100GB");
conf.append("fsync", "off");
conf.append("max_connections", "100");
conf.append("wal_sender_timeout", "0");

View File

@@ -37,6 +37,9 @@ async-trait = "0.1"
const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
#yakv = { path = "../../yakv" }
yakv = "0.2.4"
lz4_flex = "0.9.0"
postgres_ffi = { path = "../postgres_ffi" }
zenith_metrics = { path = "../zenith_metrics" }

View File

@@ -1,25 +0,0 @@
//! Main entry point for the dump_layerfile executable
//!
//! A handy tool for debugging, that's all.
use anyhow::Result;
use clap::{App, Arg};
use pageserver::layered_repository::dump_layerfile_from_path;
use std::path::PathBuf;
fn main() -> Result<()> {
let arg_matches = App::new("Zenith dump_layerfile utility")
.about("Dump contents of one layer file, for debugging")
.arg(
Arg::with_name("path")
.help("Path to file to dump")
.required(true)
.index(1),
)
.get_matches();
let path = PathBuf::from(arg_matches.value_of("path").unwrap());
dump_layerfile_from_path(&path)?;
Ok(())
}

View File

@@ -42,6 +42,9 @@ struct CfgFileParams {
listen_http_addr: Option<String>,
checkpoint_distance: Option<String>,
checkpoint_period: Option<String>,
upload_distance: Option<String>,
upload_period: Option<String>,
reconstruct_threshold: Option<String>,
gc_horizon: Option<String>,
gc_period: Option<String>,
pg_distrib_dir: Option<String>,
@@ -103,6 +106,9 @@ impl CfgFileParams {
listen_http_addr: get_arg("listen-http"),
checkpoint_distance: get_arg("checkpoint_distance"),
checkpoint_period: get_arg("checkpoint_period"),
upload_distance: get_arg("upload_distance"),
upload_period: get_arg("upload_period"),
reconstruct_threshold: get_arg("reconstruct_threshold"),
gc_horizon: get_arg("gc_horizon"),
gc_period: get_arg("gc_period"),
pg_distrib_dir: get_arg("postgres-distrib"),
@@ -121,6 +127,9 @@ impl CfgFileParams {
listen_http_addr: self.listen_http_addr.or(other.listen_http_addr),
checkpoint_distance: self.checkpoint_distance.or(other.checkpoint_distance),
checkpoint_period: self.checkpoint_period.or(other.checkpoint_period),
upload_distance: self.upload_distance.or(other.upload_distance),
upload_period: self.upload_period.or(other.upload_period),
reconstruct_threshold: self.reconstruct_threshold.or(other.reconstruct_threshold),
gc_horizon: self.gc_horizon.or(other.gc_horizon),
gc_period: self.gc_period.or(other.gc_period),
pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir),
@@ -158,6 +167,20 @@ impl CfgFileParams {
None => DEFAULT_CHECKPOINT_PERIOD,
};
let upload_distance: u64 = match self.upload_distance.as_ref() {
Some(upload_distance_str) => upload_distance_str.parse()?,
None => DEFAULT_UPLOAD_DISTANCE,
};
let upload_period = match self.upload_period.as_ref() {
Some(upload_period_str) => humantime::parse_duration(upload_period_str)?,
None => DEFAULT_UPLOAD_PERIOD,
};
let reconstruct_threshold: u64 = match self.reconstruct_threshold.as_ref() {
Some(reconstruct_threshold_str) => reconstruct_threshold_str.parse()?,
None => DEFAULT_RECONSTRUCT_THRESHOLD,
};
let gc_horizon: u64 = match self.gc_horizon.as_ref() {
Some(horizon_str) => horizon_str.parse()?,
None => DEFAULT_GC_HORIZON,
@@ -236,6 +259,9 @@ impl CfgFileParams {
listen_http_addr,
checkpoint_distance,
checkpoint_period,
upload_distance,
upload_period,
reconstruct_threshold,
gc_horizon,
gc_period,
@@ -296,6 +322,24 @@ fn main() -> Result<()> {
.takes_value(true)
.help("Interval between checkpoint iterations"),
)
.arg(
Arg::with_name("checkpoint_distance")
.long("checkpoint_distance")
.takes_value(true)
.help("Distance from current LSN to perform checkpoint of in-memory layers"),
)
.arg(
Arg::with_name("upload_period")
.long("upload_period")
.takes_value(true)
.help("Interval between upload iterations"),
)
.arg(
Arg::with_name("reconstruct_threshold")
.long("reconstruct_threshold")
.takes_value(true)
.help("Minimal size of deltas after which page reconstruction (materialization) can be performed"),
)
.arg(
Arg::with_name("gc_horizon")
.long("gc_horizon")
@@ -600,6 +644,9 @@ mod tests {
listen_http_addr: Some("listen_http_addr_VALUE".to_string()),
checkpoint_distance: Some("checkpoint_distance_VALUE".to_string()),
checkpoint_period: Some("checkpoint_period_VALUE".to_string()),
upload_distance: Some("upload_distance_VALUE".to_string()),
upload_period: Some("upload_period_VALUE".to_string()),
reconstruct_threshold: Some("reconstruct_threshold_VALUE".to_string()),
gc_horizon: Some("gc_horizon_VALUE".to_string()),
gc_period: Some("gc_period_VALUE".to_string()),
pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()),
@@ -623,6 +670,9 @@ mod tests {
listen_http_addr = 'listen_http_addr_VALUE'
checkpoint_distance = 'checkpoint_distance_VALUE'
checkpoint_period = 'checkpoint_period_VALUE'
upload_distance = 'upload_distance_VALUE'
upload_period = 'upload_period_VALUE'
reconstruct_threshold = 'reconstruct_threshold_VALUE'
gc_horizon = 'gc_horizon_VALUE'
gc_period = 'gc_period_VALUE'
pg_distrib_dir = 'pg_distrib_dir_VALUE'
@@ -657,6 +707,9 @@ local_path = 'relish_storage_local_VALUE'
listen_http_addr: Some("listen_http_addr_VALUE".to_string()),
checkpoint_distance: Some("checkpoint_distance_VALUE".to_string()),
checkpoint_period: Some("checkpoint_period_VALUE".to_string()),
upload_distance: Some("upload_distance_VALUE".to_string()),
upload_period: Some("upload_period_VALUE".to_string()),
reconstruct_threshold: Some("reconstruct_threshold_VALUE".to_string()),
gc_horizon: Some("gc_horizon_VALUE".to_string()),
gc_period: Some("gc_period_VALUE".to_string()),
pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()),
@@ -683,6 +736,9 @@ local_path = 'relish_storage_local_VALUE'
listen_http_addr = 'listen_http_addr_VALUE'
checkpoint_distance = 'checkpoint_distance_VALUE'
checkpoint_period = 'checkpoint_period_VALUE'
upload_distance = 'upload_distance_VALUE'
upload_period = 'upload_period_VALUE'
reconstruct_threshold = 'reconstruct_threshold_VALUE'
gc_horizon = 'gc_horizon_VALUE'
gc_period = 'gc_period_VALUE'
pg_distrib_dir = 'pg_distrib_dir_VALUE'

View File

@@ -147,7 +147,7 @@ pub fn create_repo(
let tli = create_timeline(conf, None, &tenantid)?;
let repo = Arc::new(crate::layered_repository::LayeredRepository::new(
let repo = Arc::new(crate::buffered_repository::BufferedRepository::new(
conf,
wal_redo_manager,
tenantid,

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -39,9 +39,8 @@
//!
use crate::layered_repository::blob::BlobWriter;
use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
};
use crate::layered_repository::storage_layer::{Layer, SegmentTag};
use crate::repository::{PageReconstructData, PageReconstructResult, PageVersion};
use crate::waldecoder;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
@@ -148,6 +147,10 @@ pub struct DeltaLayerInner {
}
impl Layer for DeltaLayer {
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}
@@ -201,22 +204,22 @@ impl Layer for DeltaLayer {
for ((_blknum, pv_lsn), blob_range) in iter {
let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?;
if let Some(img) = pv.page_image {
// Found a page image, return it
reconstruct_data.page_img = Some(img);
need_image = false;
break;
} else if let Some(rec) = pv.record {
let will_init = rec.will_init;
reconstruct_data.records.push((*pv_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
match pv {
PageVersion::Page(img) => {
// Found a page image, return it
reconstruct_data.page_img = Some(img);
need_image = false;
break;
}
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
PageVersion::Wal(rec) => {
let will_init = rec.will_init;
reconstruct_data.records.push((*pv_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
@@ -226,7 +229,7 @@ impl Layer for DeltaLayer {
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(PageReconstructResult::Continue(self.start_lsn))
Ok(PageReconstructResult::Continue(Lsn(self.start_lsn.0 - 1)))
} else {
Ok(PageReconstructResult::Complete)
}
@@ -307,19 +310,22 @@ impl Layer for DeltaLayer {
let buf = read_blob(&chapter, blob_range)?;
let pv = PageVersion::des(&buf)?;
if let Some(img) = pv.page_image.as_ref() {
write!(&mut desc, " img {} bytes", img.len())?;
}
if let Some(rec) = pv.record.as_ref() {
let wal_desc = waldecoder::describe_wal_record(&rec.rec);
write!(
&mut desc,
" rec {} bytes will_init: {} {}",
rec.rec.len(),
rec.will_init,
wal_desc
)?;
match pv {
PageVersion::Page(img) => {
write!(&mut desc, " img {} bytes", img.len())?;
}
PageVersion::Wal(rec) => {
let wal_desc = waldecoder::describe_wal_record(&rec.rec);
write!(
&mut desc,
" rec {} bytes will_init: {} {}",
rec.rec.len(),
rec.will_init,
wal_desc
)?;
}
}
println!(" blk {} at {}: {}", blk, lsn, desc);
}
@@ -328,6 +334,19 @@ impl Layer for DeltaLayer {
}
impl DeltaLayer {
/// debugging function to print out the contents of the layer
pub fn versions(&self) -> Result<Vec<(u32, Lsn, PageVersion)>> {
let mut versions: Vec<(u32, Lsn, PageVersion)> = Vec::new();
let inner = self.load()?;
let (_path, book) = self.open_book()?;
let chapter = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
for ((blk, lsn), blob_range) in inner.page_version_metas.as_slice() {
let buf = read_blob(&chapter, blob_range)?;
versions.push((*blk, *lsn, PageVersion::des(&buf)?));
}
Ok(versions)
}
fn path_for(
path_or_conf: &PathOrConf,
timelineid: ZTimelineId,
@@ -360,6 +379,7 @@ impl DeltaLayer {
dropped: bool,
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
relsizes: VecMap<Lsn, u32>,
nosync: bool,
) -> Result<DeltaLayer> {
if seg.rel.is_blocky() {
assert!(!relsizes.is_empty());
@@ -432,8 +452,9 @@ impl DeltaLayer {
// This flushes the underlying 'buf_writer'.
let writer = book.close()?;
writer.get_ref().sync_all()?;
if !nosync {
writer.get_ref().sync_all()?;
}
trace!("saved {}", &path.display());
drop(inner);
@@ -442,12 +463,7 @@ impl DeltaLayer {
}
fn open_book(&self) -> Result<(PathBuf, Book<File>)> {
let path = Self::path_for(
&self.path_or_conf,
self.timelineid,
self.tenantid,
&self.layer_name(),
);
let path = self.path();
let file = File::open(&path)?;
let book = Book::new(file)?;

View File

@@ -13,8 +13,6 @@ use anyhow::Result;
use log::*;
use zenith_utils::lsn::Lsn;
use super::METADATA_FILE_NAME;
// Note: LayeredTimeline::load_layer_map() relies on this sort order
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct DeltaFileName {
@@ -292,7 +290,7 @@ pub fn list_files(
deltafiles.push(deltafilename);
} else if let Some(imgfilename) = ImageFileName::parse_str(fname) {
imgfiles.push(imgfilename);
} else if fname == METADATA_FILE_NAME || fname == "ancestor" || fname.ends_with(".old") {
} else if fname == "metadata" || fname == "ancestor" || fname.ends_with(".old") {
// ignore these
} else {
warn!("unrecognized filename in timeline dir: {}", fname);

View File

@@ -22,11 +22,8 @@
//! For non-blocky relishes, the image can be found in NONBLOCKY_IMAGE_CHAPTER.
//!
use crate::layered_repository::filename::{ImageFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, SegmentTag,
};
use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::RELISH_SEG_SIZE;
use crate::layered_repository::storage_layer::{Layer, SegmentTag, RELISH_SEG_SIZE};
use crate::repository::{PageReconstructData, PageReconstructResult};
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{anyhow, bail, ensure, Result};
@@ -117,6 +114,10 @@ impl Layer for ImageLayer {
PathBuf::from(self.layer_name().to_string())
}
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}
@@ -250,13 +251,14 @@ impl ImageLayer {
}
/// Create a new image file, using the given array of pages.
fn create(
pub fn create(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
seg: SegmentTag,
lsn: Lsn,
base_images: Vec<Bytes>,
nosync: bool,
) -> Result<ImageLayer> {
let image_type = if seg.rel.is_blocky() {
let num_blocks: u32 = base_images.len().try_into()?;
@@ -316,8 +318,9 @@ impl ImageLayer {
// This flushes the underlying 'buf_writer'.
let writer = book.close()?;
writer.get_ref().sync_all()?;
if !nosync {
writer.get_ref().sync_all()?;
}
trace!("saved {}", path.display());
drop(inner);
@@ -325,6 +328,7 @@ impl ImageLayer {
Ok(layer)
}
/*
// Create a new image file by materializing every page in a source layer
// at given LSN.
pub fn create_from_src(
@@ -362,6 +366,7 @@ impl ImageLayer {
Self::create(conf, timelineid, timeline.tenantid, seg, lsn, base_images)
}
*/
///
/// Load the contents of the file into memory

View File

@@ -3,10 +3,9 @@
//!
use crate::relish::RelishTag;
use crate::repository::WALRecord;
use crate::ZTimelineId;
use crate::repository::{PageReconstructData, PageReconstructResult};
use crate::{ZTenantId, ZTimelineId};
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::path::PathBuf;
@@ -45,56 +44,6 @@ impl SegmentTag {
}
}
///
/// Represents a version of a page at a specific LSN. The LSN is the key of the
/// entry in the 'page_versions' hash, it is not duplicated here.
///
/// A page version can be stored as a full page image, or as WAL record that needs
/// to be applied over the previous page version to reconstruct this version.
///
/// It's also possible to have both a WAL record and a page image in the same
/// PageVersion. That happens if page version is originally stored as a WAL record
/// but it is later reconstructed by a GetPage@LSN request by performing WAL
/// redo. The get_page_at_lsn() code will store the reconstructed pag image next to
/// the WAL record in that case. TODO: That's pretty accidental, not the result
/// of any grand design. If we want to keep reconstructed page versions around, we
/// probably should have a separate buffer cache so that we could control the
/// replacement policy globally. Or if we keep a reconstructed page image, we
/// could throw away the WAL record.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PageVersion {
/// an 8kb page image
pub page_image: Option<Bytes>,
/// WAL record to get from previous page version to this one.
pub record: Option<WALRecord>,
}
///
/// Data needed to reconstruct a page version
///
/// 'page_img' is the old base image of the page to start the WAL replay with.
/// It can be None, if the first WAL record initializes the page (will_init)
/// 'records' contains the records to apply over the base image.
///
pub struct PageReconstructData {
pub records: Vec<(Lsn, WALRecord)>,
pub page_img: Option<Bytes>,
}
/// Return value from Layer::get_page_reconstruct_data
pub enum PageReconstructResult {
/// Got all the data needed to reconstruct the requested page
Complete,
/// This layer didn't contain all the required data, the caller should look up
/// the predecessor layer at the returned LSN and collect more data from there.
Continue(Lsn),
/// This layer didn't contain data needed to reconstruct the page version at
/// the returned LSN. This is usually considered an error, but might be OK
/// in some circumstances.
Missing(Lsn),
}
///
/// A Layer corresponds to one RELISH_SEG_SIZE slice of a relish in a range of LSNs.
/// There are two kinds of layers, in-memory and on-disk layers. In-memory
@@ -104,6 +53,8 @@ pub enum PageReconstructResult {
/// in-memory and on-disk layers.
///
pub trait Layer: Send + Sync {
fn get_tenant_id(&self) -> ZTenantId;
/// Identify the timeline this relish belongs to
fn get_timeline_id(&self) -> ZTimelineId;

View File

@@ -9,6 +9,7 @@ use zenith_metrics::{register_int_gauge_vec, IntGaugeVec};
pub mod basebackup;
pub mod branches;
pub mod buffered_repository;
pub mod http;
pub mod layered_repository;
pub mod page_service;
@@ -17,6 +18,7 @@ pub mod relish_storage;
pub mod repository;
pub mod restore_local_repo;
pub mod tenant_mgr;
pub mod toast_store;
pub mod waldecoder;
pub mod walreceiver;
pub mod walredo;
@@ -30,14 +32,17 @@ pub mod defaults {
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
// Minimal size of WAL records chain to trigger materialization of the page
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
pub const DEFAULT_UPLOAD_DISTANCE: u64 = 1024 * 1024 * 1024;
pub const DEFAULT_UPLOAD_PERIOD: Duration = Duration::from_secs(2500);
pub const DEFAULT_RECONSTRUCT_THRESHOLD: u64 = 0;
pub const DEFAULT_GC_HORIZON: u64 = 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;
@@ -64,6 +69,9 @@ pub struct PageServerConf {
// page server crashes.
pub checkpoint_distance: u64,
pub checkpoint_period: Duration,
pub upload_period: Duration,
pub upload_distance: u64,
pub reconstruct_threshold: u64,
pub gc_horizon: u64,
pub gc_period: Duration,
@@ -149,6 +157,9 @@ impl PageServerConf {
daemonize: false,
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
checkpoint_period: Duration::from_secs(10),
upload_distance: defaults::DEFAULT_UPLOAD_DISTANCE,
upload_period: defaults::DEFAULT_UPLOAD_PERIOD,
reconstruct_threshold: defaults::DEFAULT_RECONSTRUCT_THRESHOLD,
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),

View File

@@ -693,67 +693,21 @@ impl postgres_backend::Handler for PageServerHandler {
let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"layer_relfiles_total"),
RowDescriptor::int8_col(b"layer_relfiles_needed_by_cutoff"),
RowDescriptor::int8_col(b"layer_relfiles_needed_by_branches"),
RowDescriptor::int8_col(b"layer_relfiles_not_updated"),
RowDescriptor::int8_col(b"layer_relfiles_needed_as_tombstone"),
RowDescriptor::int8_col(b"layer_relfiles_removed"),
RowDescriptor::int8_col(b"layer_relfiles_dropped"),
RowDescriptor::int8_col(b"layer_nonrelfiles_total"),
RowDescriptor::int8_col(b"layer_nonrelfiles_needed_by_cutoff"),
RowDescriptor::int8_col(b"layer_nonrelfiles_needed_by_branches"),
RowDescriptor::int8_col(b"layer_nonrelfiles_not_updated"),
RowDescriptor::int8_col(b"layer_nonrelfiles_needed_as_tombstone"),
RowDescriptor::int8_col(b"layer_nonrelfiles_removed"),
RowDescriptor::int8_col(b"layer_nonrelfiles_dropped"),
RowDescriptor::int8_col(b"meta_total"),
RowDescriptor::int8_col(b"meta_removed"),
RowDescriptor::int8_col(b"meta_dropped"),
RowDescriptor::int8_col(b"pages_total"),
RowDescriptor::int8_col(b"pages_removed"),
RowDescriptor::int8_col(b"pages_dropped"),
RowDescriptor::int8_col(b"elapsed"),
]))?
.write_message_noflush(&BeMessage::DataRow(&[
Some(result.ondisk_relfiles_total.to_string().as_bytes()),
Some(
result
.ondisk_relfiles_needed_by_cutoff
.to_string()
.as_bytes(),
),
Some(
result
.ondisk_relfiles_needed_by_branches
.to_string()
.as_bytes(),
),
Some(result.ondisk_relfiles_not_updated.to_string().as_bytes()),
Some(
result
.ondisk_relfiles_needed_as_tombstone
.to_string()
.as_bytes(),
),
Some(result.ondisk_relfiles_removed.to_string().as_bytes()),
Some(result.ondisk_relfiles_dropped.to_string().as_bytes()),
Some(result.ondisk_nonrelfiles_total.to_string().as_bytes()),
Some(
result
.ondisk_nonrelfiles_needed_by_cutoff
.to_string()
.as_bytes(),
),
Some(
result
.ondisk_nonrelfiles_needed_by_branches
.to_string()
.as_bytes(),
),
Some(result.ondisk_nonrelfiles_not_updated.to_string().as_bytes()),
Some(
result
.ondisk_nonrelfiles_needed_as_tombstone
.to_string()
.as_bytes(),
),
Some(result.ondisk_nonrelfiles_removed.to_string().as_bytes()),
Some(result.ondisk_nonrelfiles_dropped.to_string().as_bytes()),
Some(result.meta_total.to_string().as_bytes()),
Some(result.meta_removed.to_string().as_bytes()),
Some(result.meta_dropped.to_string().as_bytes()),
Some(result.pages_total.to_string().as_bytes()),
Some(result.pages_removed.to_string().as_bytes()),
Some(result.pages_dropped.to_string().as_bytes()),
Some(result.elapsed.as_millis().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;

View File

@@ -44,45 +44,27 @@ pub trait Repository: Send + Sync {
///
/// Result of performing GC
///
#[derive(Default)]
#[derive(Default, Debug)]
pub struct GcResult {
pub ondisk_relfiles_total: u64,
pub ondisk_relfiles_needed_by_cutoff: u64,
pub ondisk_relfiles_needed_by_branches: u64,
pub ondisk_relfiles_not_updated: u64,
pub ondisk_relfiles_needed_as_tombstone: u64,
pub ondisk_relfiles_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
pub ondisk_relfiles_dropped: u64, // # of layer files removed because the relation was dropped
pub meta_removed: u64, // removed versions beyond PITR interval for which new page image exists
pub meta_dropped: u64, // removed versions beyond PITR interval of dropped relations
pub meta_total: u64, // total number of metaobject version histories
pub ondisk_nonrelfiles_total: u64,
pub ondisk_nonrelfiles_needed_by_cutoff: u64,
pub ondisk_nonrelfiles_needed_by_branches: u64,
pub ondisk_nonrelfiles_not_updated: u64,
pub ondisk_nonrelfiles_needed_as_tombstone: u64,
pub ondisk_nonrelfiles_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
pub ondisk_nonrelfiles_dropped: u64, // # of layer files removed because the relation was dropped
pub pages_removed: u64, // removed versions beyond PITR interval for which new page image exists
pub pages_dropped: u64, // removed versions beyond PITR interval of dropped relations
pub pages_total: u64, // total number of page vaersion histories
pub elapsed: Duration,
}
impl AddAssign for GcResult {
fn add_assign(&mut self, other: Self) {
self.ondisk_relfiles_total += other.ondisk_relfiles_total;
self.ondisk_relfiles_needed_by_cutoff += other.ondisk_relfiles_needed_by_cutoff;
self.ondisk_relfiles_needed_by_branches += other.ondisk_relfiles_needed_by_branches;
self.ondisk_relfiles_not_updated += other.ondisk_relfiles_not_updated;
self.ondisk_relfiles_needed_as_tombstone += other.ondisk_relfiles_needed_as_tombstone;
self.ondisk_relfiles_removed += other.ondisk_relfiles_removed;
self.ondisk_relfiles_dropped += other.ondisk_relfiles_dropped;
self.ondisk_nonrelfiles_total += other.ondisk_nonrelfiles_total;
self.ondisk_nonrelfiles_needed_by_cutoff += other.ondisk_nonrelfiles_needed_by_cutoff;
self.ondisk_nonrelfiles_needed_by_branches += other.ondisk_nonrelfiles_needed_by_branches;
self.ondisk_nonrelfiles_not_updated += other.ondisk_nonrelfiles_not_updated;
self.ondisk_nonrelfiles_needed_as_tombstone += other.ondisk_nonrelfiles_needed_as_tombstone;
self.ondisk_nonrelfiles_removed += other.ondisk_nonrelfiles_removed;
self.ondisk_nonrelfiles_dropped += other.ondisk_nonrelfiles_dropped;
self.meta_total += other.meta_total;
self.meta_removed += other.meta_removed;
self.meta_dropped += other.meta_dropped;
self.pages_total += other.pages_total;
self.pages_removed += other.pages_removed;
self.pages_dropped += other.pages_dropped;
self.elapsed += other.elapsed;
}
}
@@ -111,14 +93,18 @@ pub trait Timeline: Send + Sync {
/// Get a list of all existing relations
/// Pass RelTag to get relation objects or None to get nonrels.
fn list_relishes(&self, tag: Option<RelTag>, lsn: Lsn) -> Result<HashSet<RelishTag>>;
/// Get a list of all existing relations in given tablespace and database.
fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelishTag>>;
/// Get a list of all existing non-relational objects
fn list_nonrels(&self, lsn: Lsn) -> Result<HashSet<RelishTag>>;
///
/// Export data as delats and image layers between 'start_lsn' to 'end_lsn'. The
/// start is inclusive, and end is exclusive.
///
fn export_timeline(&self, start_lsn: Lsn, end_lsn: Lsn) -> Result<()>;
/// Get the LSN where this branch was created
fn get_ancestor_lsn(&self) -> Lsn;
@@ -181,6 +167,16 @@ pub trait TimelineWriter: Deref<Target = dyn Timeline> {
/// Advance requires aligned LSN as an argument and would wake wait_lsn() callers.
/// Previous last record LSN is stored alongside the latest and can be read.
fn advance_last_record_lsn(&self, lsn: Lsn);
///
/// Complete all delayed commits and advance disk_consistent_lsn
///
fn checkpoint(&self) -> Result<()>;
///
/// Import data from layer files
///
fn import_timeline(&self, snapshot_lsn: Lsn) -> Result<()>;
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -213,6 +209,39 @@ impl WALRecord {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PageVersion {
/// an 8kb page image
Page(Bytes),
/// WAL record to get from previous page version to this one.
Wal(WALRecord),
}
///
/// Data needed to reconstruct a page version
///
/// 'page_img' is the old base image of the page to start the WAL replay with.
/// It can be None, if the first WAL record initializes the page (will_init)
/// 'records' contains the records to apply over the base image.
///
pub struct PageReconstructData {
pub records: Vec<(Lsn, WALRecord)>,
pub page_img: Option<Bytes>,
}
/// Return value from Layer::get_page_reconstruct_data
pub enum PageReconstructResult {
/// Got all the data needed to reconstruct the requested page
Complete,
/// This layer didn't contain all the required data, the caller should look up
/// the predecessor layer at the returned LSN and collect more data from there.
Continue(Lsn),
/// This layer didn't contain data needed to reconstruct the page version at
/// the returned LSN. This is usually considered an error, but might be OK
/// in some circumstances.
Missing(Lsn),
}
///
/// Tests that should work the same with any Repository/Timeline implementation.
///
@@ -220,7 +249,7 @@ impl WALRecord {
#[cfg(test)]
mod tests {
use super::*;
use crate::layered_repository::{LayeredRepository, METADATA_FILE_NAME};
use crate::buffered_repository::{BufferedRepository, METADATA_FILE_NAME};
use crate::walredo::{WalRedoError, WalRedoManager};
use crate::PageServerConf;
use hex_literal::hex;
@@ -296,7 +325,7 @@ mod tests {
fn load(&self) -> Box<dyn Repository> {
let walredo_mgr = Arc::new(TestRedoManager);
Box::new(LayeredRepository::new(
Box::new(BufferedRepository::new(
self.conf,
walredo_mgr,
self.tenant_id,

View File

@@ -11,7 +11,7 @@ use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use anyhow::{anyhow, bail, Result};
use bytes::{Buf, Bytes};
use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use crate::relish::*;
@@ -126,7 +126,6 @@ pub fn import_timeline_from_postgres_datadir(
import_nonrel_file(writer, lsn, RelishTag::TwoPhase { xid }, &entry.path())?;
}
// TODO: Scan pg_tblspc
writer.advance_last_record_lsn(lsn);
// Import WAL. This is needed even when starting from a shutdown checkpoint, because
@@ -140,6 +139,7 @@ pub fn import_timeline_from_postgres_datadir(
lsn,
&mut pg_control.checkPointCopy.clone(),
)?;
writer.checkpoint()?;
Ok(())
}
@@ -416,7 +416,6 @@ pub fn save_decoded_record(
if checkpoint.update_next_xid(decoded.xl_xid) {
*checkpoint_modified = true;
}
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
@@ -426,14 +425,38 @@ pub fn save_decoded_record(
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
});
if blk.apply_image
&& blk.has_image
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
{
// Extract page image from FPI record
let img_len = blk.bimg_len as usize;
let img_offs = blk.bimg_offset as usize;
let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize);
image.extend_from_slice(&recdata[img_offs..img_offs + img_len]);
let rec = WALRecord {
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
// Compression of WAL is not yet supported
assert!((blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0);
timeline.put_wal_record(lsn, tag, blk.blkno, rec)?;
if blk.hole_length != 0 {
let tail = image.split_off(blk.hole_offset as usize);
image.resize(image.len() + blk.hole_length as usize, 0u8);
image.unsplit(tail);
}
image[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
image[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
assert_eq!(image.len(), pg_constants::BLCKSZ as usize);
timeline.put_page_image(tag, blk.blkno, lsn, image.freeze())?;
} else {
let rec = WALRecord {
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
timeline.put_wal_record(lsn, tag, blk.blkno, rec)?;
}
}
let mut buf = decoded.record.clone();

View File

@@ -2,7 +2,7 @@
//! page server.
use crate::branches;
use crate::layered_repository::LayeredRepository;
use crate::buffered_repository::BufferedRepository;
use crate::repository::{Repository, Timeline};
use crate::walredo::PostgresRedoManager;
use crate::PageServerConf;
@@ -62,6 +62,7 @@ fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {
struct TenantHandleEntry {
checkpointer_handle: Option<JoinHandle<()>>,
uploader_handle: Option<JoinHandle<()>>,
gc_handle: Option<JoinHandle<()>>,
}
@@ -98,20 +99,22 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
// Set up an object repository, for actual data storage.
let repo = Arc::new(LayeredRepository::new(
let repo = Arc::new(BufferedRepository::new(
conf,
Arc::new(walredo_mgr),
tenant_id,
true,
));
let checkpointer_handle = LayeredRepository::launch_checkpointer_thread(conf, repo.clone());
let gc_handle = LayeredRepository::launch_gc_thread(conf, repo.clone());
let checkpointer_handle = BufferedRepository::launch_checkpointer_thread(conf, repo.clone());
let gc_handle = BufferedRepository::launch_gc_thread(conf, repo.clone());
let uploader_handle = BufferedRepository::launch_upload_thread(conf, repo.clone());
let mut handles = TENANT_HANDLES.lock().unwrap();
let h = TenantHandleEntry {
checkpointer_handle: Some(checkpointer_handle),
gc_handle: Some(gc_handle),
uploader_handle: Some(uploader_handle),
};
handles.insert(tenant_id, h);
@@ -170,6 +173,8 @@ pub fn stop_tenant_threads(tenantid: ZTenantId) {
if let Some(h) = handles.get_mut(&tenantid) {
h.checkpointer_handle.take().map(JoinHandle::join);
debug!("checkpointer for tenant {} has stopped", tenantid);
h.uploader_handle.take().map(JoinHandle::join);
debug!("uploader for tenant {} has stopped", tenantid);
h.gc_handle.take().map(JoinHandle::join);
debug!("gc for tenant {} has stopped", tenantid);
}

View File

@@ -0,0 +1,258 @@
use anyhow::{anyhow, Result};
use lz4_flex;
use std::convert::TryInto;
use std::ops::{Bound, RangeBounds};
use std::path::Path;
use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Transaction, Value};
const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
const CACHE_SIZE: usize = 32 * 1024; // 256Mb
//const CACHE_SIZE: usize = 128 * 1024; // 1Gb
///
/// Toast storage consistof two KV databases: one for storing main index
/// and second for storing sliced BLOB (values larger than 2kb).
/// BLOBs and main data are stored in different databases to improve
/// data locality and reduce key size for TOAST segments.
///
pub struct ToastStore {
db: Storage, // key-value database
}
pub struct ToastIterator<'a> {
iter: StorageIterator<'a>,
}
#[derive(Clone, Copy)]
pub struct PageData {
data: [u8; 8192],
}
impl PageData {
pub fn find_first_zero_bit(&self, offs: usize) -> usize {
let bytes = self.data;
for i in offs..8192 {
if bytes[i] != 0xFFu8 {
return i * 8 + bytes[i].trailing_ones() as usize;
}
}
usize::MAX
}
}
impl<'a> Iterator for ToastIterator<'a> {
type Item = Result<(Key, Value)>;
fn next(&mut self) -> Option<Self::Item> {
let mut toast: Option<Vec<u8>> = None;
let mut next_segno = 0u16;
for elem in &mut self.iter {
if let Ok((key, value)) = elem {
let key_len = key.len();
let n_segments =
u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap());
let segno = u16::from_be_bytes(key[key_len - 2..].try_into().unwrap());
let key = key[..key_len - 4].to_vec();
if n_segments != 0 {
// TOAST
assert_eq!(segno, next_segno);
if next_segno == 0 {
toast = Some(Vec::with_capacity(n_segments as usize * TOAST_SEGMENT_SIZE))
}
toast.as_mut().unwrap().extend_from_slice(&value);
next_segno = segno + 1;
if next_segno == n_segments {
let res = lz4_flex::decompress_size_prepended(&toast.unwrap());
return Some(if let Ok(decompressed_data) = res {
Ok((key, decompressed_data))
} else {
Err(anyhow!(res.unwrap_err()))
});
}
} else {
return Some(Ok((key, value)));
}
} else {
return Some(elem);
}
}
assert_eq!(next_segno, 0);
None
}
}
impl<'a> DoubleEndedIterator for ToastIterator<'a> {
fn next_back(&mut self) -> Option<Self::Item> {
let mut toast: Option<Vec<u8>> = None;
let mut next_segno = 0u16;
while let Some(elem) = self.iter.next_back() {
if let Ok((key, value)) = elem {
assert!(!value.is_empty());
let key_len = key.len();
let n_segments =
u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap());
let segno = u16::from_be_bytes(key[key_len - 2..].try_into().unwrap());
let key = key[..key_len - 4].to_vec();
if n_segments != 0 {
// TOAST
assert!(segno + 1 == next_segno || next_segno == 0);
if next_segno == 0 {
let len = (n_segments - 1) as usize * TOAST_SEGMENT_SIZE + value.len();
let mut vec = vec![0u8; len];
vec[len - value.len()..].copy_from_slice(&value);
toast = Some(vec);
} else {
toast.as_mut().unwrap()[segno as usize * TOAST_SEGMENT_SIZE
..(segno + 1) as usize * TOAST_SEGMENT_SIZE]
.copy_from_slice(&value);
}
next_segno = segno;
if next_segno == 0 {
let toast = toast.unwrap();
assert!(!toast.is_empty());
let res = lz4_flex::decompress_size_prepended(&toast);
return Some(if let Ok(decompressed_data) = res {
Ok((key, decompressed_data))
} else {
Err(anyhow!(res.unwrap_err()))
});
}
} else {
return Some(Ok((key, value)));
}
} else {
return Some(elem);
}
}
assert_eq!(next_segno, 0);
None
}
}
//
// FIXME-KK: not using WAL now. Implement asynchronous or delayed commit.
//
impl ToastStore {
pub fn new(path: &Path) -> Result<ToastStore> {
Ok(ToastStore {
db: Storage::open(
&path.join("pageserver.db"),
StorageConfig {
cache_size: CACHE_SIZE,
nosync: false,
mursiw: true,
},
)?,
})
}
pub fn put(&self, key: Key, value: Value) -> Result<()> {
let mut tx = self.db.start_transaction();
self.tx_remove(&mut tx, &key)?;
let value_len = value.len();
let mut key = key;
if value_len >= TOAST_SEGMENT_SIZE {
let compressed_data = lz4_flex::compress_prepend_size(&value);
let compressed_data_len = compressed_data.len();
let mut offs: usize = 0;
let mut segno = 0u16;
let n_segments =
((compressed_data_len + TOAST_SEGMENT_SIZE - 1) / TOAST_SEGMENT_SIZE) as u16;
assert!(n_segments != 0);
key.extend_from_slice(&n_segments.to_be_bytes());
key.extend_from_slice(&[0u8; 2]);
let key_len = key.len();
while offs + TOAST_SEGMENT_SIZE < compressed_data_len {
key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
tx.put(
&key,
&compressed_data[offs..offs + TOAST_SEGMENT_SIZE].to_vec(),
)?;
offs += TOAST_SEGMENT_SIZE;
segno += 1;
}
key[key_len - 2..].copy_from_slice(&segno.to_be_bytes());
tx.put(&key, &compressed_data[offs..].to_vec())?;
} else {
key.extend_from_slice(&[0u8; 4]);
tx.put(&key, &value)?;
}
tx.delay()?;
Ok(())
}
pub fn commit(&mut self) -> Result<()> {
let tx = self.db.start_transaction();
tx.commit()?;
Ok(())
}
pub fn iter(&self) -> ToastIterator<'_> {
self.range(..)
}
pub fn range<R: RangeBounds<Key>>(&self, range: R) -> ToastIterator<'_> {
let from = match range.start_bound() {
Bound::Included(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0u8; 4]);
Bound::Included(key)
}
Bound::Excluded(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0u8; 4]);
Bound::Excluded(key)
}
_ => Bound::Unbounded,
};
let till = match range.end_bound() {
Bound::Included(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0xFFu8; 4]);
Bound::Included(key)
}
Bound::Excluded(key) => {
let mut key = key.clone();
key.extend_from_slice(&[0xFFu8; 4]);
Bound::Excluded(key)
}
_ => Bound::Unbounded,
};
ToastIterator {
iter: self.db.range((from, till)),
}
}
pub fn remove(&self, key: Key) -> Result<()> {
let mut tx = self.db.start_transaction();
self.tx_remove(&mut tx, &key)?;
tx.delay()
}
pub fn tx_remove(&self, tx: &mut Transaction, key: &[u8]) -> Result<()> {
let mut min_key = key.to_vec();
let mut max_key = key.to_vec();
min_key.extend_from_slice(&[0u8; 4]);
max_key.extend_from_slice(&[0xFFu8; 4]);
let mut iter = tx.range(&min_key..&max_key);
if let Some(entry) = iter.next() {
let mut key = entry?.0;
let key_len = key.len();
let n_segments = u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap());
if n_segments != 0 {
// TOAST
for i in 0..n_segments {
key[key_len - 2..].copy_from_slice(&i.to_be_bytes());
tx.remove(&key)?;
}
} else {
tx.remove(&key)?;
}
}
Ok(())
}
pub fn size(&self) -> u64 {
self.db.get_database_info().db_used
}
}

View File

@@ -229,17 +229,18 @@ pub struct DecodedBkpBlock {
pub blkno: u32,
/* copy of the fork_flags field from the XLogRecordBlockHeader */
flags: u8,
pub flags: u8,
/* Information on full-page image, if any */
has_image: bool, /* has image, even for consistency checking */
pub has_image: bool, /* has image, even for consistency checking */
pub apply_image: bool, /* has image that should be restored */
pub will_init: bool, /* record doesn't need previous page version to apply */
//char *bkp_image;
hole_offset: u16,
hole_length: u16,
bimg_len: u16,
bimg_info: u8,
pub hole_offset: u16,
pub hole_length: u16,
pub bimg_offset: u32,
pub bimg_len: u16,
pub bimg_info: u8,
/* Buffer holding the rmgr-specific data associated with this block */
has_data: bool,
@@ -859,8 +860,19 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
}
// 3. Decode blocks.
let mut ptr = record.len() - buf.remaining();
for blk in blocks.iter_mut() {
if blk.has_image {
blk.bimg_offset = ptr as u32;
ptr += blk.bimg_len as usize;
}
if blk.has_data {
ptr += blk.data_len as usize;
}
}
// We don't need them, so just skip blocks_total_len bytes
buf.advance(blocks_total_len as usize);
assert_eq!(ptr, record.len() - buf.remaining());
let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;

View File

@@ -22,6 +22,7 @@ use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use log::*;
use rand::Rng;
use serde::Serialize;
use std::fs;
use std::fs::OpenOptions;
@@ -53,6 +54,8 @@ use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::XLogRecord;
const WAL_REDO_WORKERS: usize = 1;
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
///
@@ -140,7 +143,7 @@ pub struct PostgresRedoManager {
conf: &'static PageServerConf,
runtime: tokio::runtime::Runtime,
process: Mutex<Option<PostgresRedoProcess>>,
workers: [Mutex<Option<PostgresRedoProcess>>; WAL_REDO_WORKERS],
}
#[derive(Debug)]
@@ -195,7 +198,9 @@ impl WalRedoManager for PostgresRedoManager {
start_time = Instant::now();
let result = {
let mut process_guard = self.process.lock().unwrap();
let mut process_guard = self.workers[rand::thread_rng().gen_range(0..WAL_REDO_WORKERS)]
.lock()
.unwrap();
lock_time = Instant::now();
// launch the WAL redo process on first use
@@ -237,7 +242,7 @@ impl PostgresRedoManager {
runtime,
tenantid,
conf,
process: Mutex::new(None),
workers: [(); WAL_REDO_WORKERS].map(|_| Mutex::new(None)),
}
}

View File

@@ -87,6 +87,10 @@ impl<K: Ord, V> VecMap<K, V> {
Ok(None)
}
pub fn last(&self) -> Option<&(K, V)> {
self.0.last()
}
/// Split the map into two.
///
/// The left map contains everything before `cutoff` (exclusive).