Compare commits

..

4 Commits

Author SHA1 Message Date
Konstantin Knizhnik
6b35f3307f Update pageserver/src/tenant/layer_map.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-04-29 22:26:04 +03:00
Konstantin Knizhnik
5f39328a06 Replace suplicate layer in layer map 2023-04-28 14:49:20 +03:00
Konstantin Knizhnik
fa7e975787 Make clippy happy 2023-04-27 16:27:24 +03:00
Konstantin Knizhnik
9b095034bc Do not return Error in case of duplicate layer detetion 2023-04-27 15:40:05 +03:00
60 changed files with 277 additions and 1827 deletions

View File

@@ -50,7 +50,5 @@ storage:
ansible_host: i-027662bd552bf5db0
safekeeper-2.us-east-2.aws.neon.build:
ansible_host: i-0de0b03a51676a6ce
safekeeper-3.us-east-2.aws.neon.build:
ansible_host: i-05f8ba2cda243bd18
safekeeper-99.us-east-2.aws.neon.build:
ansible_host: i-0d61b6a2ea32028d5

View File

@@ -111,21 +111,8 @@ jobs:
- name: Get postgres headers
run: make postgres-headers -j$(nproc)
# cargo hack runs the given cargo subcommand (clippy in this case) for all feature combinations.
# This will catch compiler & clippy warnings in all feature combinations.
# TODO: use cargo hack for build and test as well, but, that's quite expensive.
# NB: keep clippy args in sync with ./run_clippy.sh
- run: |
CLIPPY_COMMON_ARGS="$( source .neon_clippy_args; echo "$CLIPPY_COMMON_ARGS")"
if [ "$CLIPPY_COMMON_ARGS" = "" ]; then
echo "No clippy args found in .neon_clippy_args"
exit 1
fi
echo "CLIPPY_COMMON_ARGS=${CLIPPY_COMMON_ARGS}" >> $GITHUB_ENV
- name: Run cargo clippy (debug)
run: cargo hack --feature-powerset clippy $CLIPPY_COMMON_ARGS
- name: Run cargo clippy (release)
run: cargo hack --feature-powerset clippy --release $CLIPPY_COMMON_ARGS
- name: Run cargo clippy
run: ./run_clippy.sh
# Use `${{ !cancelled() }}` to run quck tests after the longer clippy run
- name: Check formatting

View File

@@ -1,4 +0,0 @@
# * `-A unknown_lints` do not warn about unknown lint suppressions
# that people with newer toolchains might use
# * `-D warnings` - fail on any warnings (`cargo` returns non-zero exit status)
export CLIPPY_COMMON_ARGS="--locked --workspace --all-targets -- -A unknown_lints -D warnings"

11
Cargo.lock generated
View File

@@ -4629,16 +4629,6 @@ dependencies = [
"valuable",
]
[[package]]
name = "tracing-error"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e"
dependencies = [
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
@@ -4889,7 +4879,6 @@ dependencies = [
"thiserror",
"tokio",
"tracing",
"tracing-error",
"tracing-subscriber",
"url",
"uuid",

View File

@@ -110,7 +110,6 @@ toml = "0.7"
toml_edit = "0.19"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
tracing = "0.1"
tracing-error = "0.2.0"
tracing-opentelemetry = "0.18.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.2"

View File

@@ -249,63 +249,18 @@ impl ComputeNode {
/// safekeepers sync, basebackup, etc.
#[instrument(skip(self, compute_state))]
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
#[derive(Clone)]
enum Replication {
Primary,
Static { lsn: Lsn },
HotStandby,
}
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
let pgdata_path = Path::new(&self.pgdata);
let hot_replica = if let Some(option) = spec.cluster.settings.find_ref("hot_standby") {
if let Some(value) = &option.value {
anyhow::ensure!(option.vartype == "bool");
matches!(value.as_str(), "on" | "yes" | "true")
} else {
false
}
} else {
false
};
let replication = if hot_replica {
Replication::HotStandby
} else if let Some(lsn) = spec.cluster.settings.find("recovery_target_lsn") {
Replication::Static {
lsn: Lsn::from_str(&lsn)?,
}
} else {
Replication::Primary
};
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
// Syncing safekeepers is only safe with primary nodes: if a primary
// is already connected it will be kicked out, so a secondary (standby)
// cannot sync safekeepers.
let lsn = match &replication {
Replication::Primary => {
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", lsn);
lsn
}
Replication::Static { lsn } => {
info!("Starting read-only node at static LSN {}", lsn);
*lsn
}
Replication::HotStandby => {
info!("Initializing standby from latest Pageserver LSN");
Lsn(0)
}
};
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", lsn);
info!(
"getting basebackup@{} from pageserver {}",
@@ -321,13 +276,6 @@ impl ComputeNode {
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
match &replication {
Replication::Primary | Replication::Static { .. } => {}
Replication::HotStandby => {
add_standby_signal(pgdata_path)?;
}
}
Ok(())
}

View File

@@ -94,7 +94,6 @@ impl PgOptionsSerialize for GenericOptions {
pub trait GenericOptionsSearch {
fn find(&self, name: &str) -> Option<String>;
fn find_ref(&self, name: &str) -> Option<&GenericOption>;
}
impl GenericOptionsSearch for GenericOptions {
@@ -104,12 +103,6 @@ impl GenericOptionsSearch for GenericOptions {
let op = ops.iter().find(|s| s.name == name)?;
op.value.clone()
}
/// Lookup option by name, returning ref
fn find_ref(&self, name: &str) -> Option<&GenericOption> {
let ops = self.as_ref()?;
ops.iter().find(|s| s.name == name)
}
}
pub trait RoleExt {

View File

@@ -1,4 +1,3 @@
use std::fs::File;
use std::path::Path;
use std::str::FromStr;
@@ -146,21 +145,6 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
Ok(())
}
/// Create a standby.signal file
pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
// XXX: consider making it a part of spec.json
info!("adding standby.signal");
let signalfile = pgdata_path.join("standby.signal");
if !signalfile.exists() {
info!("created standby.signal");
File::create(signalfile)?;
} else {
info!("reused pre-existing standby.signal");
}
Ok(())
}
/// Given a cluster spec json and open transaction it handles roles creation,
/// deletion and update.
#[instrument(skip_all)]

View File

@@ -8,7 +8,6 @@
use anyhow::{anyhow, bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint::Replication;
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
@@ -475,14 +474,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
println!("Creating endpoint for imported timeline ...");
cplane.new_endpoint(
tenant_id,
name,
timeline_id,
None,
pg_version,
Replication::Primary,
)?;
cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
println!("Done");
}
Some(("branch", branch_match)) => {
@@ -568,20 +560,20 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.iter()
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
{
let lsn_str = match endpoint.replication {
Replication::Static(lsn) => {
// -> read-only endpoint
// Use the node's LSN.
lsn.to_string()
}
_ => {
// -> primary endpoint or hot replica
let lsn_str = match endpoint.lsn {
None => {
// -> primary endpoint
// Use the LSN at the end of the timeline.
timeline_infos
.get(&endpoint.timeline_id)
.map(|bi| bi.last_record_lsn.to_string())
.unwrap_or_else(|| "?".to_string())
}
Some(lsn) => {
// -> read-only endpoint
// Use the endpoint's LSN.
lsn.to_string()
}
};
let branch_name = timeline_name_mappings
@@ -627,26 +619,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.copied()
.context("Failed to parse postgres version from the argument string")?;
let hot_standby = sub_args
.get_one::<bool>("hot-standby")
.copied()
.unwrap_or(false);
let replication = match (lsn, hot_standby) {
(Some(lsn), false) => Replication::Static(lsn),
(None, true) => Replication::Replica,
(None, false) => Replication::Primary,
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
};
cplane.new_endpoint(
tenant_id,
&endpoint_id,
timeline_id,
port,
pg_version,
replication,
)?;
cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, lsn, port, pg_version)?;
}
"start" => {
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
@@ -664,21 +637,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
None
};
let hot_standby = sub_args
.get_one::<bool>("hot-standby")
.copied()
.unwrap_or(false);
if let Some(endpoint) = endpoint {
match (&endpoint.replication, hot_standby) {
(Replication::Static(_), true) => {
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
}
(Replication::Primary, true) => {
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
}
_ => {}
}
println!("Starting existing endpoint {endpoint_id}...");
endpoint.start(&auth_token)?;
} else {
@@ -700,14 +659,6 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.get_one::<u32>("pg-version")
.copied()
.context("Failed to `pg-version` from the argument string")?;
let replication = match (lsn, hot_standby) {
(Some(lsn), false) => Replication::Static(lsn),
(None, true) => Replication::Replica,
(None, false) => Replication::Primary,
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
};
// when used with custom port this results in non obvious behaviour
// port is remembered from first start command, i e
// start --port X
@@ -719,9 +670,9 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
tenant_id,
endpoint_id,
timeline_id,
lsn,
port,
pg_version,
replication,
)?;
ep.start(&auth_token)?;
}
@@ -977,12 +928,6 @@ fn cli() -> Command {
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
.required(false);
let hot_standby_arg = Arg::new("hot-standby")
.value_parser(value_parser!(bool))
.long("hot-standby")
.help("If set, the node will be a hot replica on the specified timeline")
.required(false);
Command::new("Neon CLI")
.arg_required_else_help(true)
.version(GIT_VERSION)
@@ -1107,7 +1052,6 @@ fn cli() -> Command {
.long("config-only")
.required(false))
.arg(pg_version_arg.clone())
.arg(hot_standby_arg.clone())
)
.subcommand(Command::new("start")
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
@@ -1118,7 +1062,6 @@ fn cli() -> Command {
.arg(lsn_arg)
.arg(port_arg)
.arg(pg_version_arg)
.arg(hot_standby_arg)
)
.subcommand(
Command::new("stop")

View File

@@ -68,19 +68,18 @@ impl ComputeControlPlane {
tenant_id: TenantId,
name: &str,
timeline_id: TimelineId,
lsn: Option<Lsn>,
port: Option<u16>,
pg_version: u32,
replication: Replication,
) -> Result<Arc<Endpoint>> {
let port = port.unwrap_or_else(|| self.get_port());
let ep = Arc::new(Endpoint {
name: name.to_owned(),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
env: self.env.clone(),
pageserver: Arc::clone(&self.pageserver),
timeline_id,
replication,
lsn,
tenant_id,
pg_version,
});
@@ -96,18 +95,6 @@ impl ComputeControlPlane {
///////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Replication {
// Regular read-write node
Primary,
// if recovery_target_lsn is provided, and we want to pin the node to a specific LSN
Static(Lsn),
// Hot standby; read-only replica.
// Future versions may want to distinguish between replicas with hot standby
// feedback and other kinds of replication configurations.
Replica,
}
#[derive(Debug)]
pub struct Endpoint {
/// used as the directory name
@@ -115,7 +102,7 @@ pub struct Endpoint {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
// Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary.
pub replication: Replication,
pub lsn: Option<Lsn>,
// port and address of the Postgres server
pub address: SocketAddr,
@@ -166,17 +153,9 @@ impl Endpoint {
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
let pg_version = u32::from_str(&pg_version_str)?;
// parse recovery_target_lsn and primary_conninfo into Recovery Target, if any
let replication = if let Some(lsn_str) = conf.get("recovery_target_lsn") {
Replication::Static(Lsn::from_str(lsn_str)?)
} else if let Some(slot_name) = conf.get("primary_slot_name") {
let slot_name = slot_name.to_string();
let prefix = format!("repl_{}_", timeline_id);
assert!(slot_name.starts_with(&prefix));
Replication::Replica
} else {
Replication::Primary
};
// parse recovery_target_lsn, if any
let recovery_target_lsn: Option<Lsn> =
conf.parse_field_optional("recovery_target_lsn", &context)?;
// ok now
Ok(Endpoint {
@@ -185,7 +164,7 @@ impl Endpoint {
env: env.clone(),
pageserver: Arc::clone(pageserver),
timeline_id,
replication,
lsn: recovery_target_lsn,
tenant_id,
pg_version,
})
@@ -320,83 +299,50 @@ impl Endpoint {
conf.append("neon.pageserver_connstring", &pageserver_connstr);
conf.append("neon.tenant_id", &self.tenant_id.to_string());
conf.append("neon.timeline_id", &self.timeline_id.to_string());
if let Some(lsn) = self.lsn {
conf.append("recovery_target_lsn", &lsn.to_string());
}
conf.append_line("");
// Replication-related configurations, such as WAL sending
match &self.replication {
Replication::Primary => {
// Configure backpressure
// - Replication write lag depends on how fast the walreceiver can process incoming WAL.
// This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
// so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
// Actually latency should be much smaller (better if < 1sec). But we assume that recently
// updates pages are not requested from pageserver.
// - Replication flush lag depends on speed of persisting data by checkpointer (creation of
// delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
// remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
// recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
// - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
// To be able to restore database in case of pageserver node crash, safekeeper should not
// remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
// (if they are not able to upload WAL to S3).
conf.append("max_replication_write_lag", "15MB");
conf.append("max_replication_flush_lag", "10GB");
// Configure backpressure
// - Replication write lag depends on how fast the walreceiver can process incoming WAL.
// This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
// so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
// Actually latency should be much smaller (better if < 1sec). But we assume that recently
// updates pages are not requested from pageserver.
// - Replication flush lag depends on speed of persisting data by checkpointer (creation of
// delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
// remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
// recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
// - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
// To be able to restore database in case of pageserver node crash, safekeeper should not
// remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
// (if they are not able to upload WAL to S3).
conf.append("max_replication_write_lag", "15MB");
conf.append("max_replication_flush_lag", "10GB");
if !self.env.safekeepers.is_empty() {
// Configure Postgres to connect to the safekeepers
conf.append("synchronous_standby_names", "walproposer");
if !self.env.safekeepers.is_empty() {
// Configure Postgres to connect to the safekeepers
conf.append("synchronous_standby_names", "walproposer");
let safekeepers = self
.env
.safekeepers
.iter()
.map(|sk| format!("localhost:{}", sk.pg_port))
.collect::<Vec<String>>()
.join(",");
conf.append("neon.safekeepers", &safekeepers);
} else {
// We only use setup without safekeepers for tests,
// and don't care about data durability on pageserver,
// so set more relaxed synchronous_commit.
conf.append("synchronous_commit", "remote_write");
let safekeepers = self
.env
.safekeepers
.iter()
.map(|sk| format!("localhost:{}", sk.pg_port))
.collect::<Vec<String>>()
.join(",");
conf.append("neon.safekeepers", &safekeepers);
} else {
// We only use setup without safekeepers for tests,
// and don't care about data durability on pageserver,
// so set more relaxed synchronous_commit.
conf.append("synchronous_commit", "remote_write");
// Configure the node to stream WAL directly to the pageserver
// This isn't really a supported configuration, but can be useful for
// testing.
conf.append("synchronous_standby_names", "pageserver");
}
}
Replication::Static(lsn) => {
conf.append("recovery_target_lsn", &lsn.to_string());
}
Replication::Replica => {
assert!(!self.env.safekeepers.is_empty());
// TODO: use future host field from safekeeper spec
// Pass the list of safekeepers to the replica so that it can connect to any of them,
// whichever is availiable.
let sk_ports = self
.env
.safekeepers
.iter()
.map(|x| x.pg_port.to_string())
.collect::<Vec<_>>()
.join(",");
let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
let connstr = format!(
"host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
sk_hosts,
sk_ports,
&self.timeline_id.to_string(),
&self.tenant_id.to_string(),
);
let slot_name = format!("repl_{}_", self.timeline_id);
conf.append("primary_conninfo", connstr.as_str());
conf.append("primary_slot_name", slot_name.as_str());
conf.append("hot_standby", "on");
}
// Configure the node to stream WAL directly to the pageserver
// This isn't really a supported configuration, but can be useful for
// testing.
conf.append("synchronous_standby_names", "pageserver");
}
let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
@@ -409,27 +355,21 @@ impl Endpoint {
}
fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
let backup_lsn = match &self.replication {
Replication::Primary => {
if !self.env.safekeepers.is_empty() {
// LSN 0 means that it is bootstrap and we need to download just
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
// procedure evolves quite actively right now, so let's think about it again
// when things would be more stable (TODO).
let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
if lsn == Lsn(0) {
None
} else {
Some(lsn)
}
} else {
None
}
}
Replication::Static(lsn) => Some(*lsn),
Replication::Replica => {
None // Take the latest snapshot available to start with
let backup_lsn = if let Some(lsn) = self.lsn {
Some(lsn)
} else if !self.env.safekeepers.is_empty() {
// LSN 0 means that it is bootstrap and we need to download just
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
// procedure evolves quite actively right now, so let's think about it again
// when things would be more stable (TODO).
let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
if lsn == Lsn(0) {
None
} else {
Some(lsn)
}
} else {
None
};
self.do_basebackup(backup_lsn)?;
@@ -526,7 +466,7 @@ impl Endpoint {
// 3. Load basebackup
self.load_basebackup(auth_token)?;
if self.replication != Replication::Primary {
if self.lsn.is_some() {
File::create(self.pgdata().join("standby.signal"))?;
}

View File

@@ -13,7 +13,7 @@ use std::io::BufRead;
use std::str::FromStr;
/// In-memory representation of a postgresql.conf file
#[derive(Default, Debug)]
#[derive(Default)]
pub struct PostgresConf {
lines: Vec<String>,
hash: HashMap<String, String>,

View File

@@ -28,6 +28,11 @@
"value": "replica",
"vartype": "enum"
},
{
"name": "hot_standby",
"value": "on",
"vartype": "bool"
},
{
"name": "wal_log_hints",
"value": "on",

View File

@@ -95,13 +95,10 @@ pub fn generate_wal_segment(
segno: u64,
system_id: u64,
pg_version: u32,
lsn: Lsn,
) -> Result<Bytes, SerializeError> {
assert_eq!(segno, lsn.segment_number(WAL_SEGMENT_SIZE));
match pg_version {
14 => v14::xlog_utils::generate_wal_segment(segno, system_id, lsn),
15 => v15::xlog_utils::generate_wal_segment(segno, system_id, lsn),
14 => v14::xlog_utils::generate_wal_segment(segno, system_id),
15 => v15::xlog_utils::generate_wal_segment(segno, system_id),
_ => Err(SerializeError::BadInput),
}
}

View File

@@ -195,7 +195,6 @@ pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
pub const XLP_LONG_HEADER: u16 = 0x0002;
/* From fsm_internals.h */

View File

@@ -270,11 +270,6 @@ impl XLogPageHeaderData {
use utils::bin_ser::LeSer;
XLogPageHeaderData::des_from(&mut buf.reader())
}
pub fn encode(&self) -> Result<Bytes, SerializeError> {
use utils::bin_ser::LeSer;
self.ser().map(|b| b.into())
}
}
impl XLogLongPageHeaderData {
@@ -333,32 +328,22 @@ impl CheckPoint {
}
}
/// Generate new, empty WAL segment, with correct block headers at the first
/// page of the segment and the page that contains the given LSN.
/// We need this segment to start compute node.
pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
//
// Generate new, empty WAL segment.
// We need this segment to start compute node.
//
pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, SerializeError> {
let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);
let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
let page_off = lsn.block_offset();
let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
let first_page_only = seg_off < XLOG_BLCKSZ;
let (shdr_rem_len, infoflags) = if first_page_only {
(seg_off, pg_constants::XLP_FIRST_IS_CONTRECORD)
} else {
(0, 0)
};
let hdr = XLogLongPageHeaderData {
std: {
XLogPageHeaderData {
xlp_magic: XLOG_PAGE_MAGIC as u16,
xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
xlp_info: pg_constants::XLP_LONG_HEADER,
xlp_tli: PG_TLI,
xlp_pageaddr: pageaddr,
xlp_rem_len: shdr_rem_len as u32,
xlp_rem_len: 0,
..Default::default() // Put 0 in padding fields.
}
},
@@ -372,33 +357,6 @@ pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Byte
//zero out the rest of the file
seg_buf.resize(WAL_SEGMENT_SIZE, 0);
if !first_page_only {
let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
let header = XLogPageHeaderData {
xlp_magic: XLOG_PAGE_MAGIC as u16,
xlp_info: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
pg_constants::XLP_FIRST_IS_CONTRECORD
} else {
0
},
xlp_tli: PG_TLI,
xlp_pageaddr: lsn.page_lsn().0,
xlp_rem_len: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
page_off as u32
} else {
0u32
},
..Default::default() // Put 0 in padding fields.
};
let hdr_bytes = header.encode()?;
debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len());
debug_assert_ne!(block_offset, 0);
seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]);
}
Ok(seg_buf.freeze())
}

View File

@@ -99,11 +99,7 @@ struct S3WithTestBlobs {
#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledS3 {
async fn setup() -> Self {
utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
)
.expect("logging init failed");
utils::logging::init(utils::logging::LogFormat::Test).expect("logging init failed");
if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
info!(
"`{}` env variable is not set, skipping the test",

View File

@@ -27,8 +27,7 @@ signal-hook.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-error.workspace = true
tracing-subscriber = { workspace = true, features = ["json", "registry"] }
tracing-subscriber = { workspace = true, features = ["json"] }
rand.workspace = true
serde_with.workspace = true
strum.workspace = true

View File

@@ -54,8 +54,6 @@ pub mod measured_stream;
pub mod serde_percent;
pub mod serde_regex;
pub mod tracing_span_assert;
/// use with fail::cfg("$name", "return(2000)")
#[macro_export]
macro_rules! failpoint_sleep_millis_async {

View File

@@ -56,20 +56,7 @@ where
}
}
/// Whether to add the `tracing_error` crate's `ErrorLayer`
/// to the global tracing subscriber.
///
pub enum TracingErrorLayerEnablement {
/// Do not add the `ErrorLayer`.
Disabled,
/// Add the `ErrorLayer` with the filter specified by RUST_LOG, defaulting to `info` if `RUST_LOG` is unset.
EnableWithRustLogFilter,
}
pub fn init(
log_format: LogFormat,
tracing_error_layer_enablement: TracingErrorLayerEnablement,
) -> anyhow::Result<()> {
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
let rust_log_env_filter = || {
@@ -80,26 +67,21 @@ pub fn init(
// NB: the order of the with() calls does not matter.
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
use tracing_subscriber::prelude::*;
let r = tracing_subscriber::registry();
let r = r.with({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
let log_layer = match log_format {
LogFormat::Json => log_layer.json().boxed(),
LogFormat::Plain => log_layer.boxed(),
LogFormat::Test => log_layer.with_test_writer().boxed(),
};
log_layer.with_filter(rust_log_env_filter())
});
let r = r.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
match tracing_error_layer_enablement {
TracingErrorLayerEnablement::EnableWithRustLogFilter => r
.with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
.init(),
TracingErrorLayerEnablement::Disabled => r.init(),
}
tracing_subscriber::registry()
.with({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
let log_layer = match log_format {
LogFormat::Json => log_layer.json().boxed(),
LogFormat::Plain => log_layer.boxed(),
LogFormat::Test => log_layer.with_test_writer().boxed(),
};
log_layer.with_filter(rust_log_env_filter())
})
.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()))
.init();
Ok(())
}

View File

@@ -62,48 +62,29 @@ impl Lsn {
}
/// Compute the offset into a segment
#[inline]
pub fn segment_offset(self, seg_sz: usize) -> usize {
(self.0 % seg_sz as u64) as usize
}
/// Compute LSN of the segment start.
#[inline]
pub fn segment_lsn(self, seg_sz: usize) -> Lsn {
Lsn(self.0 - (self.0 % seg_sz as u64))
}
/// Compute the segment number
#[inline]
pub fn segment_number(self, seg_sz: usize) -> u64 {
self.0 / seg_sz as u64
}
/// Compute the offset into a block
#[inline]
pub fn block_offset(self) -> u64 {
const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
self.0 % BLCKSZ
}
/// Compute the block offset of the first byte of this Lsn within this
/// segment
#[inline]
pub fn page_lsn(self) -> Lsn {
Lsn(self.0 - self.block_offset())
}
/// Compute the block offset of the first byte of this Lsn within this
/// segment
#[inline]
pub fn page_offset_in_segment(self, seg_sz: usize) -> u64 {
(self.0 - self.block_offset()) - self.segment_lsn(seg_sz).0
}
/// Compute the bytes remaining in this block
///
/// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`.
#[inline]
pub fn remaining_in_block(self) -> u64 {
const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
BLCKSZ - (self.0 % BLCKSZ)

View File

@@ -1,287 +0,0 @@
//! Assert that the current [`tracing::Span`] has a given set of fields.
//!
//! # Usage
//!
//! ```
//! use tracing_subscriber::prelude::*;
//! let registry = tracing_subscriber::registry()
//! .with(tracing_error::ErrorLayer::default());
//!
//! // Register the registry as the global subscriber.
//! // In this example, we'll only use it as a thread-local subscriber.
//! let _guard = tracing::subscriber::set_default(registry);
//!
//! // Then, in the main code:
//!
//! let span = tracing::info_span!("TestSpan", test_id = 1);
//! let _guard = span.enter();
//!
//! // ... down the call stack
//!
//! use utils::tracing_span_assert::{check_fields_present, MultiNameExtractor};
//! let extractor = MultiNameExtractor::new("TestExtractor", ["test", "test_id"]);
//! match check_fields_present([&extractor]) {
//! Ok(()) => {},
//! Err(missing) => {
//! panic!("Missing fields: {:?}", missing.into_iter().map(|f| f.name() ).collect::<Vec<_>>());
//! }
//! }
//! ```
//!
//! Recommended reading: https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
//!
use std::{
collections::HashSet,
fmt::{self},
hash::{Hash, Hasher},
};
pub enum ExtractionResult {
Present,
Absent,
}
pub trait Extractor: Send + Sync + std::fmt::Debug {
fn name(&self) -> &str;
fn extract(&self, fields: &tracing::field::FieldSet) -> ExtractionResult;
}
#[derive(Debug)]
pub struct MultiNameExtractor<const L: usize> {
name: &'static str,
field_names: [&'static str; L],
}
impl<const L: usize> MultiNameExtractor<L> {
pub fn new(name: &'static str, field_names: [&'static str; L]) -> MultiNameExtractor<L> {
MultiNameExtractor { name, field_names }
}
}
impl<const L: usize> Extractor for MultiNameExtractor<L> {
fn name(&self) -> &str {
self.name
}
fn extract(&self, fields: &tracing::field::FieldSet) -> ExtractionResult {
if fields.iter().any(|f| self.field_names.contains(&f.name())) {
ExtractionResult::Present
} else {
ExtractionResult::Absent
}
}
}
struct MemoryIdentity<'a>(&'a dyn Extractor);
impl<'a> MemoryIdentity<'a> {
fn as_ptr(&self) -> *const () {
self.0 as *const _ as *const ()
}
}
impl<'a> PartialEq for MemoryIdentity<'a> {
fn eq(&self, other: &Self) -> bool {
self.as_ptr() == other.as_ptr()
}
}
impl<'a> Eq for MemoryIdentity<'a> {}
impl<'a> Hash for MemoryIdentity<'a> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.as_ptr().hash(state);
}
}
impl<'a> fmt::Debug for MemoryIdentity<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:p}: {}", self.as_ptr(), self.0.name())
}
}
/// The extractor names passed as keys to [`new`].
pub fn check_fields_present<const L: usize>(
must_be_present: [&dyn Extractor; L],
) -> Result<(), Vec<&dyn Extractor>> {
let mut missing: HashSet<MemoryIdentity> =
HashSet::from_iter(must_be_present.into_iter().map(|r| MemoryIdentity(r)));
let trace = tracing_error::SpanTrace::capture();
trace.with_spans(|md, _formatted_fields| {
missing.retain(|extractor| match extractor.0.extract(md.fields()) {
ExtractionResult::Present => false,
ExtractionResult::Absent => true,
});
!missing.is_empty() // continue walking up until we've found all missing
});
if missing.is_empty() {
Ok(())
} else {
Err(missing.into_iter().map(|mi| mi.0).collect())
}
}
#[cfg(test)]
mod tests {
use tracing_subscriber::prelude::*;
use super::*;
struct Setup {
_current_thread_subscriber_guard: tracing::subscriber::DefaultGuard,
tenant_extractor: MultiNameExtractor<2>,
timeline_extractor: MultiNameExtractor<2>,
}
fn setup_current_thread() -> Setup {
let tenant_extractor = MultiNameExtractor::new("TenantId", ["tenant_id", "tenant"]);
let timeline_extractor = MultiNameExtractor::new("TimelineId", ["timeline_id", "timeline"]);
let registry = tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(tracing_error::ErrorLayer::default());
let guard = tracing::subscriber::set_default(registry);
Setup {
_current_thread_subscriber_guard: guard,
tenant_extractor,
timeline_extractor,
}
}
fn assert_missing(missing: Vec<&dyn Extractor>, expected: Vec<&dyn Extractor>) {
let missing: HashSet<MemoryIdentity> =
HashSet::from_iter(missing.into_iter().map(MemoryIdentity));
let expected: HashSet<MemoryIdentity> =
HashSet::from_iter(expected.into_iter().map(MemoryIdentity));
assert_eq!(missing, expected);
}
#[test]
fn positive_one_level() {
let setup = setup_current_thread();
let span = tracing::info_span!("root", tenant_id = "tenant-1", timeline_id = "timeline-1");
let _guard = span.enter();
check_fields_present([&setup.tenant_extractor, &setup.timeline_extractor]).unwrap();
}
#[test]
fn negative_one_level() {
let setup = setup_current_thread();
let span = tracing::info_span!("root", timeline_id = "timeline-1");
let _guard = span.enter();
let missing =
check_fields_present([&setup.tenant_extractor, &setup.timeline_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
#[test]
fn positive_multiple_levels() {
let setup = setup_current_thread();
let span = tracing::info_span!("root");
let _guard = span.enter();
let span = tracing::info_span!("child", tenant_id = "tenant-1");
let _guard = span.enter();
let span = tracing::info_span!("grandchild", timeline_id = "timeline-1");
let _guard = span.enter();
check_fields_present([&setup.tenant_extractor, &setup.timeline_extractor]).unwrap();
}
#[test]
fn negative_multiple_levels() {
let setup = setup_current_thread();
let span = tracing::info_span!("root");
let _guard = span.enter();
let span = tracing::info_span!("child", timeline_id = "timeline-1");
let _guard = span.enter();
let missing = check_fields_present([&setup.tenant_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
#[test]
fn positive_subset_one_level() {
let setup = setup_current_thread();
let span = tracing::info_span!("root", tenant_id = "tenant-1", timeline_id = "timeline-1");
let _guard = span.enter();
check_fields_present([&setup.tenant_extractor]).unwrap();
}
#[test]
fn positive_subset_multiple_levels() {
let setup = setup_current_thread();
let span = tracing::info_span!("root");
let _guard = span.enter();
let span = tracing::info_span!("child", tenant_id = "tenant-1");
let _guard = span.enter();
let span = tracing::info_span!("grandchild", timeline_id = "timeline-1");
let _guard = span.enter();
check_fields_present([&setup.tenant_extractor]).unwrap();
}
#[test]
fn negative_subset_one_level() {
let setup = setup_current_thread();
let span = tracing::info_span!("root", timeline_id = "timeline-1");
let _guard = span.enter();
let missing = check_fields_present([&setup.tenant_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
#[test]
fn negative_subset_multiple_levels() {
let setup = setup_current_thread();
let span = tracing::info_span!("root");
let _guard = span.enter();
let span = tracing::info_span!("child", timeline_id = "timeline-1");
let _guard = span.enter();
let missing = check_fields_present([&setup.tenant_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
#[test]
fn tracing_error_subscriber_not_set_up() {
// no setup
let span = tracing::info_span!("foo", e = "some value");
let _guard = span.enter();
let extractor = MultiNameExtractor::new("E", ["e"]);
let missing = check_fields_present([&extractor]).unwrap_err();
assert_missing(missing, vec![&extractor]);
}
#[test]
#[should_panic]
fn panics_if_tracing_error_subscriber_has_wrong_filter() {
let r = tracing_subscriber::registry().with({
tracing_error::ErrorLayer::default().with_filter(
tracing_subscriber::filter::dynamic_filter_fn(|md, _| {
if md.is_span() && *md.level() == tracing::Level::INFO {
return false;
}
true
}),
)
});
let _guard = tracing::subscriber::set_default(r);
let span = tracing::info_span!("foo", e = "some value");
let _guard = span.enter();
let extractor = MultiNameExtractor::new("E", ["e"]);
let missing = check_fields_present([&extractor]).unwrap_err();
assert_missing(missing, vec![&extractor]);
}
}

View File

@@ -463,13 +463,9 @@ where
let wal_file_path = format!("pg_wal/{}", wal_file_name);
let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;
let wal_seg = postgres_ffi::generate_wal_segment(
segno,
system_identifier,
self.timeline.pg_version,
self.lsn,
)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
let wal_seg =
postgres_ffi::generate_wal_segment(segno, system_identifier, self.timeline.pg_version)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
self.ar.append(&header, &wal_seg[..]).await?;
Ok(())

View File

@@ -25,7 +25,6 @@ use pageserver::{
virtual_file,
};
use postgres_backend::AuthType;
use utils::logging::TracingErrorLayerEnablement;
use utils::signals::ShutdownSignals;
use utils::{
auth::JwtAuth, logging, project_git_version, sentry_init::init_sentry, signals::Signal,
@@ -87,19 +86,8 @@ fn main() -> anyhow::Result<()> {
}
};
// Initialize logging.
//
// It must be initialized before the custom panic hook is installed below.
//
// Regarding tracing_error enablement: at this time, we only use the
// tracing_error crate to debug_assert that log spans contain tenant and timeline ids.
// See `debug_assert_current_span_has_tenant_and_timeline_id` in the timeline module
let tracing_error_layer_enablement = if cfg!(debug_assertions) {
TracingErrorLayerEnablement::EnableWithRustLogFilter
} else {
TracingErrorLayerEnablement::Disabled
};
logging::init(conf.log_format, tracing_error_layer_enablement)?;
// Initialize logging, which must be initialized before the custom panic hook is installed.
logging::init(conf.log_format)?;
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
// disarming this hook on pageserver, because we never tear down tracing.
@@ -238,7 +226,6 @@ fn start_pageserver(
);
set_build_info_metric(GIT_VERSION);
set_launch_timestamp_metric(launch_ts);
pageserver::preinitialize_metrics();
// If any failpoints were set from FAILPOINTS environment variable,
// print them to the log for debugging purposes

View File

@@ -114,7 +114,7 @@ async fn import_rel(
path: &Path,
spcoid: Oid,
dboid: Oid,
reader: &mut (impl AsyncRead + Unpin),
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
len: usize,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -200,7 +200,7 @@ async fn import_slru(
modification: &mut DatadirModification<'_>,
slru: SlruKind,
path: &Path,
reader: &mut (impl AsyncRead + Unpin),
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
len: usize,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -612,8 +612,8 @@ async fn import_file(
Ok(None)
}
async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
async fn read_all_bytes(reader: &mut (impl AsyncRead + Send + Sync + Unpin)) -> Result<Bytes> {
let mut buf: Vec<u8> = vec![];
reader.read_to_end(&mut buf).await?;
Ok(Bytes::from(buf))
Ok(Bytes::copy_from_slice(&buf[..]))
}

View File

@@ -44,8 +44,6 @@ pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
pub use crate::metrics::preinitialize_metrics;
pub async fn shutdown_pageserver(exit_code: i32) {
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.

View File

@@ -205,15 +205,6 @@ static EVICTIONS_WITH_LOW_RESIDENCE_DURATION: Lazy<IntCounterVec> = Lazy::new(||
.expect("failed to define a metric")
});
pub static UNEXPECTED_ONDEMAND_DOWNLOADS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_unexpected_ondemand_downloads_count",
"Number of unexpected on-demand downloads. \
We log more context for each increment, so, forgo any labels in this metric.",
)
.expect("failed to define a metric")
});
/// Each [`Timeline`]'s [`EVICTIONS_WITH_LOW_RESIDENCE_DURATION`] metric.
#[derive(Debug)]
pub struct EvictionsWithLowResidenceDuration {
@@ -1141,10 +1132,3 @@ impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
poll_result
}
}
pub fn preinitialize_metrics() {
// We want to alert on this metric increasing.
// Initialize it eagerly, so that our alert rule can distinguish absence of the metric from metric value 0.
assert_eq!(UNEXPECTED_ONDEMAND_DOWNLOADS.get(), 0);
UNEXPECTED_ONDEMAND_DOWNLOADS.reset();
}

View File

@@ -20,6 +20,7 @@ use pageserver_api::models::{
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamNblocksRequest, PagestreamNblocksResponse,
};
use postgres_backend::PostgresBackendTCP;
use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
use pq_proto::framed::ConnectionError;
use pq_proto::FeStartupPacket;
@@ -31,7 +32,6 @@ use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
use tracing::*;
use utils::id::ConnectionId;
@@ -57,10 +57,7 @@ use crate::trace::Tracer;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
fn copyin_stream<IO>(pgb: &mut PostgresBackend<IO>) -> impl Stream<Item = io::Result<Bytes>> + '_
where
IO: AsyncRead + AsyncWrite + Unpin,
{
fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<Bytes>> + '_ {
async_stream::try_stream! {
loop {
let msg = tokio::select! {
@@ -68,8 +65,8 @@ where
_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
let msg = "pageserver is shutting down";
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
let msg = "pageserver is shutting down".to_string();
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None));
Err(QueryError::Other(anyhow::anyhow!(msg)))
}
@@ -128,7 +125,7 @@ where
///
/// XXX: Currently, any trailing data after the EOF marker prints a warning.
/// Perhaps it should be a hard error?
async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow::Result<()> {
use tokio::io::AsyncReadExt;
let mut buf = [0u8; 512];
@@ -248,14 +245,12 @@ async fn page_service_conn_main(
.set_nodelay(true)
.context("could not set TCP_NODELAY")?;
let peer_addr = socket.peer_addr().context("get peer address")?;
// XXX: pgbackend.run() should take the connection_ctx,
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(conf, auth, connection_ctx);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
let pgbackend = PostgresBackend::new(socket, auth_type, None)?;
match pgbackend
.run(&mut conn_handler, task_mgr::shutdown_watcher)
@@ -337,16 +332,13 @@ impl PageServerHandler {
}
#[instrument(skip(self, pgb, ctx))]
async fn handle_pagerequests<IO>(
async fn handle_pagerequests(
&self,
pgb: &mut PostgresBackend<IO>,
pgb: &mut PostgresBackendTCP,
tenant_id: TenantId,
timeline_id: TimelineId,
ctx: RequestContext,
) -> anyhow::Result<()>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
) -> anyhow::Result<()> {
// NOTE: pagerequests handler exits when connection is closed,
// so there is no need to reset the association
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
@@ -444,19 +436,16 @@ impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
#[instrument(skip(self, pgb, ctx))]
async fn handle_import_basebackup<IO>(
async fn handle_import_basebackup(
&self,
pgb: &mut PostgresBackend<IO>,
pgb: &mut PostgresBackendTCP,
tenant_id: TenantId,
timeline_id: TimelineId,
base_lsn: Lsn,
_end_lsn: Lsn,
pg_version: u32,
ctx: RequestContext,
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
) -> Result<(), QueryError> {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Create empty timeline
info!("creating new timeline");
@@ -497,18 +486,15 @@ impl PageServerHandler {
}
#[instrument(skip(self, pgb, ctx))]
async fn handle_import_wal<IO>(
async fn handle_import_wal(
&self,
pgb: &mut PostgresBackend<IO>,
pgb: &mut PostgresBackendTCP,
tenant_id: TenantId,
timeline_id: TimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
ctx: RequestContext,
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
) -> Result<(), QueryError> {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
@@ -704,19 +690,16 @@ impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
#[instrument(skip(self, pgb, ctx))]
async fn handle_basebackup_request<IO>(
async fn handle_basebackup_request(
&mut self,
pgb: &mut PostgresBackend<IO>,
pgb: &mut PostgresBackendTCP,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Option<Lsn>,
prev_lsn: Option<Lsn>,
full_backup: bool,
ctx: RequestContext,
) -> anyhow::Result<()>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
) -> anyhow::Result<()> {
let started = std::time::Instant::now();
// check that the timeline exists
@@ -787,13 +770,10 @@ impl PageServerHandler {
}
#[async_trait::async_trait]
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
impl postgres_backend::Handler<tokio::net::TcpStream> for PageServerHandler {
fn check_auth_jwt(
&mut self,
_pgb: &mut PostgresBackend<IO>,
_pgb: &mut PostgresBackendTCP,
jwt_response: &[u8],
) -> Result<(), QueryError> {
// this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
@@ -821,7 +801,7 @@ where
fn startup(
&mut self,
_pgb: &mut PostgresBackend<IO>,
_pgb: &mut PostgresBackendTCP,
_sm: &FeStartupPacket,
) -> Result<(), QueryError> {
Ok(())
@@ -829,7 +809,7 @@ where
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
pgb: &mut PostgresBackendTCP,
query_string: &str,
) -> Result<(), QueryError> {
let ctx = self.connection_ctx.attached_child();

View File

@@ -2886,13 +2886,7 @@ pub mod harness {
};
LOG_HANDLE.get_or_init(|| {
logging::init(
logging::LogFormat::Test,
// enable it in case in case the tests exercise code paths that use
// debug_assert_current_span_has_tenant_and_timeline_id
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
)
.expect("Failed to init test logging")
logging::init(logging::LogFormat::Test).expect("Failed to init test logging")
});
let repo_dir = PageServerConf::test_repo_dir(test_name);

View File

@@ -55,6 +55,7 @@ use anyhow::{bail, Result};
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
use tracing::*;
use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
@@ -275,18 +276,24 @@ where
///
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
let key = historic_layer_coverage::LayerKey::from(&*layer);
if self.historic.contains(&key) {
bail!(
"Attempt to insert duplicate layer {} in layer map",
layer.short_id()
);
}
self.historic.insert(key, Arc::clone(&layer));
match self.historic.replace(&key, Arc::clone(&layer), |existing| {
!Self::compare_arced_layers(existing, &layer)
}) {
Replacement::Replaced { .. } => {
if Self::is_l0(&layer) {
bail!("Duplicate L0 layer {}", layer.short_id());
}
warn!("Replace duplicate layer {} in layer map", layer.short_id());
}
Replacement::Unexpected(_) => bail!("Replace layer with itself is prohibited"),
Replacement::NotFound | Replacement::RemovalBuffered => {
self.historic.insert(key, Arc::clone(&layer));
if Self::is_l0(&layer) {
self.l0_delta_layers.push(layer);
if Self::is_l0(&layer) {
self.l0_delta_layers.push(layer);
}
}
}
Ok(())
}

View File

@@ -417,14 +417,6 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
}
}
pub fn contains(&self, layer_key: &LayerKey) -> bool {
match self.buffer.get(layer_key) {
Some(None) => false, // layer remove was buffered
Some(_) => true, // layer insert was buffered
None => self.layers.contains_key(layer_key), // no buffered ops for this layer
}
}
pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
self.buffer.insert(layer_key, Some(value));
}

View File

@@ -16,7 +16,6 @@ use tracing::{info, warn};
use crate::config::PageServerConf;
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::timeline::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
use remote_storage::{DownloadError, GenericRemoteStorage};
use utils::crashsafe::path_with_suffix_extension;
@@ -44,8 +43,6 @@ pub async fn download_layer_file<'a>(
layer_file_name: &'a LayerFileName,
layer_metadata: &'a LayerFileMetadata,
) -> Result<u64, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let timeline_path = conf.timeline_path(&timeline_id, &tenant_id);
let local_path = timeline_path.join(layer_file_name.file_name());
@@ -157,7 +154,7 @@ pub async fn download_layer_file<'a>(
.with_context(|| format!("Could not fsync layer file {}", local_path.display(),))
.map_err(DownloadError::Other)?;
tracing::debug!("download complete: {}", local_path.display());
tracing::info!("download complete: {}", local_path.display());
Ok(bytes_amount)
}

View File

@@ -4,7 +4,6 @@ pub mod delta_layer;
mod filename;
mod image_layer;
mod inmemory_layer;
pub(crate) mod layer_contents;
mod remote_layer;
use crate::config::PageServerConf;

View File

@@ -30,9 +30,6 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::layer_contents::virtual_value::{
VirtualValue, VirtualValueBuilder,
};
use crate::tenant::storage_layer::{
PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
@@ -282,12 +279,12 @@ impl Layer for DeltaLayer {
// A subroutine to dump a single blob
let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result<String> {
let buf = cursor.read_blob(blob_ref.pos())?;
let val = VirtualValue::des(&buf)?;
let val = Value::des(&buf)?;
let desc = match val {
VirtualValue::NaturalImage(img) => {
Value::Image(img) => {
format!(" img {} bytes", img.len())
}
VirtualValue::NaturalWalRecord(rec) => {
Value::WalRecord(rec) => {
let wal_desc = walrecord::describe_wal_record(&rec)?;
format!(
" rec {} bytes will_init: {} {}",
@@ -296,31 +293,6 @@ impl Layer for DeltaLayer {
wal_desc
)
}
VirtualValue::ClosedLineage { image, lsns, .. } => {
format!(
" lin(closed,img) {} bytes max_lsn {} {} bytes tail {} recs",
buf.len(),
lsns.last().unwrap(),
image.len(),
lsns.len(),
)
}
VirtualValue::ClosedRecLineage { lsns, .. } => {
format!(
" lin(closed,rec) {} bytes max_lsn {} {} recs",
buf.len(),
lsns.last().unwrap(),
lsns.len() + 1,
)
}
VirtualValue::OpenLineage { lsns, .. } => {
format!(
" lin(open) {} bytes max_lsn {} {} recs",
buf.len(),
lsns.last().unwrap(),
lsns.len(),
)
}
};
Ok(desc)
};
@@ -378,12 +350,10 @@ impl Layer for DeltaLayer {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
offsets.push((entry_lsn, blob_ref.pos()));
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
})?;
@@ -398,94 +368,28 @@ impl Layer for DeltaLayer {
file.file.path.display()
)
})?;
let val = VirtualValue::des(&buf).with_context(|| {
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
)
})?;
match val {
VirtualValue::NaturalImage(img) => {
if lsn_range.contains(&entry_lsn) {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
}
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
break;
}
VirtualValue::NaturalWalRecord(rec) => {
if lsn_range.contains(&entry_lsn) {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
VirtualValue::ClosedLineage {
image,
lsns,
records,
} => {
assert_eq!(lsns.len(), records.len());
for (lsn, rec) in
Iterator::zip(lsns.into_iter().rev(), records.into_iter().rev())
{
if lsn_range.contains(&lsn) {
reconstruct_state.records.push((lsn, rec));
}
}
if lsn_range.contains(&entry_lsn) {
reconstruct_state.img = Some((entry_lsn, image));
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
VirtualValue::ClosedRecLineage {
image_rec,
lsns,
records,
} => {
assert_eq!(lsns.len(), records.len());
for (lsn, rec) in
Iterator::zip(lsns.into_iter().rev(), records.into_iter().rev())
{
if lsn_range.contains(&lsn) {
reconstruct_state.records.push((lsn, rec));
}
}
if lsn_range.contains(&entry_lsn) {
reconstruct_state.records.push((entry_lsn, image_rec));
need_image = false;
break;
}
}
VirtualValue::OpenLineage {
mut lsns,
mut records,
} => {
while let Some(lsn) = lsns.pop() {
let rec = records.pop().unwrap();
if lsn_range.contains(&lsn) {
reconstruct_state.records.push((lsn, rec));
}
}
assert_eq!(records.len(), 1);
if lsn_range.contains(&entry_lsn) {
reconstruct_state
.records
.push((entry_lsn, records.pop().unwrap()));
}
}
};
}
}
// release metadata lock and close the file
}
@@ -778,8 +682,6 @@ struct DeltaLayerWriterInner {
timeline_id: TimelineId,
tenant_id: TenantId,
vvbuilder: Option<(Key, VirtualValueBuilder)>,
key_start: Key,
lsn_range: Range<Lsn>,
@@ -822,7 +724,6 @@ impl DeltaLayerWriterInner {
path,
timeline_id,
tenant_id,
vvbuilder: None,
key_start,
lsn_range,
tree: tree_builder,
@@ -835,42 +736,8 @@ impl DeltaLayerWriterInner {
///
/// The values must be appended in key, lsn order.
///
fn put_value(&mut self, new_key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
assert!(new_key >= self.key_start);
assert!(self.lsn_range.contains(&lsn));
match self.vvbuilder.take() {
None => {
let mut builder = VirtualValueBuilder::new();
let res = builder.push(lsn, val);
assert!(res.is_none());
self.vvbuilder = Some((new_key, builder));
}
Some((key, mut builder)) => {
if key != new_key {
let (lsn, vvalue) = builder.finish().unwrap();
self.put_value_bytes(
key,
lsn,
&VirtualValue::ser(&vvalue)?,
vvalue.will_init(),
)?;
builder = VirtualValueBuilder::new();
}
if let Some((old_lsn, vvalue)) = builder.push(lsn, val) {
self.put_value_bytes(
new_key,
old_lsn,
&VirtualValue::ser(&vvalue)?,
vvalue.will_init(),
)?;
}
self.vvbuilder = Some((new_key, builder));
}
}
Ok(())
fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
}
fn put_value_bytes(
@@ -899,14 +766,7 @@ impl DeltaLayerWriterInner {
///
/// Finish writing the delta layer.
///
fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
// first, flush the last key's vvalue to disk.
if let Some((key, builder)) = self.vvbuilder.take() {
if let Some((lsn, vvalue)) = builder.finish() {
self.put_value_bytes(key, lsn, &VirtualValue::ser(&vvalue)?, vvalue.will_init())?;
};
}
fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -1077,7 +937,7 @@ impl Drop for DeltaLayerWriter {
}
///
/// Iterator over all key-value pairs stored in a delta layer
/// Iterator over all key-value pairse stored in a delta layer
///
/// FIXME: This creates a Vector to hold the offsets of all key value pairs.
/// That takes up quite a lot of memory. Should do this in a more streaming
@@ -1087,7 +947,6 @@ struct DeltaValueIter<'a> {
all_offsets: Vec<(DeltaKey, BlobRef)>,
next_idx: usize,
reader: BlockCursor<Adapter<'a>>,
decode_queue: Option<<Vec<(Lsn, Value)> as IntoIterator>::IntoIter>,
}
struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
@@ -1131,21 +990,12 @@ impl<'a> DeltaValueIter<'a> {
all_offsets,
next_idx: 0,
reader: BlockCursor::new(Adapter(inner)),
decode_queue: None,
};
Ok(iter)
}
fn next_res(&mut self) -> Result<Option<(Key, Lsn, Value)>> {
if let Some(data) = &mut self.decode_queue {
let res = data.next();
if let Some((lsn, value)) = res {
let key = self.all_offsets[self.next_idx - 1].0.key();
return Ok(Some((key, lsn, value)));
}
}
if self.next_idx < self.all_offsets.len() {
let (delta_key, blob_ref) = &self.all_offsets[self.next_idx];
@@ -1153,11 +1003,7 @@ impl<'a> DeltaValueIter<'a> {
let lsn = delta_key.lsn();
let buf = self.reader.read_blob(blob_ref.pos())?;
let val_vec = VirtualValue::des(&buf)?.into_value_vec(lsn);
let mut iter = val_vec.into_iter();
let Some((lsn, val)) = iter.next() else { bail!("missing data in VirtualValue") };
self.decode_queue = Some(iter);
let val = Value::des(&buf)?;
self.next_idx += 1;
Ok(Some((key, lsn, val)))
} else {

View File

@@ -1 +0,0 @@
pub(crate) mod virtual_value;

View File

@@ -1,237 +0,0 @@
use crate::repository::Value;
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use utils::lsn::Lsn;
/// VirtualValue is stored in some Layers instead of the normal repository::Value.
///
/// It describes one or more Values that are associated to one
/// repostiory::Key, containing zero or one base image with all WAL records
/// that need to apply to this preceding page image. In balanced tree-based
/// indexes this reduces the number of full Keys we need to store, thus
/// reducing the size of the layer's index and increasing cache efficiency.
///
/// Additionally, the abstraction paves the way to implement compression in the
/// layer file themselves, as we'd just need to add a new variant to the
/// VirtualValue type for compressed types. Examples of such optimizations
/// are bitpacked and delta-encoded LSNs in the Lineage variants of this enum.
///
/// NOTE: Once committed into a hosted branch, these variants _must_ remain
/// in this order, and cannot be removed - they are part of the specification
/// of the physical layout of the DeltaLayer file. Any reordering is going to
/// change the meaning of bytes in existing files and break the compatibility
/// with old layers; so make sure you don't reorder these, nor should you
/// update the layout of existing variants. You can update new variants as long
/// as no user data is written using these variants.
///
/// NOTE: The first two variants are cloned over from repository::Value, which
/// was the definition of the stored data in DeltaLayer before VirtualValue.
/// These variants have the same layout and index, so they should (de)serialize
/// into the same binary format, guaranteeing backwards compatibility.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum VirtualValue {
/// NaturalImage: A natural WAL image, picked from PostgreSQL WAL.
NaturalImage(Bytes),
/// NaturalWalRecord: A natural WAL record, picked from PostgreSQL WAL.
NaturalWalRecord(NeonWalRecord),
/// ClosedLineage: A page image, followed by a set of WAL records that are
/// applied to that page image.
ClosedLineage {
image: Bytes,
lsns: Vec<Lsn>,
records: Vec<NeonWalRecord>,
},
/// ClosedRecLineage: A will-init WAL record, followed by a set of WAL
/// records that are applied to the page image of the WAL record.
ClosedRecLineage {
image_rec: NeonWalRecord,
lsns: Vec<Lsn>,
records: Vec<NeonWalRecord>,
},
/// OpenLineage: A set of WAL records that are applied to the same page,
/// but that do not have a known page image in this Layer.
OpenLineage {
lsns: Vec<Lsn>,
records: Vec<NeonWalRecord>,
},
}
impl VirtualValue {
pub(crate) fn into_value_vec(self, lsn: Lsn) -> Vec<(Lsn, Value)> {
match self {
VirtualValue::NaturalImage(img) => vec![(lsn, Value::Image(img))],
VirtualValue::NaturalWalRecord(rec) => vec![(lsn, Value::WalRecord(rec))],
VirtualValue::ClosedLineage {
image,
lsns,
records,
} => {
let mut res = Vec::with_capacity(lsns.len() + 1);
res.push((lsn, Value::Image(image)));
for (lsn, rec) in Iterator::zip(lsns.into_iter(), records.into_iter()) {
res.push((lsn, Value::WalRecord(rec)));
}
res
}
VirtualValue::ClosedRecLineage {
image_rec,
lsns,
records,
} => {
let mut res = Vec::with_capacity(lsns.len() + 1);
res.push((lsn, Value::WalRecord(image_rec)));
for (lsn, rec) in Iterator::zip(lsns.into_iter(), records.into_iter()) {
res.push((lsn, Value::WalRecord(rec)));
}
res
}
VirtualValue::OpenLineage { lsns, mut records } => {
let mut res = Vec::with_capacity(lsns.len() + 1);
let first_record = records.remove(0);
res.push((lsn, Value::WalRecord(first_record)));
for (lsn, rec) in Iterator::zip(lsns.into_iter(), records.into_iter()) {
res.push((lsn, Value::WalRecord(rec)));
}
res
}
}
}
pub(crate) fn will_init(&self) -> bool {
match self {
VirtualValue::NaturalImage(_) => true,
VirtualValue::NaturalWalRecord(rec) => rec.will_init(),
VirtualValue::ClosedLineage { .. } => true,
VirtualValue::ClosedRecLineage { .. } => true,
VirtualValue::OpenLineage { .. } => false,
}
}
}
impl From<Value> for VirtualValue {
fn from(value: Value) -> Self {
match value {
Value::Image(img) => VirtualValue::NaturalImage(img),
Value::WalRecord(rec) => VirtualValue::NaturalWalRecord(rec),
}
}
}
#[must_use = "deconstruct the value using ::finish to make sure you don't lose intermediate values"]
pub struct VirtualValueBuilder {
state: Option<(Lsn, VirtualValue)>,
}
impl VirtualValueBuilder {
pub fn new() -> Self {
Self { state: None }
}
#[must_use = "intermediate emitted values should be stored"]
pub fn push(&mut self, new_lsn: Lsn, value: Value) -> Option<(Lsn, VirtualValue)> {
if let Some((lsn, _)) = &self.state {
assert!(new_lsn > *lsn);
}
match value {
Value::Image(img) => {
let res = self.state.take();
self.state = Some((new_lsn, VirtualValue::NaturalImage(img)));
res
}
Value::WalRecord(new_rec) => {
if new_rec.will_init() {
let res = self.state.take();
self.state = Some((new_lsn, VirtualValue::NaturalWalRecord(new_rec)));
return res;
}
match self.state.take() {
None => {
self.state = Some((new_lsn, VirtualValue::NaturalWalRecord(new_rec)));
None
}
Some((start_lsn, virtual_value)) => {
let new_vv = match virtual_value {
VirtualValue::NaturalImage(img) => VirtualValue::ClosedLineage {
image: img,
lsns: vec![new_lsn],
records: vec![new_rec],
},
VirtualValue::NaturalWalRecord(vv_start) => {
if vv_start.will_init() {
VirtualValue::ClosedRecLineage {
image_rec: vv_start,
lsns: vec![new_lsn],
records: vec![new_rec],
}
} else {
VirtualValue::OpenLineage {
lsns: vec![new_lsn],
records: vec![vv_start, new_rec],
}
}
}
VirtualValue::ClosedLineage {
image,
mut lsns,
mut records,
} => {
lsns.push(new_lsn);
records.push(new_rec);
VirtualValue::ClosedLineage {
image,
lsns,
records,
}
}
VirtualValue::ClosedRecLineage {
image_rec,
mut lsns,
mut records,
} => {
lsns.push(new_lsn);
records.push(new_rec);
VirtualValue::ClosedRecLineage {
image_rec,
lsns,
records,
}
}
VirtualValue::OpenLineage {
mut lsns,
mut records,
} => {
lsns.push(new_lsn);
records.push(new_rec);
VirtualValue::OpenLineage { lsns, records }
}
};
self.state = Some((start_lsn, new_vv));
None
}
}
}
}
}
#[must_use]
pub fn finish(mut self) -> Option<(Lsn, VirtualValue)> {
self.state.take()
}
}

View File

@@ -48,7 +48,7 @@ use crate::tenant::{
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS};
use crate::metrics::TimelineMetrics;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
@@ -936,7 +936,6 @@ impl Timeline {
}
}
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
@@ -2356,7 +2355,6 @@ impl Timeline {
id,
ctx.task_kind()
);
UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
timeline.download_remote_layer(remote_layer).await?;
continue 'layer_map_search;
}
@@ -3302,6 +3300,10 @@ impl Timeline {
drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed
fail_point!("compact-level0-phase1-finish", |_| {
Err(anyhow::anyhow!("failpoint compact-level0-phase1-finish").into())
});
Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
@@ -3820,13 +3822,11 @@ impl Timeline {
/// If the caller has a deadline or needs a timeout, they can simply stop polling:
/// we're **cancellation-safe** because the download happens in a separate task_mgr task.
/// So, the current download attempt will run to completion even if we stop polling.
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%remote_layer.short_id()))]
pub async fn download_remote_layer(
&self,
remote_layer: Arc<RemoteLayer>,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
use std::sync::atomic::Ordering::Relaxed;
let permit = match Arc::clone(&remote_layer.ongoing_download)
@@ -3870,8 +3870,6 @@ impl Timeline {
.await;
if let Ok(size) = &result {
info!("layer file download finished");
// XXX the temp file is still around in Err() case
// and consumes space until we clean up upon pageserver restart.
self_clone.metrics.resident_physical_size_gauge.add(*size);
@@ -3943,8 +3941,6 @@ impl Timeline {
updates.flush();
drop(layers);
info!("on-demand download successful");
// Now that we've inserted the download into the layer map,
// close the semaphore. This will make other waiters for
// this download return Ok(()).
@@ -3952,7 +3948,7 @@ impl Timeline {
remote_layer.ongoing_download.close();
} else {
// Keep semaphore open. We'll drop the permit at the end of the function.
error!("layer file download failed: {:?}", result.as_ref().unwrap_err());
error!("on-demand download failed: {:?}", result.as_ref().unwrap_err());
}
// Don't treat it as an error if the task that triggered the download
@@ -4263,36 +4259,3 @@ fn rename_to_backup(path: &Path) -> anyhow::Result<()> {
bail!("couldn't find an unused backup number for {:?}", path)
}
#[cfg(not(debug_assertions))]
#[inline]
pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {}
#[cfg(debug_assertions)]
#[inline]
pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {
use utils::tracing_span_assert;
pub static TENANT_ID_EXTRACTOR: once_cell::sync::Lazy<
tracing_span_assert::MultiNameExtractor<2>,
> = once_cell::sync::Lazy::new(|| {
tracing_span_assert::MultiNameExtractor::new("TenantId", ["tenant_id", "tenant"])
});
pub static TIMELINE_ID_EXTRACTOR: once_cell::sync::Lazy<
tracing_span_assert::MultiNameExtractor<2>,
> = once_cell::sync::Lazy::new(|| {
tracing_span_assert::MultiNameExtractor::new("TimelineId", ["timeline_id", "timeline"])
});
match tracing_span_assert::check_fields_present([
&*TENANT_ID_EXTRACTOR,
&*TIMELINE_ID_EXTRACTOR,
]) {
Ok(()) => (),
Err(missing) => panic!(
"missing extractors: {:?}",
missing.into_iter().map(|e| e.name()).collect::<Vec<_>>()
),
}
}

View File

@@ -348,7 +348,7 @@ impl ConnectionManagerState {
.context("walreceiver connection handling failure")
}
.instrument(
info_span!("walreceiver_connection", tenant_id = %id.tenant_id, timeline_id = %id.timeline_id, node_id = %new_sk.safekeeper_id),
info_span!("walreceiver_connection", id = %id, node_id = %new_sk.safekeeper_id),
)
});

View File

@@ -370,74 +370,6 @@ lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
return found;
}
/*
* Evict a page (if present) from the local file cache
*/
void
lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
{
BufferTag tag;
FileCacheEntry* entry;
ssize_t rc;
bool found;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return;
INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);
if (!found)
{
/* nothing to do */
LWLockRelease(lfc_lock);
return;
}
/* remove the page from the cache */
entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1)));
/*
* If the chunk has no live entries, we can position the chunk to be
* recycled first.
*/
if (entry->bitmap[chunk_offs >> 5] == 0)
{
bool has_remaining_pages;
for (int i = 0; i < (BLOCKS_PER_CHUNK / 32); i++) {
if (entry->bitmap[i] != 0)
{
has_remaining_pages = true;
break;
}
}
/*
* Put the entry at the position that is first to be reclaimed when
* we have no cached pages remaining in the chunk
*/
if (!has_remaining_pages)
{
dlist_delete(&entry->lru_node);
dlist_push_head(&lfc_ctl->lru, &entry->lru_node);
}
}
/*
* Done: apart from empty chunks, we don't move chunks in the LRU when
* they're empty because eviction isn't usage.
*/
LWLockRelease(lfc_lock);
}
/*
* Try to read page from local cache.
* Returns true if page is found in local cache.
@@ -596,6 +528,7 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
LWLockRelease(lfc_lock);
}
/*
* Record structure holding the to be exposed cache data.
*/

View File

@@ -17,8 +17,6 @@
#include "pagestore_client.h"
#include "fmgr.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/buf_internals.h"
#include "libpq-fe.h"
#include "libpq/pqformat.h"
@@ -59,8 +57,6 @@ int n_unflushed_requests = 0;
int flush_every_n_requests = 8;
int readahead_buffer_size = 128;
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
static void pageserver_flush(void);
static bool
@@ -471,8 +467,6 @@ pg_init_libpagestore(void)
smgr_hook = smgr_neon;
smgr_init_hook = smgr_init_neon;
dbsize_hook = neon_dbsize;
old_redo_read_buffer_filter = redo_read_buffer_filter;
redo_read_buffer_filter = neon_redo_read_buffer_filter;
}
lfc_init();
}

View File

@@ -24,7 +24,6 @@
#include "neon.h"
#include "walproposer.h"
#include "pagestore_client.h"
PG_MODULE_MAGIC;
void _PG_init(void);

View File

@@ -11,7 +11,6 @@
#ifndef NEON_H
#define NEON_H
#include "access/xlogreader.h"
/* GUCs */
extern char *neon_auth_token;
@@ -21,11 +20,4 @@ extern char *neon_tenant;
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);
/*
* Returns true if we shouldn't do REDO on that block in record indicated by
* block_id; false otherwise.
*/
extern bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);
#endif /* NEON_H */

View File

@@ -207,7 +207,6 @@ extern void forget_cached_relsize(RelFileNode rnode, ForkNumber forknum);
extern void lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer);
extern bool lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer);
extern bool lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);

View File

@@ -189,7 +189,6 @@ typedef struct PrfHashEntry {
#define SH_DEFINE
#define SH_DECLARE
#include "lib/simplehash.h"
#include "neon.h"
/*
* PrefetchState maintains the state of (prefetch) getPage@LSN requests.
@@ -1210,9 +1209,6 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
if (ShutdownRequestPending)
return;
/* Don't log any pages if we're not allowed to do so. */
if (!XLogInsertAllowed())
return;
/*
* Whenever a VM or FSM page is evicted, WAL-log it. FSM and (some) VM
@@ -1379,18 +1375,8 @@ neon_get_request_lsn(bool *latest, RelFileNode rnode, ForkNumber forknum, BlockN
if (RecoveryInProgress())
{
/*
* We don't know if WAL has been generated but not yet replayed, so
* we're conservative in our estimates about latest pages.
*/
*latest = false;
/*
* Get the last written LSN of this page.
*/
lsn = GetLastWrittenLSN(rnode, forknum, blkno);
lsn = nm_adjust_lsn(lsn);
lsn = GetXLogReplayRecPtr(NULL);
elog(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
(uint32) ((lsn) >> 32), (uint32) (lsn));
}
@@ -1573,15 +1559,6 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
/*
* Newly created relation is empty, remember that in the relsize cache.
*
* Note that in REDO, this is called to make sure the relation fork exists,
* but it does not truncate the relation. So, we can only update the
* relsize if it didn't exist before.
*
* Also, in redo, we must make sure to update the cached size of the
* relation, as that is the primary source of truth for REDO's
* file length considerations, and as file extension isn't (perfectly)
* logged, we need to take care of that before we hit file size checks.
*
* FIXME: This is currently not just an optimization, but required for
* correctness. Postgres can call smgrnblocks() on the newly-created
* relation. Currently, we don't call SetLastWrittenLSN() when a new
@@ -1589,14 +1566,7 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
* cache, we might call smgrnblocks() on the newly-created relation before
* the creation WAL record hass been received by the page server.
*/
if (isRedo)
{
update_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
get_cached_relsize(reln->smgr_rnode.node, forkNum,
&reln->smgr_cached_nblocks[forkNum]);
}
else
set_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
set_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1861,26 +1831,6 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
.blockNum = blkno,
};
/*
* The redo process does not lock pages that it needs to replay but are
* not in the shared buffers, so a concurrent process may request the
* page after redo has decided it won't redo that page and updated the
* LwLSN for that page.
* If we're in hot standby we need to take care that we don't return
* until after REDO has finished replaying up to that LwLSN, as the page
* should have been locked up to that point.
*
* See also the description on neon_redo_read_buffer_filter below.
*
* NOTE: It is possible that the WAL redo process will still do IO due to
* concurrent failed read IOs. Those IOs should never have a request_lsn
* that is as large as the WAL record we're currently replaying, if it
* weren't for the behaviour of the LwLsn cache that uses the highest
* value of the LwLsn cache when the entry is not found.
*/
if (RecoveryInProgress() && !(MyBackendType == B_STARTUP))
XLogWaitForReplayOf(request_lsn);
/*
* Try to find prefetched page in the list of received pages.
*/
@@ -2634,143 +2584,3 @@ smgr_init_neon(void)
smgr_init_standard();
neon_init();
}
/*
* Return whether we can skip the redo for this block.
*
* The conditions for skipping the IO are:
*
* - The block is not in the shared buffers, and
* - The block is not in the local file cache
*
* ... because any subsequent read of the page requires us to read
* the new version of the page from the PageServer. We do not
* check the local file cache; we instead evict the page from LFC: it
* is cheaper than going through the FS calls to read the page, and
* limits the number of lock operations used in the REDO process.
*
* We have one exception to the rules for skipping IO: We always apply
* changes to shared catalogs' pages. Although this is mostly out of caution,
* catalog updates usually result in backends rebuilding their catalog snapshot,
* which means it's quite likely the modified page is going to be used soon.
*
* It is important to note that skipping WAL redo for a page also means
* the page isn't locked by the redo process, as there is no Buffer
* being returned, nor is there a buffer descriptor to lock.
* This means that any IO that wants to read this block needs to wait
* for the WAL REDO process to finish processing the WAL record before
* it allows the system to start reading the block, as releasing the
* block early could lead to phantom reads.
*
* For example, REDO for a WAL record that modifies 3 blocks could skip
* the first block, wait for a lock on the second, and then modify the
* third block. Without skipping, all blocks would be locked and phantom
* reads would not occur, but with skipping, a concurrent process could
* read block 1 with post-REDO contents and read block 3 with pre-REDO
* contents, where with REDO locking it would wait on block 1 and see
* block 3 with post-REDO contents only.
*/
bool
neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
{
XLogRecPtr end_recptr = record->EndRecPtr;
XLogRecPtr prev_end_recptr = record->ReadRecPtr - 1;
RelFileNode rnode;
ForkNumber forknum;
BlockNumber blkno;
BufferTag tag;
uint32 hash;
LWLock *partitionLock;
Buffer buffer;
bool no_redo_needed;
BlockNumber relsize;
if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
return true;
#if PG_VERSION_NUM < 150000
if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
elog(PANIC, "failed to locate backup block with ID %d", block_id);
#else
XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno);
#endif
/*
* Out of an abundance of caution, we always run redo on shared catalogs,
* regardless of whether the block is stored in shared buffers.
* See also this function's top comment.
*/
if (!OidIsValid(rnode.dbNode))
return false;
INIT_BUFFERTAG(tag, rnode, forknum, blkno);
hash = BufTableHashCode(&tag);
partitionLock = BufMappingPartitionLock(hash);
/*
* Lock the partition of shared_buffers so that it can't be updated
* concurrently.
*/
LWLockAcquire(partitionLock, LW_SHARED);
/* Try to find the relevant buffer */
buffer = BufTableLookup(&tag, hash);
no_redo_needed = buffer < 0;
/* we don't have the buffer in memory, update lwLsn past this record */
if (no_redo_needed)
{
SetLastWrittenLSNForBlock(end_recptr, rnode, forknum, blkno);
lfc_evict(rnode, forknum, blkno);
}
else
{
SetLastWrittenLSNForBlock(prev_end_recptr, rnode, forknum, blkno);
}
LWLockRelease(partitionLock);
/* Extend the relation if we know its size */
if (get_cached_relsize(rnode, forknum, &relsize))
{
if (relsize < blkno + 1)
update_cached_relsize(rnode, forknum, blkno + 1);
}
else
{
/*
* Size was not cached. We populate the cache now, with the size of the
* relation measured after this WAL record is applied.
*
* This length is later reused when we open the smgr to read the block,
* which is fine and expected.
*/
NeonResponse *response;
NeonNblocksResponse *nbresponse;
NeonNblocksRequest request = {
.req = (NeonRequest) {
.lsn = end_recptr,
.latest = false,
.tag = T_NeonNblocksRequest,
},
.rnode = rnode,
.forknum = forknum,
};
response = page_server_request(&request);
Assert(response->tag == T_NeonNblocksResponse);
nbresponse = (NeonNblocksResponse *) response;
Assert(nbresponse->n_blocks > blkno);
set_cached_relsize(rnode, forknum, nbresponse->n_blocks);
elog(SmgrTrace, "Set length to %d", nbresponse->n_blocks);
}
return no_redo_needed;
}

View File

@@ -1964,26 +1964,18 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback * hs)
{
if (safekeeper[i].appendResponse.hs.ts != 0)
{
HotStandbyFeedback *skhs = &safekeeper[i].appendResponse.hs;
if (FullTransactionIdIsNormal(skhs->xmin)
&& FullTransactionIdPrecedes(skhs->xmin, hs->xmin))
if (FullTransactionIdPrecedes(safekeeper[i].appendResponse.hs.xmin, hs->xmin))
{
hs->xmin = skhs->xmin;
hs->ts = skhs->ts;
hs->xmin = safekeeper[i].appendResponse.hs.xmin;
hs->ts = safekeeper[i].appendResponse.hs.ts;
}
if (FullTransactionIdIsNormal(skhs->catalog_xmin)
&& FullTransactionIdPrecedes(skhs->catalog_xmin, hs->xmin))
if (FullTransactionIdPrecedes(safekeeper[i].appendResponse.hs.catalog_xmin, hs->catalog_xmin))
{
hs->catalog_xmin = skhs->catalog_xmin;
hs->ts = skhs->ts;
hs->catalog_xmin = safekeeper[i].appendResponse.hs.catalog_xmin;
hs->ts = safekeeper[i].appendResponse.hs.ts;
}
}
}
if (hs->xmin.value == ~0)
hs->xmin = InvalidFullTransactionId;
if (hs->catalog_xmin.value == ~0)
hs->catalog_xmin = InvalidFullTransactionId;
}
/*

View File

@@ -1,8 +1,8 @@
use crate::{cancellation::CancelClosure, error::UserFacingError};
use futures::{FutureExt, TryFutureExt};
use futures::TryFutureExt;
use itertools::Itertools;
use pq_proto::StartupMessageParams;
use std::{io, net::SocketAddr, time::Duration};
use std::{io, net::SocketAddr};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::NoTls;
@@ -130,23 +130,9 @@ impl ConnCfg {
async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream)> {
use tokio_postgres::config::Host;
// wrap TcpStream::connect with timeout
let connect_with_timeout = |host, port| {
let connection_timeout = Duration::from_millis(10000);
tokio::time::timeout(connection_timeout, TcpStream::connect((host, port))).map(
move |res| match res {
Ok(tcpstream_connect_res) => tcpstream_connect_res,
Err(_) => Err(io::Error::new(
io::ErrorKind::TimedOut,
format!("exceeded connection timeout {connection_timeout:?}"),
)),
},
)
};
let connect_once = |host, port| {
info!("trying to connect to compute node at {host}:{port}");
connect_with_timeout(host, port).and_then(|socket| async {
TcpStream::connect((host, port)).and_then(|socket| async {
let socket_addr = socket.peer_addr()?;
// This prevents load balancer from severing the connection.
socket2::SockRef::from(&socket).set_keepalive(true)?;
@@ -179,6 +165,7 @@ impl ConnCfg {
Host::Unix(_) => continue, // unix sockets are not welcome here
};
// TODO: maybe we should add a timeout.
match connect_once(host, *port).await {
Ok(socket) => return Ok(socket),
Err(err) => {

View File

@@ -95,7 +95,7 @@ fn gather_proxy_io_bytes_per_client() -> Vec<(Ids, (u64, DateTime<Utc>))> {
current_metrics.push((
Ids {
endpoint_id: endpoint_id.to_string(),
branch_id: branch_id.to_string(),
branch_id: "".to_string(),
},
(value, Utc::now()),
));

View File

@@ -95,9 +95,9 @@ pub async fn task_main(
handle_client(config, &cancel_map, session_id, socket).await
}
.unwrap_or_else(move |e| {
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
error!(?session_id, "per-client task finished with an error: {e:#}");
error!("per-client task finished with an error: {e:#}");
}),
);
}

View File

@@ -1,5 +1,4 @@
#!/usr/bin/env bash
set -euo pipefail
#!/bin/bash
# If you save this in your path under the name "cargo-zclippy" (or whatever
# name you like), then you can run it as "cargo zclippy" from the shell prompt.
@@ -9,11 +8,7 @@ set -euo pipefail
# warnings and errors right in the editor.
# In vscode, this setting is Rust-analyzer>Check On Save:Command
# NB: the CI runs the full feature powerset, so, it catches slightly more errors
# at the expense of longer runtime. This script is used by developers, so, don't
# do that here.
thisscript="${BASH_SOURCE[0]}"
thisscript_dir="$(dirname "$thisscript")"
CLIPPY_COMMON_ARGS="$( source .neon_clippy_args; echo "$CLIPPY_COMMON_ARGS")"
exec cargo clippy --all-features $CLIPPY_COMMON_ARGS
# * `-A unknown_lints` do not warn about unknown lint suppressions
# that people with newer toolchains might use
# * `-D warnings` - fail on any warnings (`cargo` returns non-zero exit status)
cargo clippy --locked --all --all-targets --all-features -- -A unknown_lints -D warnings

View File

@@ -134,10 +134,7 @@ fn main() -> anyhow::Result<()> {
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
)?;
logging::init(LogFormat::from_config(&args.log_format)?)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();
info!("version: {GIT_VERSION}");

View File

@@ -3,7 +3,6 @@
use anyhow::Context;
use std::str;
use std::str::FromStr;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, info_span, Instrument};
@@ -50,14 +49,12 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
if cmd.starts_with("START_WAL_PUSH") {
Ok(SafekeeperPostgresCommand::StartWalPush)
} else if cmd.starts_with("START_REPLICATION") {
let re = Regex::new(
r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)",
)
.unwrap();
let re =
Regex::new(r"START_REPLICATION(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
let mut caps = re.captures_iter(cmd);
let start_lsn = caps
.next()
.map(|cap| Lsn::from_str(&cap[1]))
.map(|cap| cap[1].parse::<Lsn>())
.context("parse start LSN from START_REPLICATION command")??;
Ok(SafekeeperPostgresCommand::StartReplication { start_lsn })
} else if cmd.starts_with("IDENTIFY_SYSTEM") {

View File

@@ -18,7 +18,6 @@ use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogF
use postgres_ffi::{XLogSegNo, PG_TLI};
use std::cmp::{max, min};
use bytes::Bytes;
use std::fs::{self, remove_file, File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
@@ -37,7 +36,6 @@ use postgres_ffi::XLOG_BLCKSZ;
use postgres_ffi::waldecoder::WalStreamDecoder;
use pq_proto::SystemId;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
pub trait Storage {
@@ -480,13 +478,6 @@ pub struct WalReader {
// We don't have WAL locally if LSN is less than local_start_lsn
local_start_lsn: Lsn,
// We will respond with zero-ed bytes before this Lsn as long as
// pos is in the same segment as timeline_start_lsn.
timeline_start_lsn: Lsn,
// integer version number of PostgreSQL, e.g. 14; 15; 16
pg_version: u32,
system_id: SystemId,
timeline_start_segment: Option<Bytes>,
}
impl WalReader {
@@ -497,27 +488,19 @@ impl WalReader {
start_pos: Lsn,
enable_remote_read: bool,
) -> Result<Self> {
if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
bail!("state uninitialized, no data to read");
}
// TODO: Upgrade to bail!() once we know this couldn't possibly happen
if state.timeline_start_lsn == Lsn(0) {
warn!("timeline_start_lsn uninitialized before initializing wal reader");
}
if start_pos
< state
.timeline_start_lsn
.segment_lsn(state.server.wal_seg_size as usize)
{
if start_pos < state.timeline_start_lsn {
bail!(
"Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
"Requested streaming from {}, which is before the start of the timeline {}",
start_pos,
state.timeline_start_lsn
);
}
// TODO: add state.timeline_start_lsn == Lsn(0) check
if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
bail!("state uninitialized, no data to read");
}
Ok(Self {
workdir,
timeline_dir,
@@ -526,65 +509,10 @@ impl WalReader {
wal_segment: None,
enable_remote_read,
local_start_lsn: state.local_start_lsn,
timeline_start_lsn: state.timeline_start_lsn,
pg_version: state.server.pg_version / 10000,
system_id: state.server.system_id,
timeline_start_segment: None,
})
}
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
// If this timeline is new, we may not have a full segment yet, so
// we pad the first bytes of the timeline's first WAL segment with 0s
if self.pos < self.timeline_start_lsn {
debug_assert_eq!(
self.pos.segment_number(self.wal_seg_size),
self.timeline_start_lsn.segment_number(self.wal_seg_size)
);
// All bytes after timeline_start_lsn are in WAL, but those before
// are not, so we manually construct an empty segment for the bytes
// not available in this timeline.
if self.timeline_start_segment.is_none() {
let it = postgres_ffi::generate_wal_segment(
self.timeline_start_lsn.segment_number(self.wal_seg_size),
self.system_id,
self.pg_version,
self.timeline_start_lsn,
)?;
self.timeline_start_segment = Some(it);
}
assert!(self.timeline_start_segment.is_some());
let segment = self.timeline_start_segment.take().unwrap();
let seg_bytes = &segment[..];
// How much of the current segment have we already consumed?
let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size);
// How many bytes may we consume in total?
let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
debug_assert!(seg_bytes.len() > pos_seg_offset);
debug_assert!(seg_bytes.len() > tl_start_seg_offset);
// Copy as many bytes as possible into the buffer
let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
self.pos += len as u64;
// If we're done with the segment, we can release it's memory.
// However, if we're not yet done, store it so that we don't have to
// construct the segment the next time this function is called.
if self.pos < self.timeline_start_lsn {
self.timeline_start_segment = Some(segment);
}
return Ok(len);
}
let mut wal_segment = match self.wal_segment.take() {
Some(reader) => reader,
None => self.open_segment().await?,

View File

@@ -42,16 +42,12 @@ def main(args: argparse.Namespace):
res: DefaultDict[str, DefaultDict[str, Dict[str, bool]]]
res = defaultdict(lambda: defaultdict(dict))
try:
logging.info("connecting to the database...")
with psycopg2.connect(connstr, connect_timeout=30) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
logging.info("fetching flaky tests...")
cur.execute(FLAKY_TESTS_QUERY, (interval_days,))
rows = cur.fetchall()
except psycopg2.OperationalError as exc:
logging.error("cannot fetch flaky tests from the DB due to an error", exc)
rows = []
logging.info("connecting to the database...")
with psycopg2.connect(connstr, connect_timeout=10) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
logging.info("fetching flaky tests...")
cur.execute(FLAKY_TESTS_QUERY, (interval_days,))
rows = cur.fetchall()
for row in rows:
logging.info(f"\t{row['parent_suite'].replace('.', '/')}/{row['suite']}.py::{row['test']}")

View File

@@ -430,10 +430,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
)?;
logging::init(LogFormat::from_config(&args.log_format)?)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();
// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);

View File

@@ -53,7 +53,6 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"pageserver_storage_operations_seconds_global_count",
"pageserver_storage_operations_seconds_global_sum",
"pageserver_storage_operations_seconds_global_bucket",
"pageserver_unexpected_ondemand_downloads_count_total",
"libmetrics_launch_timestamp",
"libmetrics_build_info",
"libmetrics_tracing_event_count_total",

View File

@@ -1451,7 +1451,6 @@ class NeonCli(AbstractNeonCli):
branch_name: str,
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
port: Optional[int] = None,
) -> "subprocess.CompletedProcess[str]":
@@ -1471,8 +1470,6 @@ class NeonCli(AbstractNeonCli):
args.extend(["--port", str(port)])
if endpoint_id is not None:
args.append(endpoint_id)
if hot_standby:
args.extend(["--hot-standby", "true"])
res = self.raw_cli(args)
res.check_returncode()
@@ -2209,7 +2206,6 @@ class Endpoint(PgProtocol):
super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres")
self.env = env
self.running = False
self.branch_name: Optional[str] = None # dubious
self.endpoint_id: Optional[str] = None # dubious, see asserts below
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
self.tenant_id = tenant_id
@@ -2221,7 +2217,6 @@ class Endpoint(PgProtocol):
self,
branch_name: str,
endpoint_id: Optional[str] = None,
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> "Endpoint":
@@ -2236,14 +2231,12 @@ class Endpoint(PgProtocol):
if endpoint_id is None:
endpoint_id = self.env.generate_endpoint_id()
self.endpoint_id = endpoint_id
self.branch_name = branch_name
self.env.neon_cli.endpoint_create(
branch_name,
endpoint_id=self.endpoint_id,
tenant_id=self.tenant_id,
lsn=lsn,
hot_standby=hot_standby,
port=self.port,
)
path = Path("endpoints") / self.endpoint_id / "pgdata"
@@ -2368,7 +2361,6 @@ class Endpoint(PgProtocol):
self,
branch_name: str,
endpoint_id: Optional[str] = None,
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> "Endpoint":
@@ -2383,7 +2375,6 @@ class Endpoint(PgProtocol):
branch_name=branch_name,
endpoint_id=endpoint_id,
config_lines=config_lines,
hot_standby=hot_standby,
lsn=lsn,
).start()
@@ -2417,7 +2408,6 @@ class EndpointFactory:
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
hot_standby: bool = False,
config_lines: Optional[List[str]] = None,
) -> Endpoint:
ep = Endpoint(
@@ -2431,7 +2421,6 @@ class EndpointFactory:
return ep.create_start(
branch_name=branch_name,
endpoint_id=endpoint_id,
hot_standby=hot_standby,
config_lines=config_lines,
lsn=lsn,
)
@@ -2442,7 +2431,6 @@ class EndpointFactory:
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
hot_standby: bool = False,
config_lines: Optional[List[str]] = None,
) -> Endpoint:
ep = Endpoint(
@@ -2461,7 +2449,6 @@ class EndpointFactory:
branch_name=branch_name,
endpoint_id=endpoint_id,
lsn=lsn,
hot_standby=hot_standby,
config_lines=config_lines,
)
@@ -2471,36 +2458,6 @@ class EndpointFactory:
return self
def new_replica(self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]]):
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None
return self.create(
branch_name=branch_name,
endpoint_id=endpoint_id,
tenant_id=origin.tenant_id,
lsn=None,
hot_standby=True,
config_lines=config_lines,
)
def new_replica_start(
self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
):
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None
return self.create_start(
branch_name=branch_name,
endpoint_id=endpoint_id,
tenant_id=origin.tenant_id,
lsn=None,
hot_standby=True,
config_lines=config_lines,
)
@dataclass
class SafekeeperPort:

View File

@@ -59,6 +59,11 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"value": "replica",
"vartype": "enum"
},
{
"name": "hot_standby",
"value": "on",
"vartype": "bool"
},
{
"name": "neon.safekeepers",
"value": """

View File

@@ -0,0 +1,42 @@
import time
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
# Test duplicate layer detection
#
# This test sets fail point at the end of first compaction phase:
# after flushing new L1 layers but before deletion of L0 layes
# It should cause generation of duplicate L1 layer by compaction after restart
@pytest.mark.timeout(600)
def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
# These warnings are expected, when the pageserver is restarted abruptly
env.pageserver.allowed_errors.append(".*found future image layer.*")
env.pageserver.allowed_errors.append(".*found future delta layer.*")
env.pageserver.allowed_errors.append(".*duplicate layer.*")
pageserver_http = env.pageserver.http_client()
# Use aggressive compaction and checkpoint settings
tenant_id, _ = env.neon_cli.create_tenant(
conf={
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
"compaction_period": "1 s",
"compaction_threshold": "3",
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
connstr = endpoint.connstr(options="-csynchronous_commit=off")
pg_bin.run_capture(["pgbench", "-i", "-s10", connstr])
pageserver_http.configure_failpoints(("compact-level0-phase1-finish", "exit"))
with pytest.raises(Exception):
pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])
env.pageserver.stop()
env.pageserver.start()
time.sleep(10) # let compaction to be performed

View File

@@ -1,79 +0,0 @@
import pytest
from fixtures.neon_fixtures import NeonEnv
@pytest.mark.timeout(1800)
def test_hot_standby(neon_simple_env: NeonEnv):
env = neon_simple_env
with env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
) as primary:
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
primary_lsn = None
cought_up = False
queries = [
"SHOW neon.timeline_id",
"SHOW neon.tenant_id",
"SELECT relname FROM pg_class WHERE relnamespace = current_schema()::regnamespace::oid",
"SELECT COUNT(*), SUM(i) FROM test",
]
responses = dict()
with primary.connect() as p_con:
with p_con.cursor() as p_cur:
p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i")
# Explicit commit to make sure other connections (and replicas) can
# see the changes of this commit.
p_con.commit()
with p_con.cursor() as p_cur:
p_cur.execute("SELECT pg_current_wal_insert_lsn()::text")
res = p_cur.fetchone()
assert res is not None
(lsn,) = res
primary_lsn = lsn
# Explicit commit to make sure other connections (and replicas) can
# see the changes of this commit.
# Note that this may generate more WAL if the transaction has changed
# things, but we don't care about that.
p_con.commit()
for query in queries:
with p_con.cursor() as p_cur:
p_cur.execute(query)
res = p_cur.fetchone()
assert res is not None
response = res
responses[query] = response
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:
s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
res = s_cur.fetchone()
assert res is not None
while not cought_up:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
res = secondary_cursor.fetchone()
assert res is not None
(secondary_lsn,) = res
# There may be more changes on the primary after we got our LSN
# due to e.g. autovacuum, but that shouldn't impact the content
# of the tables, so we check whether we've replayed up to at
# least after the commit of the `test` table.
cought_up = secondary_lsn >= primary_lsn
# Explicit commit to flush any transient transaction-level state.
s_con.commit()
for query in queries:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute(query)
response = secondary_cursor.fetchone()
assert response is not None
assert response == responses[query]