mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 21:40:39 +00:00
Compare commits
15 Commits
rfc-pagese
...
fixture-re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0705c99fdb | ||
|
|
21089d5217 | ||
|
|
bd33ea9fae | ||
|
|
414279726d | ||
|
|
6d99b4f1d8 | ||
|
|
a7bf60631f | ||
|
|
07bb7a2afe | ||
|
|
142e247e85 | ||
|
|
7da47d8a0a | ||
|
|
dc52436a8f | ||
|
|
995a2de21e | ||
|
|
e593cbaaba | ||
|
|
4b9e02be45 | ||
|
|
7a36d06cc2 | ||
|
|
4227cfc96e |
@@ -83,6 +83,7 @@ runs:
|
||||
# this variable will be embedded in perf test report
|
||||
# and is needed to distinguish different environments
|
||||
PLATFORM: github-actions-selfhosted
|
||||
BUILD_TYPE: ${{ inputs.build_type }}
|
||||
AWS_ACCESS_KEY_ID: ${{ inputs.real_s3_access_key_id }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ inputs.real_s3_secret_access_key }}
|
||||
shell: bash -euxo pipefail {0}
|
||||
|
||||
@@ -401,6 +401,7 @@ impl PageServerNode {
|
||||
.get("checkpoint_distance")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()?,
|
||||
checkpoint_timeout: settings.get("checkpoint_timeout").map(|x| x.to_string()),
|
||||
compaction_target_size: settings
|
||||
.get("compaction_target_size")
|
||||
.map(|x| x.parse::<u64>())
|
||||
@@ -455,6 +456,7 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'checkpoint_distance' as an integer")?,
|
||||
checkpoint_timeout: settings.get("checkpoint_timeout").map(|x| x.to_string()),
|
||||
compaction_target_size: settings
|
||||
.get("compaction_target_size")
|
||||
.map(|x| x.parse::<u64>())
|
||||
|
||||
@@ -15,7 +15,7 @@ listen_pg_addr = '127.0.0.1:64000'
|
||||
listen_http_addr = '127.0.0.1:9898'
|
||||
|
||||
checkpoint_distance = '268435456' # in bytes
|
||||
checkpoint_period = '1 s'
|
||||
checkpoint_timeout = '10m'
|
||||
|
||||
gc_period = '100 s'
|
||||
gc_horizon = '67108864'
|
||||
@@ -46,7 +46,7 @@ Note the `[remote_storage]` section: it's a [table](https://toml.io/en/v1.0.0#ta
|
||||
|
||||
All values can be passed as arguments to the pageserver binary using the `-c` parameter, each specified as a valid TOML string. All tables should be passed in the inline form.
|
||||
|
||||
Example: `${PAGESERVER_BIN} -c "checkpoint_period = '100 s'" -c "remote_storage={local_path='/some/local/path/'}"`
|
||||
Example: `${PAGESERVER_BIN} -c "checkpoint_timeout = '10 m'" -c "remote_storage={local_path='/some/local/path/'}"`
|
||||
|
||||
Note that TOML distinguishes between strings and integers; the former require single or double quotes around them.
|
||||
|
||||
@@ -82,6 +82,14 @@ S3.
|
||||
|
||||
The unit is # of bytes.
|
||||
|
||||
#### checkpoint_timeout
|
||||
|
||||
Apart from `checkpoint_distance`, flushing of the open layer is also triggered
|
||||
`checkpoint_timeout` after the last flush. This ensures that WAL is eventually uploaded to
|
||||
S3 once activity has stopped.
|
||||
|
||||
The default is 10m.
|
||||
|
||||
#### compaction_period
|
||||
|
||||
Every `compaction_period` seconds, the page server checks if
|
||||
|
||||
@@ -13,24 +13,30 @@ use super::xlog_utils::*;
|
||||
use super::XLogLongPageHeaderData;
|
||||
use super::XLogPageHeaderData;
|
||||
use super::XLogRecord;
|
||||
use super::XLOG_PAGE_MAGIC;
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use crc32c::*;
|
||||
use log::*;
|
||||
use std::cmp::min;
|
||||
use std::num::NonZeroU32;
|
||||
use thiserror::Error;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
enum State {
|
||||
WaitingForRecord,
|
||||
ReassemblingRecord {
|
||||
recordbuf: BytesMut,
|
||||
contlen: NonZeroU32,
|
||||
},
|
||||
SkippingEverything {
|
||||
skip_until_lsn: Lsn,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct WalStreamDecoder {
|
||||
lsn: Lsn,
|
||||
|
||||
startlsn: Lsn, // LSN where this record starts
|
||||
contlen: u32,
|
||||
padlen: u32,
|
||||
|
||||
inputbuf: BytesMut,
|
||||
|
||||
/// buffer used to reassemble records that cross page boundaries.
|
||||
recordbuf: BytesMut,
|
||||
state: State,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug, Clone)]
|
||||
@@ -48,13 +54,8 @@ impl WalStreamDecoder {
|
||||
pub fn new(lsn: Lsn) -> WalStreamDecoder {
|
||||
WalStreamDecoder {
|
||||
lsn,
|
||||
|
||||
startlsn: Lsn(0),
|
||||
contlen: 0,
|
||||
padlen: 0,
|
||||
|
||||
inputbuf: BytesMut::new(),
|
||||
recordbuf: BytesMut::new(),
|
||||
state: State::WaitingForRecord,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,6 +68,58 @@ impl WalStreamDecoder {
|
||||
self.inputbuf.extend_from_slice(buf);
|
||||
}
|
||||
|
||||
fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
|
||||
let validate_impl = || {
|
||||
if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
|
||||
return Err(format!(
|
||||
"invalid xlog page header: xlp_magic={}, expected {}",
|
||||
hdr.xlp_magic, XLOG_PAGE_MAGIC
|
||||
));
|
||||
}
|
||||
if hdr.xlp_pageaddr != self.lsn.0 {
|
||||
return Err(format!(
|
||||
"invalid xlog page header: xlp_pageaddr={}, expected {}",
|
||||
hdr.xlp_pageaddr, self.lsn
|
||||
));
|
||||
}
|
||||
match self.state {
|
||||
State::WaitingForRecord => {
|
||||
if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD != 0 {
|
||||
return Err(
|
||||
"invalid xlog page header: unexpected XLP_FIRST_IS_CONTRECORD".into(),
|
||||
);
|
||||
}
|
||||
if hdr.xlp_rem_len != 0 {
|
||||
return Err(format!(
|
||||
"invalid xlog page header: xlp_rem_len={}, but it's not a contrecord",
|
||||
hdr.xlp_rem_len
|
||||
));
|
||||
}
|
||||
}
|
||||
State::ReassemblingRecord { contlen, .. } => {
|
||||
if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD == 0 {
|
||||
return Err(
|
||||
"invalid xlog page header: XLP_FIRST_IS_CONTRECORD expected, not found"
|
||||
.into(),
|
||||
);
|
||||
}
|
||||
if hdr.xlp_rem_len != contlen.get() {
|
||||
return Err(format!(
|
||||
"invalid xlog page header: xlp_rem_len={}, expected {}",
|
||||
hdr.xlp_rem_len,
|
||||
contlen.get()
|
||||
));
|
||||
}
|
||||
}
|
||||
State::SkippingEverything { .. } => {
|
||||
panic!("Should not be validating page header in the SkippingEverything state");
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
};
|
||||
validate_impl().map_err(|msg| WalDecodeError { msg, lsn: self.lsn })
|
||||
}
|
||||
|
||||
/// Attempt to decode another WAL record from the input that has been fed to the
|
||||
/// decoder so far.
|
||||
///
|
||||
@@ -76,128 +129,121 @@ impl WalStreamDecoder {
|
||||
/// Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
|
||||
///
|
||||
pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
|
||||
let recordbuf;
|
||||
|
||||
// Run state machine that validates page headers, and reassembles records
|
||||
// that cross page boundaries.
|
||||
loop {
|
||||
// parse and verify page boundaries as we go
|
||||
if self.padlen > 0 {
|
||||
// We should first skip padding, as we may have to skip some page headers if we're processing the XLOG_SWITCH record.
|
||||
if self.inputbuf.remaining() < self.padlen as usize {
|
||||
return Ok(None);
|
||||
}
|
||||
// However, we may have to skip some page headers if we're processing the XLOG_SWITCH record or skipping padding for whatever reason.
|
||||
match self.state {
|
||||
State::WaitingForRecord | State::ReassemblingRecord { .. } => {
|
||||
if self.lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE) == 0 {
|
||||
// parse long header
|
||||
|
||||
// skip padding
|
||||
self.inputbuf.advance(self.padlen as usize);
|
||||
self.lsn += self.padlen as u64;
|
||||
self.padlen = 0;
|
||||
} else if self.lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE) == 0 {
|
||||
// parse long header
|
||||
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
|
||||
return Ok(None);
|
||||
}
|
||||
let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
|
||||
|e| WalDecodeError {
|
||||
msg: format!("long header deserialization failed {}", e),
|
||||
lsn: self.lsn,
|
||||
},
|
||||
)?;
|
||||
|
||||
let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
|
||||
WalDecodeError {
|
||||
msg: format!("long header deserialization failed {}", e),
|
||||
lsn: self.lsn,
|
||||
self.validate_page_header(&hdr.std)?;
|
||||
|
||||
self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
|
||||
} else if self.lsn.block_offset() == 0 {
|
||||
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let hdr =
|
||||
XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
|
||||
WalDecodeError {
|
||||
msg: format!("header deserialization failed {}", e),
|
||||
lsn: self.lsn,
|
||||
}
|
||||
})?;
|
||||
|
||||
self.validate_page_header(&hdr)?;
|
||||
|
||||
self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
|
||||
}
|
||||
})?;
|
||||
|
||||
if hdr.std.xlp_pageaddr != self.lsn.0 {
|
||||
return Err(WalDecodeError {
|
||||
msg: "invalid xlog segment header".into(),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
}
|
||||
// TODO: verify the remaining fields in the header
|
||||
|
||||
self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
|
||||
continue;
|
||||
} else if self.lsn.block_offset() == 0 {
|
||||
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let hdr = XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
|
||||
WalDecodeError {
|
||||
msg: format!("header deserialization failed {}", e),
|
||||
lsn: self.lsn,
|
||||
State::SkippingEverything { .. } => {}
|
||||
}
|
||||
match &mut self.state {
|
||||
State::WaitingForRecord => {
|
||||
// need to have at least the xl_tot_len field
|
||||
if self.inputbuf.remaining() < 4 {
|
||||
return Ok(None);
|
||||
}
|
||||
})?;
|
||||
|
||||
if hdr.xlp_pageaddr != self.lsn.0 {
|
||||
return Err(WalDecodeError {
|
||||
msg: "invalid xlog page header".into(),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
// peek xl_tot_len at the beginning of the record.
|
||||
// FIXME: assumes little-endian
|
||||
let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
|
||||
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
|
||||
return Err(WalDecodeError {
|
||||
msg: format!("invalid xl_tot_len {}", xl_tot_len),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
}
|
||||
// Fast path for the common case that the whole record fits on the page.
|
||||
let pageleft = self.lsn.remaining_in_block() as u32;
|
||||
if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
|
||||
self.lsn += xl_tot_len as u64;
|
||||
let recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
|
||||
return Ok(Some(self.complete_record(recordbuf)?));
|
||||
} else {
|
||||
// Need to assemble the record from pieces. Remember the size of the
|
||||
// record, and loop back. On next iteration, we will reach the 'else'
|
||||
// branch below, and copy the part of the record that was on this page
|
||||
// to 'recordbuf'. Subsequent iterations will skip page headers, and
|
||||
// append the continuations from the next pages to 'recordbuf'.
|
||||
self.state = State::ReassemblingRecord {
|
||||
recordbuf: BytesMut::with_capacity(xl_tot_len as usize),
|
||||
contlen: NonZeroU32::new(xl_tot_len).unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
// TODO: verify the remaining fields in the header
|
||||
State::ReassemblingRecord { recordbuf, contlen } => {
|
||||
// we're continuing a record, possibly from previous page.
|
||||
let pageleft = self.lsn.remaining_in_block() as u32;
|
||||
|
||||
self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
|
||||
continue;
|
||||
} else if self.contlen == 0 {
|
||||
assert!(self.recordbuf.is_empty());
|
||||
// read the rest of the record, or as much as fits on this page.
|
||||
let n = min(contlen.get(), pageleft) as usize;
|
||||
|
||||
// need to have at least the xl_tot_len field
|
||||
if self.inputbuf.remaining() < 4 {
|
||||
return Ok(None);
|
||||
if self.inputbuf.remaining() < n {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
recordbuf.put(self.inputbuf.split_to(n));
|
||||
self.lsn += n as u64;
|
||||
*contlen = match NonZeroU32::new(contlen.get() - n as u32) {
|
||||
Some(x) => x,
|
||||
None => {
|
||||
// The record is now complete.
|
||||
let recordbuf = std::mem::replace(recordbuf, BytesMut::new()).freeze();
|
||||
return Ok(Some(self.complete_record(recordbuf)?));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// peek xl_tot_len at the beginning of the record.
|
||||
// FIXME: assumes little-endian
|
||||
self.startlsn = self.lsn;
|
||||
let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
|
||||
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
|
||||
return Err(WalDecodeError {
|
||||
msg: format!("invalid xl_tot_len {}", xl_tot_len),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
State::SkippingEverything { skip_until_lsn } => {
|
||||
assert!(*skip_until_lsn >= self.lsn);
|
||||
let n = skip_until_lsn.0 - self.lsn.0;
|
||||
if self.inputbuf.remaining() < n as usize {
|
||||
return Ok(None);
|
||||
}
|
||||
self.inputbuf.advance(n as usize);
|
||||
self.lsn += n;
|
||||
self.state = State::WaitingForRecord;
|
||||
}
|
||||
|
||||
// Fast path for the common case that the whole record fits on the page.
|
||||
let pageleft = self.lsn.remaining_in_block() as u32;
|
||||
if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
|
||||
// Take the record from the 'inputbuf', and validate it.
|
||||
recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
|
||||
self.lsn += xl_tot_len as u64;
|
||||
break;
|
||||
} else {
|
||||
// Need to assemble the record from pieces. Remember the size of the
|
||||
// record, and loop back. On next iteration, we will reach the 'else'
|
||||
// branch below, and copy the part of the record that was on this page
|
||||
// to 'recordbuf'. Subsequent iterations will skip page headers, and
|
||||
// append the continuations from the next pages to 'recordbuf'.
|
||||
self.recordbuf.reserve(xl_tot_len as usize);
|
||||
self.contlen = xl_tot_len;
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
// we're continuing a record, possibly from previous page.
|
||||
let pageleft = self.lsn.remaining_in_block() as u32;
|
||||
|
||||
// read the rest of the record, or as much as fits on this page.
|
||||
let n = min(self.contlen, pageleft) as usize;
|
||||
|
||||
if self.inputbuf.remaining() < n {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
self.recordbuf.put(self.inputbuf.split_to(n));
|
||||
self.lsn += n as u64;
|
||||
self.contlen -= n as u32;
|
||||
|
||||
if self.contlen == 0 {
|
||||
// The record is now complete.
|
||||
recordbuf = std::mem::replace(&mut self.recordbuf, BytesMut::new()).freeze();
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> {
|
||||
// We now have a record in the 'recordbuf' local variable.
|
||||
let xlogrec =
|
||||
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
|
||||
@@ -219,18 +265,20 @@ impl WalStreamDecoder {
|
||||
|
||||
// XLOG_SWITCH records are special. If we see one, we need to skip
|
||||
// to the next WAL segment.
|
||||
if xlogrec.is_xlog_switch_record() {
|
||||
let next_lsn = if xlogrec.is_xlog_switch_record() {
|
||||
trace!("saw xlog switch record at {}", self.lsn);
|
||||
self.padlen = self.lsn.calc_padding(pg_constants::WAL_SEGMENT_SIZE as u64) as u32;
|
||||
self.lsn + self.lsn.calc_padding(pg_constants::WAL_SEGMENT_SIZE as u64)
|
||||
} else {
|
||||
// Pad to an 8-byte boundary
|
||||
self.padlen = self.lsn.calc_padding(8u32) as u32;
|
||||
}
|
||||
self.lsn.align()
|
||||
};
|
||||
self.state = State::SkippingEverything {
|
||||
skip_until_lsn: next_lsn,
|
||||
};
|
||||
|
||||
// We should return LSN of the next record, not the last byte of this record or
|
||||
// the byte immediately after. Note that this handles both XLOG_SWITCH and usual
|
||||
// records, the former "spans" until the next WAL segment (see test_xlog_switch).
|
||||
let result = (self.lsn + self.padlen as u64, recordbuf);
|
||||
Ok(Some(result))
|
||||
Ok((next_lsn, recordbuf))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,6 +59,7 @@ pub mod defaults {
|
||||
|
||||
# [tenant_config]
|
||||
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
|
||||
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
|
||||
#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
|
||||
#compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
|
||||
#compaction_threshold = '{DEFAULT_COMPACTION_THRESHOLD}'
|
||||
@@ -452,6 +453,13 @@ impl PageServerConf {
|
||||
Some(parse_toml_u64("checkpoint_distance", checkpoint_distance)?);
|
||||
}
|
||||
|
||||
if let Some(checkpoint_timeout) = item.get("checkpoint_timeout") {
|
||||
t_conf.checkpoint_timeout = Some(parse_toml_duration(
|
||||
"checkpoint_timeout",
|
||||
checkpoint_timeout,
|
||||
)?);
|
||||
}
|
||||
|
||||
if let Some(compaction_target_size) = item.get("compaction_target_size") {
|
||||
t_conf.compaction_target_size = Some(parse_toml_u64(
|
||||
"compaction_target_size",
|
||||
|
||||
@@ -32,6 +32,7 @@ pub struct TenantCreateRequest {
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub new_tenant_id: Option<ZTenantId>,
|
||||
pub checkpoint_distance: Option<u64>,
|
||||
pub checkpoint_timeout: Option<String>,
|
||||
pub compaction_target_size: Option<u64>,
|
||||
pub compaction_period: Option<String>,
|
||||
pub compaction_threshold: Option<usize>,
|
||||
@@ -70,6 +71,7 @@ pub struct TenantConfigRequest {
|
||||
#[serde(default)]
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub checkpoint_distance: Option<u64>,
|
||||
pub checkpoint_timeout: Option<String>,
|
||||
pub compaction_target_size: Option<u64>,
|
||||
pub compaction_period: Option<String>,
|
||||
pub compaction_threshold: Option<usize>,
|
||||
@@ -87,6 +89,7 @@ impl TenantConfigRequest {
|
||||
TenantConfigRequest {
|
||||
tenant_id,
|
||||
checkpoint_distance: None,
|
||||
checkpoint_timeout: None,
|
||||
compaction_target_size: None,
|
||||
compaction_period: None,
|
||||
compaction_threshold: None,
|
||||
|
||||
@@ -560,6 +560,8 @@ components:
|
||||
type: string
|
||||
checkpoint_distance:
|
||||
type: integer
|
||||
checkpoint_timeout:
|
||||
type: string
|
||||
compaction_period:
|
||||
type: string
|
||||
compaction_threshold:
|
||||
@@ -578,6 +580,8 @@ components:
|
||||
type: string
|
||||
checkpoint_distance:
|
||||
type: integer
|
||||
checkpoint_timeout:
|
||||
type: string
|
||||
compaction_period:
|
||||
type: string
|
||||
compaction_threshold:
|
||||
|
||||
@@ -623,6 +623,11 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
}
|
||||
|
||||
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
|
||||
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
|
||||
tenant_conf.checkpoint_timeout =
|
||||
Some(humantime::parse_duration(&checkpoint_timeout).map_err(ApiError::from_err)?);
|
||||
}
|
||||
|
||||
tenant_conf.compaction_target_size = request_data.compaction_target_size;
|
||||
tenant_conf.compaction_threshold = request_data.compaction_threshold;
|
||||
|
||||
@@ -683,6 +688,10 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
}
|
||||
|
||||
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
|
||||
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
|
||||
tenant_conf.checkpoint_timeout =
|
||||
Some(humantime::parse_duration(&checkpoint_timeout).map_err(ApiError::from_err)?);
|
||||
}
|
||||
tenant_conf.compaction_target_size = request_data.compaction_target_size;
|
||||
tenant_conf.compaction_threshold = request_data.compaction_threshold;
|
||||
|
||||
|
||||
@@ -433,6 +433,13 @@ impl LayeredRepository {
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
|
||||
}
|
||||
|
||||
pub fn get_checkpoint_timeout(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
.checkpoint_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
|
||||
}
|
||||
|
||||
pub fn get_compaction_target_size(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
|
||||
@@ -16,7 +16,7 @@ use std::ops::{Deref, Range};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{self, AtomicBool, AtomicIsize, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use metrics::{
|
||||
register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec,
|
||||
@@ -233,6 +233,8 @@ pub struct LayeredTimeline {
|
||||
pub layers: RwLock<LayerMap>,
|
||||
|
||||
last_freeze_at: AtomicLsn,
|
||||
// Atomic would be more appropriate here.
|
||||
last_freeze_ts: RwLock<Instant>,
|
||||
|
||||
// WAL redo manager
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Sync + Send>,
|
||||
@@ -560,6 +562,13 @@ impl LayeredTimeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
|
||||
}
|
||||
|
||||
fn get_checkpoint_timeout(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
.checkpoint_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
|
||||
}
|
||||
|
||||
fn get_compaction_target_size(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
@@ -649,6 +658,7 @@ impl LayeredTimeline {
|
||||
disk_consistent_lsn: AtomicLsn::new(metadata.disk_consistent_lsn().0),
|
||||
|
||||
last_freeze_at: AtomicLsn::new(metadata.disk_consistent_lsn().0),
|
||||
last_freeze_ts: RwLock::new(Instant::now()),
|
||||
|
||||
ancestor_timeline: ancestor,
|
||||
ancestor_lsn: metadata.ancestor_lsn(),
|
||||
@@ -1094,8 +1104,11 @@ impl LayeredTimeline {
|
||||
}
|
||||
|
||||
///
|
||||
/// Check if more than 'checkpoint_distance' of WAL has been accumulated
|
||||
/// in the in-memory layer, and initiate flushing it if so.
|
||||
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
|
||||
/// the in-memory layer, and initiate flushing it if so.
|
||||
///
|
||||
/// Also flush after a period of time without new data -- it helps
|
||||
/// safekeepers to regard pageserver as caught up and suspend activity.
|
||||
///
|
||||
pub fn check_checkpoint_distance(self: &Arc<LayeredTimeline>) -> Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
@@ -1103,21 +1116,27 @@ impl LayeredTimeline {
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_size = open_layer.size()?;
|
||||
drop(layers);
|
||||
let distance = last_lsn.widening_sub(self.last_freeze_at.load());
|
||||
let last_freeze_at = self.last_freeze_at.load();
|
||||
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
|
||||
let distance = last_lsn.widening_sub(last_freeze_at);
|
||||
// Checkpointing the open layer can be triggered by layer size or LSN range.
|
||||
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
|
||||
// we want to stay below that with a big margin. The LSN distance determines how
|
||||
// much WAL the safekeepers need to store.
|
||||
if distance >= self.get_checkpoint_distance().into()
|
||||
|| open_layer_size > self.get_checkpoint_distance()
|
||||
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
|
||||
{
|
||||
info!(
|
||||
"check_checkpoint_distance {}, layer size {}",
|
||||
distance, open_layer_size
|
||||
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
|
||||
distance,
|
||||
open_layer_size,
|
||||
last_freeze_ts.elapsed()
|
||||
);
|
||||
|
||||
self.freeze_inmem_layer(true);
|
||||
self.last_freeze_at.store(last_lsn);
|
||||
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
|
||||
|
||||
// Launch a thread to flush the frozen layer to disk, unless
|
||||
// a thread was already running. (If the thread was running
|
||||
|
||||
@@ -93,3 +93,56 @@ pub fn shutdown_pageserver(exit_code: i32) {
|
||||
info!("Shut down successfully completed");
|
||||
std::process::exit(exit_code);
|
||||
}
|
||||
|
||||
const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
|
||||
const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
|
||||
|
||||
async fn exponential_backoff(n: u32, base_increment: f64, max_seconds: f64) {
|
||||
let backoff_duration_seconds =
|
||||
exponential_backoff_duration_seconds(n, base_increment, max_seconds);
|
||||
if backoff_duration_seconds > 0.0 {
|
||||
info!(
|
||||
"Backoff: waiting {backoff_duration_seconds} seconds before processing with the task",
|
||||
);
|
||||
tokio::time::sleep(std::time::Duration::from_secs_f64(backoff_duration_seconds)).await;
|
||||
}
|
||||
}
|
||||
|
||||
fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
|
||||
if n == 0 {
|
||||
0.0
|
||||
} else {
|
||||
(1.0 + base_increment).powf(f64::from(n)).min(max_seconds)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod backoff_defaults_tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn backoff_defaults_produce_growing_backoff_sequence() {
|
||||
let mut current_backoff_value = None;
|
||||
|
||||
for i in 0..10_000 {
|
||||
let new_backoff_value = exponential_backoff_duration_seconds(
|
||||
i,
|
||||
DEFAULT_BASE_BACKOFF_SECONDS,
|
||||
DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
);
|
||||
|
||||
if let Some(old_backoff_value) = current_backoff_value.replace(new_backoff_value) {
|
||||
assert!(
|
||||
old_backoff_value <= new_backoff_value,
|
||||
"{i}th backoff value {new_backoff_value} is smaller than the previous one {old_backoff_value}"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
current_backoff_value.expect("Should have produced backoff values to compare"),
|
||||
DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
"Given big enough of retries, backoff should reach its allowed max value"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1044,6 +1044,7 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||
RowDescriptor::int8_col(b"checkpoint_distance"),
|
||||
RowDescriptor::int8_col(b"checkpoint_timeout"),
|
||||
RowDescriptor::int8_col(b"compaction_target_size"),
|
||||
RowDescriptor::int8_col(b"compaction_period"),
|
||||
RowDescriptor::int8_col(b"compaction_threshold"),
|
||||
@@ -1054,6 +1055,12 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
]))?
|
||||
.write_message_noflush(&BeMessage::DataRow(&[
|
||||
Some(repo.get_checkpoint_distance().to_string().as_bytes()),
|
||||
Some(
|
||||
repo.get_checkpoint_timeout()
|
||||
.as_secs()
|
||||
.to_string()
|
||||
.as_bytes(),
|
||||
),
|
||||
Some(repo.get_compaction_target_size().to_string().as_bytes()),
|
||||
Some(
|
||||
repo.get_compaction_period()
|
||||
|
||||
@@ -708,20 +708,25 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
|
||||
/// Truncate relation
|
||||
pub fn put_rel_truncation(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> {
|
||||
ensure!(rel.relnode != 0, "invalid relnode");
|
||||
let size_key = rel_size_to_key(rel);
|
||||
let last_lsn = self.tline.get_last_record_lsn();
|
||||
if self.tline.get_rel_exists(rel, last_lsn)? {
|
||||
let size_key = rel_size_to_key(rel);
|
||||
// Fetch the old size first
|
||||
let old_size = self.get(size_key)?.get_u32_le();
|
||||
|
||||
// Fetch the old size first
|
||||
let old_size = self.get(size_key)?.get_u32_le();
|
||||
// Update the entry with the new size.
|
||||
let buf = nblocks.to_le_bytes();
|
||||
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
||||
|
||||
// Update the entry with the new size.
|
||||
let buf = nblocks.to_le_bytes();
|
||||
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
||||
// Update relation size cache
|
||||
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
|
||||
|
||||
// Update relation size cache
|
||||
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
|
||||
// Update relation size cache
|
||||
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
|
||||
|
||||
// Update logical database size.
|
||||
self.pending_nblocks -= old_size as isize - nblocks as isize;
|
||||
// Update logical database size.
|
||||
self.pending_nblocks -= old_size as isize - nblocks as isize;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -961,8 +966,8 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
|
||||
bail!("unexpected pending WAL record");
|
||||
}
|
||||
} else {
|
||||
let last_lsn = self.tline.get_last_record_lsn();
|
||||
self.tline.get(key, last_lsn)
|
||||
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
|
||||
self.tline.get(key, lsn)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -445,6 +445,7 @@ pub mod repo_harness {
|
||||
fn from(tenant_conf: TenantConf) -> Self {
|
||||
Self {
|
||||
checkpoint_distance: Some(tenant_conf.checkpoint_distance),
|
||||
checkpoint_timeout: Some(tenant_conf.checkpoint_timeout),
|
||||
compaction_target_size: Some(tenant_conf.compaction_target_size),
|
||||
compaction_period: Some(tenant_conf.compaction_period),
|
||||
compaction_threshold: Some(tenant_conf.compaction_threshold),
|
||||
|
||||
@@ -172,6 +172,7 @@ use self::{
|
||||
};
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
exponential_backoff,
|
||||
layered_repository::{
|
||||
ephemeral_file::is_ephemeral_file,
|
||||
metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME},
|
||||
@@ -969,14 +970,19 @@ fn storage_sync_loop<P, S>(
|
||||
}
|
||||
}
|
||||
|
||||
// needed to check whether the download happened
|
||||
// more informative than just a bool
|
||||
#[derive(Debug)]
|
||||
enum DownloadMarker {
|
||||
enum DownloadStatus {
|
||||
Downloaded,
|
||||
Nothing,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum UploadStatus {
|
||||
Uploaded,
|
||||
Failed,
|
||||
Nothing,
|
||||
}
|
||||
|
||||
async fn process_batches<P, S>(
|
||||
conf: &'static PageServerConf,
|
||||
max_sync_errors: NonZeroU32,
|
||||
@@ -1016,7 +1022,7 @@ where
|
||||
"Finished storage sync task for sync id {sync_id} download marker {:?}",
|
||||
download_marker
|
||||
);
|
||||
if matches!(download_marker, DownloadMarker::Downloaded) {
|
||||
if matches!(download_marker, DownloadStatus::Downloaded) {
|
||||
downloaded_timelines.insert(sync_id.tenant_id);
|
||||
}
|
||||
}
|
||||
@@ -1030,7 +1036,7 @@ async fn process_sync_task_batch<P, S>(
|
||||
max_sync_errors: NonZeroU32,
|
||||
sync_id: ZTenantTimelineId,
|
||||
batch: SyncTaskBatch,
|
||||
) -> DownloadMarker
|
||||
) -> DownloadStatus
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
@@ -1047,7 +1053,7 @@ where
|
||||
// When operating in a system without tasks failing over the error threshold,
|
||||
// current batching and task processing systems aim to update the layer set and metadata files (remote and local),
|
||||
// without "losing" such layer files.
|
||||
let (upload_result, status_update) = tokio::join!(
|
||||
let (upload_status, download_status) = tokio::join!(
|
||||
async {
|
||||
if let Some(upload_data) = upload_data {
|
||||
match validate_task_retries(upload_data, max_sync_errors)
|
||||
@@ -1065,7 +1071,7 @@ where
|
||||
"upload",
|
||||
)
|
||||
.await;
|
||||
return Some(());
|
||||
UploadStatus::Uploaded
|
||||
}
|
||||
ControlFlow::Break(failed_upload_data) => {
|
||||
if let Err(e) = update_remote_data(
|
||||
@@ -1082,10 +1088,13 @@ where
|
||||
{
|
||||
error!("Failed to update remote timeline {sync_id}: {e:?}");
|
||||
}
|
||||
|
||||
UploadStatus::Failed
|
||||
}
|
||||
}
|
||||
} else {
|
||||
UploadStatus::Nothing
|
||||
}
|
||||
None
|
||||
}
|
||||
.instrument(info_span!("upload_timeline_data")),
|
||||
async {
|
||||
@@ -1115,51 +1124,53 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
DownloadMarker::Nothing
|
||||
DownloadStatus::Nothing
|
||||
}
|
||||
.instrument(info_span!("download_timeline_data")),
|
||||
);
|
||||
|
||||
if let Some(mut delete_data) = batch.delete {
|
||||
if upload_result.is_some() {
|
||||
match validate_task_retries(delete_data, max_sync_errors)
|
||||
.instrument(info_span!("retries_validation"))
|
||||
.await
|
||||
{
|
||||
ControlFlow::Continue(new_delete_data) => {
|
||||
delete_timeline_data(
|
||||
conf,
|
||||
(storage.as_ref(), &index, sync_queue),
|
||||
sync_id,
|
||||
new_delete_data,
|
||||
sync_start,
|
||||
"delete",
|
||||
)
|
||||
.instrument(info_span!("delete_timeline_data"))
|
||||
.await;
|
||||
}
|
||||
ControlFlow::Break(failed_delete_data) => {
|
||||
if let Err(e) = update_remote_data(
|
||||
conf,
|
||||
storage.as_ref(),
|
||||
&index,
|
||||
sync_id,
|
||||
RemoteDataUpdate::Delete(&failed_delete_data.data.deleted_layers),
|
||||
)
|
||||
if let Some(delete_data) = batch.delete {
|
||||
match upload_status {
|
||||
UploadStatus::Uploaded | UploadStatus::Nothing => {
|
||||
match validate_task_retries(delete_data, max_sync_errors)
|
||||
.instrument(info_span!("retries_validation"))
|
||||
.await
|
||||
{
|
||||
error!("Failed to update remote timeline {sync_id}: {e:?}");
|
||||
{
|
||||
ControlFlow::Continue(new_delete_data) => {
|
||||
delete_timeline_data(
|
||||
conf,
|
||||
(storage.as_ref(), &index, sync_queue),
|
||||
sync_id,
|
||||
new_delete_data,
|
||||
sync_start,
|
||||
"delete",
|
||||
)
|
||||
.instrument(info_span!("delete_timeline_data"))
|
||||
.await;
|
||||
}
|
||||
ControlFlow::Break(failed_delete_data) => {
|
||||
if let Err(e) = update_remote_data(
|
||||
conf,
|
||||
storage.as_ref(),
|
||||
&index,
|
||||
sync_id,
|
||||
RemoteDataUpdate::Delete(&failed_delete_data.data.deleted_layers),
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("Failed to update remote timeline {sync_id}: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
delete_data.retries += 1;
|
||||
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
|
||||
warn!("Skipping delete task due to failed upload tasks, reenqueuing");
|
||||
UploadStatus::Failed => {
|
||||
warn!("Skipping delete task due to failed upload tasks, reenqueuing");
|
||||
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
status_update
|
||||
download_status
|
||||
}
|
||||
|
||||
async fn download_timeline_data<P, S>(
|
||||
@@ -1170,7 +1181,7 @@ async fn download_timeline_data<P, S>(
|
||||
new_download_data: SyncData<LayersDownload>,
|
||||
sync_start: Instant,
|
||||
task_name: &str,
|
||||
) -> DownloadMarker
|
||||
) -> DownloadStatus
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
@@ -1199,7 +1210,7 @@ where
|
||||
Ok(()) => match index.write().await.set_awaits_download(&sync_id, false) {
|
||||
Ok(()) => {
|
||||
register_sync_status(sync_id, sync_start, task_name, Some(true));
|
||||
return DownloadMarker::Downloaded;
|
||||
return DownloadStatus::Downloaded;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Timeline {sync_id} was expected to be in the remote index after a successful download, but it's absent: {e:?}");
|
||||
@@ -1215,7 +1226,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
DownloadMarker::Nothing
|
||||
DownloadStatus::Nothing
|
||||
}
|
||||
|
||||
async fn update_local_metadata(
|
||||
@@ -1493,11 +1504,7 @@ async fn validate_task_retries<T>(
|
||||
return ControlFlow::Break(sync_data);
|
||||
}
|
||||
|
||||
if current_attempt > 0 {
|
||||
let seconds_to_wait = 2.0_f64.powf(current_attempt as f64 - 1.0).min(30.0);
|
||||
info!("Waiting {seconds_to_wait} seconds before starting the task");
|
||||
tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await;
|
||||
}
|
||||
exponential_backoff(current_attempt, 1.0, 30.0).await;
|
||||
ControlFlow::Continue(sync_data)
|
||||
}
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ pub mod defaults {
|
||||
// which is good for now to trigger bugs.
|
||||
// This parameter actually determines L0 layer file size.
|
||||
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
|
||||
pub const DEFAULT_CHECKPOINT_TIMEOUT: &str = "10 m";
|
||||
|
||||
// Target file size, when creating image and delta layers.
|
||||
// This parameter determines L1 layer file size.
|
||||
@@ -48,6 +49,9 @@ pub struct TenantConf {
|
||||
// page server crashes.
|
||||
// This parameter actually determines L0 layer file size.
|
||||
pub checkpoint_distance: u64,
|
||||
// Inmemory layer is also flushed at least once in checkpoint_timeout to
|
||||
// eventually upload WAL after activity is stopped.
|
||||
pub checkpoint_timeout: Duration,
|
||||
// Target file size, when creating image and delta layers.
|
||||
// This parameter determines L1 layer file size.
|
||||
pub compaction_target_size: u64,
|
||||
@@ -90,6 +94,7 @@ pub struct TenantConf {
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct TenantConfOpt {
|
||||
pub checkpoint_distance: Option<u64>,
|
||||
pub checkpoint_timeout: Option<Duration>,
|
||||
pub compaction_target_size: Option<u64>,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub compaction_period: Option<Duration>,
|
||||
@@ -113,6 +118,9 @@ impl TenantConfOpt {
|
||||
checkpoint_distance: self
|
||||
.checkpoint_distance
|
||||
.unwrap_or(global_conf.checkpoint_distance),
|
||||
checkpoint_timeout: self
|
||||
.checkpoint_timeout
|
||||
.unwrap_or(global_conf.checkpoint_timeout),
|
||||
compaction_target_size: self
|
||||
.compaction_target_size
|
||||
.unwrap_or(global_conf.compaction_target_size),
|
||||
@@ -142,6 +150,9 @@ impl TenantConfOpt {
|
||||
if let Some(checkpoint_distance) = other.checkpoint_distance {
|
||||
self.checkpoint_distance = Some(checkpoint_distance);
|
||||
}
|
||||
if let Some(checkpoint_timeout) = other.checkpoint_timeout {
|
||||
self.checkpoint_timeout = Some(checkpoint_timeout);
|
||||
}
|
||||
if let Some(compaction_target_size) = other.compaction_target_size {
|
||||
self.compaction_target_size = Some(compaction_target_size);
|
||||
}
|
||||
@@ -181,6 +192,8 @@ impl TenantConf {
|
||||
|
||||
TenantConf {
|
||||
checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
|
||||
checkpoint_timeout: humantime::parse_duration(DEFAULT_CHECKPOINT_TIMEOUT)
|
||||
.expect("cannot parse default checkpoint timeout"),
|
||||
compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE,
|
||||
compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
|
||||
.expect("cannot parse default compaction period"),
|
||||
@@ -212,6 +225,7 @@ impl TenantConf {
|
||||
pub fn dummy_conf() -> Self {
|
||||
TenantConf {
|
||||
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
|
||||
checkpoint_timeout: Duration::from_secs(600),
|
||||
compaction_target_size: 4 * 1024 * 1024,
|
||||
compaction_period: Duration::from_secs(10),
|
||||
compaction_threshold: defaults::DEFAULT_COMPACTION_THRESHOLD,
|
||||
|
||||
@@ -232,7 +232,7 @@ pub(crate) fn create_timeline(
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let _new_timeline = match ancestor_timeline_id {
|
||||
match ancestor_timeline_id {
|
||||
Some(ancestor_timeline_id) => {
|
||||
let ancestor_timeline = repo
|
||||
.get_timeline_load(ancestor_timeline_id)
|
||||
|
||||
@@ -25,7 +25,11 @@ use etcd_broker::{
|
||||
use tokio::select;
|
||||
use tracing::*;
|
||||
|
||||
use crate::repository::{Repository, Timeline};
|
||||
use crate::{
|
||||
exponential_backoff,
|
||||
repository::{Repository, Timeline},
|
||||
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
};
|
||||
use crate::{RepositoryImpl, TimelineImpl};
|
||||
use utils::{
|
||||
lsn::Lsn,
|
||||
@@ -230,18 +234,6 @@ async fn subscribe_for_timeline_updates(
|
||||
}
|
||||
}
|
||||
|
||||
const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
|
||||
const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
|
||||
|
||||
async fn exponential_backoff(n: u32, base: f64, max_seconds: f64) {
|
||||
if n == 0 {
|
||||
return;
|
||||
}
|
||||
let seconds_to_wait = base.powf(f64::from(n) - 1.0).min(max_seconds);
|
||||
info!("Backoff: waiting {seconds_to_wait} seconds before proceeding with the task");
|
||||
tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await;
|
||||
}
|
||||
|
||||
/// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
|
||||
struct WalreceiverState {
|
||||
id: ZTenantTimelineId,
|
||||
|
||||
@@ -178,16 +178,6 @@ pub async fn handle_walreceiver_connection(
|
||||
caught_up = true;
|
||||
}
|
||||
|
||||
let timeline_to_check = Arc::clone(&timeline);
|
||||
tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance())
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Spawned checkpoint check task panicked for timeline {id}")
|
||||
})?
|
||||
.with_context(|| {
|
||||
format!("Failed to check checkpoint distance for timeline {id}")
|
||||
})?;
|
||||
|
||||
Some(endlsn)
|
||||
}
|
||||
|
||||
@@ -208,6 +198,12 @@ pub async fn handle_walreceiver_connection(
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let timeline_to_check = Arc::clone(&timeline);
|
||||
tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance())
|
||||
.await
|
||||
.with_context(|| format!("Spawned checkpoint check task panicked for timeline {id}"))?
|
||||
.with_context(|| format!("Failed to check checkpoint distance for timeline {id}"))?;
|
||||
|
||||
if let Some(last_lsn) = status_update {
|
||||
let remote_index = repo.get_remote_index();
|
||||
let timeline_remote_consistent_lsn = remote_index
|
||||
|
||||
@@ -727,7 +727,7 @@ where
|
||||
info!("setting local_start_lsn to {:?}", state.local_start_lsn);
|
||||
}
|
||||
// Initializing commit_lsn before acking first flushed record is
|
||||
// important to let find_end_of_wal skip the whole in the beginning
|
||||
// important to let find_end_of_wal skip the hole in the beginning
|
||||
// of the first segment.
|
||||
//
|
||||
// NB: on new clusters, this happens at the same time as
|
||||
@@ -738,6 +738,10 @@ where
|
||||
|
||||
// Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
|
||||
self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
|
||||
// Initializing remote_consistent_lsn sets that we have nothing to
|
||||
// stream to pageserver(s) immediately after creation.
|
||||
self.inmem.remote_consistent_lsn =
|
||||
max(self.inmem.remote_consistent_lsn, state.timeline_start_lsn);
|
||||
|
||||
state.acceptor_state.term_history = msg.term_history.clone();
|
||||
self.persist_control_file(state)?;
|
||||
|
||||
@@ -137,7 +137,7 @@ impl SharedState {
|
||||
self.is_wal_backup_required()
|
||||
// FIXME: add tracking of relevant pageservers and check them here individually,
|
||||
// otherwise migration won't work (we suspend too early).
|
||||
|| self.sk.inmem.remote_consistent_lsn <= self.sk.inmem.commit_lsn
|
||||
|| self.sk.inmem.remote_consistent_lsn < self.sk.inmem.commit_lsn
|
||||
}
|
||||
|
||||
/// Mark timeline active/inactive and return whether s3 offloading requires
|
||||
|
||||
11
test_runner/batch_others/test_fsm_truncate.py
Normal file
11
test_runner/batch_others/test_fsm_truncate.py
Normal file
@@ -0,0 +1,11 @@
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverHttpClient
|
||||
import pytest
|
||||
|
||||
|
||||
def test_fsm_truncate(neon_env_builder: NeonEnvBuilder):
    """Create two empty tables on a fresh branch and truncate both in a single psql call."""
    branch_name = "test_fsm_truncate"
    statements = 'CREATE TABLE t1(key int); CREATE TABLE t2(key int); TRUNCATE TABLE t1; TRUNCATE TABLE t2;'

    env = neon_env_builder.init_start()
    env.neon_cli.create_branch(branch_name)

    pg = env.postgres.create_start(branch_name)
    pg.safe_psql(statements)
|
||||
@@ -1,9 +1,10 @@
|
||||
import re
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_upload, wait_for_last_record_lsn
|
||||
from fixtures.utils import lsn_from_hex, lsn_to_hex
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, Postgres, wait_for_upload, wait_for_last_record_lsn
|
||||
from fixtures.utils import lsn_from_hex
|
||||
from uuid import UUID, uuid4
|
||||
import tarfile
|
||||
import os
|
||||
import tarfile
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
import json
|
||||
@@ -105,20 +106,63 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
|
||||
|
||||
num_rows = 3000
|
||||
def test_import_from_pageserver_small(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
neon_env_builder.enable_local_fs_remote_storage()
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.neon_cli.create_branch('test_import_from_pageserver')
|
||||
pgmain = env.postgres.create_start('test_import_from_pageserver')
|
||||
log.info("postgres is running on 'test_import_from_pageserver' branch")
|
||||
timeline = env.neon_cli.create_branch('test_import_from_pageserver_small')
|
||||
pg = env.postgres.create_start('test_import_from_pageserver_small')
|
||||
|
||||
timeline = pgmain.safe_psql("SHOW neon.timeline_id")[0][0]
|
||||
num_rows = 3000
|
||||
lsn = _generate_data(num_rows, pg)
|
||||
_import(num_rows, lsn, env, pg_bin, timeline)
|
||||
|
||||
with closing(pgmain.connect()) as conn:
|
||||
|
||||
@pytest.mark.timeout(1800)
# TODO: temporarily disable `test_import_from_pageserver_multisegment` test, enable
# the test back after finding the failure cause.
# @pytest.mark.skipif(os.environ.get('BUILD_TYPE') == "debug", reason="only run with release build")
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/2255")
def test_import_from_pageserver_multisegment(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder):
    """Export and re-import a timeline big enough to span several relation segment files.

    The backup archive produced by `_import` is then scanned for file names that look
    like segment files (`<digits>.<digits>`); at least one must be present.
    """
    branch_name = 'test_import_from_pageserver_multisegment'

    neon_env_builder.num_safekeepers = 1
    neon_env_builder.enable_local_fs_remote_storage()
    env = neon_env_builder.init_start()

    timeline = env.neon_cli.create_branch(branch_name)
    pg = env.postgres.create_start(branch_name)

    # A segment file is typically at most 1GB, so the table has to grow past that for
    # the backup to contain multi-segment files. 30000000 rows is enough to push the
    # DB size above 1GB. Related: https://github.com/neondatabase/neon/issues/2097.
    num_rows = 30000000
    lsn = _generate_data(num_rows, pg)

    timeline_detail = env.pageserver.http_client().timeline_detail(env.initial_tenant, timeline)
    logical_size = timeline_detail['local']['current_logical_size']
    log.info(f"timeline logical size = {logical_size / (1024 ** 2)}MB")
    assert logical_size > 1024**3  # = 1GB

    tar_output_file = _import(num_rows, lsn, env, pg_bin, timeline)

    # Count the archive members whose names match the segment file pattern.
    segfile_re = re.compile('[0-9]+\\.[0-9]+')
    segment_file_count = 0
    with tarfile.open(tar_output_file, "r") as archive:
        for member_name in archive.getnames():
            if segfile_re.search(member_name) is None:
                continue
            segment_file_count += 1
            log.info(f"Found a segment file: {member_name} in the backup archive file")
    assert segment_file_count > 0
|
||||
|
||||
|
||||
def _generate_data(num_rows: int, pg: Postgres) -> str:
|
||||
"""Generate a table with `num_rows` rows.
|
||||
|
||||
Returns:
|
||||
the latest insert WAL's LSN"""
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# data loading may take a while, so increase statement timeout
|
||||
cur.execute("SET statement_timeout='300s'")
|
||||
@@ -127,15 +171,28 @@ def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_bu
|
||||
cur.execute("CHECKPOINT")
|
||||
|
||||
cur.execute('SELECT pg_current_wal_insert_lsn()')
|
||||
lsn = cur.fetchone()[0]
|
||||
log.info(f"start_backup_lsn = {lsn}")
|
||||
res = cur.fetchone()
|
||||
assert res is not None and isinstance(res[0], str)
|
||||
return res[0]
|
||||
|
||||
|
||||
def _import(expected_num_rows: int, lsn: str, env: NeonEnv, pg_bin: PgBin, timeline: UUID) -> str:
|
||||
"""Test importing backup data to the pageserver.
|
||||
|
||||
Args:
|
||||
expected_num_rows: the expected number of rows of the test table in the backup data
|
||||
lsn: the backup's base LSN
|
||||
|
||||
Returns:
|
||||
path to the backup archive file"""
|
||||
log.info(f"start_backup_lsn = {lsn}")
|
||||
|
||||
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
|
||||
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
|
||||
psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')}
|
||||
|
||||
# Get a fullbackup from pageserver
|
||||
query = f"fullbackup { env.initial_tenant.hex} {timeline} {lsn}"
|
||||
query = f"fullbackup { env.initial_tenant.hex} {timeline.hex} {lsn}"
|
||||
cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
|
||||
result_basepath = pg_bin.run_capture(cmd, env=psql_env)
|
||||
tar_output_file = result_basepath + ".stdout"
|
||||
@@ -152,7 +209,7 @@ def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_bu
|
||||
env.pageserver.start()
|
||||
|
||||
# Import using another tenantid, because we use the same pageserver.
|
||||
# TODO Create another pageserver to maeke test more realistic.
|
||||
# TODO Create another pageserver to make test more realistic.
|
||||
tenant = uuid4()
|
||||
|
||||
# Import to pageserver
|
||||
@@ -165,7 +222,7 @@ def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_bu
|
||||
"--tenant-id",
|
||||
tenant.hex,
|
||||
"--timeline-id",
|
||||
timeline,
|
||||
timeline.hex,
|
||||
"--node-name",
|
||||
node_name,
|
||||
"--base-lsn",
|
||||
@@ -175,15 +232,15 @@ def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_bu
|
||||
])
|
||||
|
||||
# Wait for data to land in s3
|
||||
wait_for_last_record_lsn(client, tenant, UUID(timeline), lsn_from_hex(lsn))
|
||||
wait_for_upload(client, tenant, UUID(timeline), lsn_from_hex(lsn))
|
||||
wait_for_last_record_lsn(client, tenant, timeline, lsn_from_hex(lsn))
|
||||
wait_for_upload(client, tenant, timeline, lsn_from_hex(lsn))
|
||||
|
||||
# Check it worked
|
||||
pg = env.postgres.create_start(node_name, tenant_id=tenant)
|
||||
assert pg.safe_psql('select count(*) from tbl') == [(num_rows, )]
|
||||
assert pg.safe_psql('select count(*) from tbl') == [(expected_num_rows, )]
|
||||
|
||||
# Take another fullbackup
|
||||
query = f"fullbackup { tenant.hex} {timeline} {lsn}"
|
||||
query = f"fullbackup { tenant.hex} {timeline.hex} {lsn}"
|
||||
cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
|
||||
result_basepath = pg_bin.run_capture(cmd, env=psql_env)
|
||||
new_tar_output_file = result_basepath + ".stdout"
|
||||
@@ -195,4 +252,6 @@ def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_bu
|
||||
# Check that gc works
|
||||
psconn = env.pageserver.connect()
|
||||
pscur = psconn.cursor()
|
||||
pscur.execute(f"do_gc {tenant.hex} {timeline} 0")
|
||||
pscur.execute(f"do_gc {tenant.hex} {timeline.hex} 0")
|
||||
|
||||
return tar_output_file
|
||||
|
||||
@@ -2,6 +2,16 @@ from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.log_helper import log
|
||||
|
||||
|
||||
# Regression test for https://github.com/neondatabase/neon/issues/2247:
# the pageserver fixture must support being stopped and started again right away.
def test_fixture_restart(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
    pageserver = env.pageserver

    restart_count = 3
    for _ in range(restart_count):
        pageserver.stop()
        pageserver.start()
|
||||
|
||||
|
||||
# Test restarting page server, while safekeeper and compute node keep
|
||||
# running.
|
||||
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
@@ -4,7 +4,7 @@ from uuid import UUID
|
||||
import re
|
||||
import psycopg2.extras
|
||||
import psycopg2.errors
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, assert_timeline_local
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, assert_timeline_local, wait_for_last_flush_lsn
|
||||
from fixtures.log_helper import log
|
||||
import time
|
||||
|
||||
@@ -192,6 +192,8 @@ def test_timeline_physical_size_init(neon_simple_env: NeonEnv):
|
||||
FROM generate_series(1, 1000) g""",
|
||||
])
|
||||
|
||||
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
|
||||
|
||||
# restart the pageserver to force calculating timeline's initial physical size
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
@@ -211,7 +213,9 @@ def test_timeline_physical_size_post_checkpoint(neon_simple_env: NeonEnv):
|
||||
FROM generate_series(1, 1000) g""",
|
||||
])
|
||||
|
||||
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
|
||||
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
|
||||
|
||||
assert_physical_size(env, env.initial_tenant, new_timeline_id)
|
||||
|
||||
|
||||
@@ -232,8 +236,10 @@ def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder
|
||||
FROM generate_series(1, 100000) g""",
|
||||
])
|
||||
|
||||
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
|
||||
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
|
||||
env.pageserver.safe_psql(f"compact {env.initial_tenant.hex} {new_timeline_id.hex}")
|
||||
|
||||
assert_physical_size(env, env.initial_tenant, new_timeline_id)
|
||||
|
||||
|
||||
@@ -254,15 +260,21 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
|
||||
SELECT 'long string to consume some space' || g
|
||||
FROM generate_series(1, 100000) g""",
|
||||
])
|
||||
|
||||
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
|
||||
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
|
||||
|
||||
pg.safe_psql("""
|
||||
INSERT INTO foo
|
||||
SELECT 'long string to consume some space' || g
|
||||
FROM generate_series(1, 100000) g
|
||||
""")
|
||||
|
||||
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
|
||||
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
|
||||
|
||||
env.pageserver.safe_psql(f"do_gc {env.initial_tenant.hex} {new_timeline_id.hex} 0")
|
||||
|
||||
assert_physical_size(env, env.initial_tenant, new_timeline_id)
|
||||
|
||||
|
||||
@@ -279,6 +291,7 @@ def test_timeline_physical_size_metric(neon_simple_env: NeonEnv):
|
||||
FROM generate_series(1, 100000) g""",
|
||||
])
|
||||
|
||||
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
|
||||
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
|
||||
|
||||
# get the metrics and parse the metric for the current timeline's physical size
|
||||
@@ -319,6 +332,7 @@ def test_tenant_physical_size(neon_simple_env: NeonEnv):
|
||||
f"INSERT INTO foo SELECT 'long string to consume some space' || g FROM generate_series(1, {n_rows}) g",
|
||||
])
|
||||
|
||||
wait_for_last_flush_lsn(env, pg, tenant, timeline)
|
||||
env.pageserver.safe_psql(f"checkpoint {tenant.hex} {timeline.hex}")
|
||||
|
||||
timeline_total_size += get_timeline_physical_size(timeline)
|
||||
|
||||
@@ -284,9 +284,12 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
env.neon_cli.create_branch('test_safekeepers_wal_removal')
|
||||
pg = env.postgres.create_start('test_safekeepers_wal_removal')
|
||||
|
||||
# Note: it is important to insert at least two segments, as currently
|
||||
# control file is synced roughly once in segment range and WAL is not
|
||||
# removed until all horizons are persisted.
|
||||
pg.safe_psql_many([
|
||||
'CREATE TABLE t(key int primary key, value text)',
|
||||
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
|
||||
"INSERT INTO t SELECT generate_series(1,200000), 'payload'",
|
||||
])
|
||||
|
||||
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
|
||||
|
||||
@@ -1488,6 +1488,17 @@ class NeonPageserver(PgProtocol):
|
||||
self.running = True
|
||||
return self
|
||||
|
||||
def _wait_for_death(self):
    """Wait for pageserver to die. Assumes kill signal is sent.

    Reads the pid from `pageserver.pid` in the repo directory and polls it every
    0.2 seconds, sleeping at most 20 times (about 4 seconds in total).

    Raises:
        AssertionError: the process is still running after the last sleep.
    """
    pid_path = pathlib.Path(self.env.repo_dir) / "pageserver.pid"
    pid = read_pid(pid_path)
    retries_left = 20
    while check_pid(pid):
        # Give up only after a liveness check has just failed, so a process that
        # exits during the final sleep is not reported as still alive.
        if retries_left == 0:
            raise AssertionError("Pageserver failed to die")
        time.sleep(0.2)
        retries_left -= 1
|
||||
|
||||
def stop(self, immediate=False) -> 'NeonPageserver':
|
||||
"""
|
||||
Stop the page server.
|
||||
@@ -1495,6 +1506,7 @@ class NeonPageserver(PgProtocol):
|
||||
"""
|
||||
if self.running:
|
||||
self.env.neon_cli.pageserver_stop(immediate)
|
||||
self._wait_for_death()
|
||||
self.running = False
|
||||
return self
|
||||
|
||||
@@ -2004,6 +2016,17 @@ def read_pid(path: Path) -> int:
|
||||
return int(path.read_text())
|
||||
|
||||
|
||||
def check_pid(pid: int) -> bool:
    """Check whether pid is running.

    Args:
        pid: process id to probe.

    Returns:
        True if a process with this pid exists, False otherwise.
    """
    try:
        # If sig is 0, then no signal is sent, but error checking is still performed.
        os.kill(pid, 0)
    except PermissionError:
        # EPERM: the process exists, we are just not allowed to signal it.
        return True
    except OSError:
        return False
    return True
|
||||
|
||||
|
||||
@dataclass
|
||||
class SafekeeperPort:
|
||||
pg: int
|
||||
@@ -2475,3 +2498,9 @@ def wait_for_last_record_lsn(pageserver_http_client: NeonPageserverHttpClient,
|
||||
time.sleep(1)
|
||||
raise Exception("timed out while waiting for last_record_lsn to reach {}, was {}".format(
|
||||
lsn_to_hex(lsn), lsn_to_hex(current_lsn)))
|
||||
|
||||
|
||||
def wait_for_last_flush_lsn(env: NeonEnv, pg: Postgres, tenant: uuid.UUID, timeline: uuid.UUID):
    """Wait until the pageserver's last record LSN reaches the compute node's current flush LSN."""
    flush_lsn_rows = pg.safe_psql("SELECT pg_current_wal_flush_lsn()")
    target_lsn = lsn_from_hex(flush_lsn_rows[0][0])
    pageserver_http = env.pageserver.http_client()
    wait_for_last_record_lsn(pageserver_http, tenant, timeline, target_lsn)
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: 5280b6fe10...0a9045c9ff
Reference in New Issue
Block a user