Compare commits

...

14 Commits

Author SHA1 Message Date
Matthias van de Meent
ad829e3a76 PR comment updates:
- Add docs on VirtualValue, variants.
- Use #[must_use] instead of `impl Drop` hack.
2023-04-27 18:39:53 +02:00
Matthias
5fd337a674 Improved storage of records in the delta layer btree
Before, we stored each WAL record separately in the main index. With this
change, the main btree only contains:

 - References to Images, and WAL records that apply on top of them
 - References to sequences of WAL records that apply to the same page, but
   don't have an image in this Layer

This reduces the size of the index by some amount, and thus increases the cache-ability of that index.

Unless we're not looking for the latest version of a page or otherwise limit
the lookup window, this change does not significantly impact IO requirements
for normal workloads, as we don't (yet) add any compression to this WAL.
2023-04-27 18:39:53 +02:00
Alexander Bayandin
c4e1cafb63 scripts/flaky_tests.py: handle connection error (#4096)
- Increase `connect_timeout` to 30s, which should be enough for 
most of the cases
- If the script cannot connect to the DB (or any other
`psycopg2.OperationalError` occur) — do not fail the script, log
the error and proceed. Problems with fetching flaky tests shouldn't
block the PR
2023-04-27 17:08:00 +01:00
Joonas Koivunen
fdf5e4db5e refactor: Cleanup page service (#4097)
Refactoring part of #4093.

Numerious `Send + Sync` bounds were a distraction, that were not needed
at all. The proper `Bytes` usage and one `"error_message".to_string()`
are just drive-by fixes.

Not using the `PostgresBackendTCP` allows us to start setting read
timeouts (and more). `PostgresBackendTCP` is still used from proxy, so
it cannot be removed.
2023-04-27 18:51:57 +03:00
Heikki Linnakangas
d1e86d65dc Run rustfmt to fix whitespace.
Commit e6ec2400fc introduced some trivial whitespace issues.
2023-04-27 18:45:22 +03:00
Arseny Sher
f5b4697c90 Log session_id when proxy per client task errors out. 2023-04-27 19:08:22 +04:00
Christian Schwarz
3be81dd36b fix clippy --release failure introduced in #4030 (#4095)
PR `build: run clippy for powerset of features (#4077)` brought us a
`clippy --release` pass.

It was merged after #4030, which fails under `clippy --release` with

```
error: static `TENANT_ID_EXTRACTOR` is never used
    --> pageserver/src/tenant/timeline.rs:4270:16
     |
4270 |     pub static TENANT_ID_EXTRACTOR: once_cell::sync::Lazy<
     |                ^^^^^^^^^^^^^^^^^^^
     |
     = note: `-D dead-code` implied by `-D warnings`

error: static `TIMELINE_ID_EXTRACTOR` is never used
    --> pageserver/src/tenant/timeline.rs:4276:16
     |
4276 |     pub static TIMELINE_ID_EXTRACTOR: once_cell::sync::Lazy<
     |                ^^^^^^^^^^^^^^^^^^^^^
```

A merge queue would have prevented this.
2023-04-27 17:07:25 +03:00
MMeent
e6ec2400fc Enable hot standby PostgreSQL replicas.
Notes:
 - This still needs UI support from the Console
 - I've not tuned any GUCs for PostgreSQL to make this work better
 - Safekeeper has gotten a tweak in which WAL is sent and how: It now
sends zero-ed WAL data from the start of the timeline's first segment up to
the first byte of the timeline to be compatible with normal PostgreSQL
WAL streaming.
 - This includes the commits of #3714 

Fixes one part of https://github.com/neondatabase/neon/issues/769

Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
2023-04-27 15:26:44 +02:00
Christian Schwarz
5b911e1f9f build: run clippy for powerset of features (#4077)
This will catch compiler & clippy warnings in all feature combinations.

We should probably use cargo hack for build and test as well, but,
that's quite expensive and would add to overall CI wait times.

obsoletes https://github.com/neondatabase/neon/pull/4073
refs https://github.com/neondatabase/neon/pull/4070
2023-04-27 15:01:27 +03:00
Christian Schwarz
9ea7b5dd38 clean up logging around on-demand downloads (#4030)
- Remove repeated tenant & timeline from span
- Demote logging of the path to debug level
- Log completion at info level, in the same function where we log errors
- distinguish between layer file download success & on-demand download
succeeding as a whole in the log message wording
- Assert that the span contains a tenant id and a timeline id

fixes https://github.com/neondatabase/neon/issues/3945

Before:

```
  INFO compaction_loop{tenant_id=$TENANT_ID}:compact_timeline{timeline=$TIMELINE_ID}:download_remote_layer{tenant_id=$TENANT_ID timeline_id=$TIMELINE_ID layer=000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000020C8A71-00000000020CAF91}: download complete: /storage/pageserver/data/tenants/$TENANT_ID/timelines/$TIMELINE_ID/000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000020C8A71-00000000020CAF91
  INFO compaction_loop{tenant_id=$TENANT_ID}:compact_timeline{timeline=$TIMELINE_ID}:download_remote_layer{tenant_id=$TENANT_ID timeline_id=$TIMELINE_ID layer=000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000020C8A71-00000000020CAF91}: Rebuilt layer map. Did 9 insertions to process a batch of 1 updates.
```

After:

```
  INFO compaction_loop{tenant_id=$TENANT_ID}:compact_timeline{timeline=$TIMELINE_ID}:download_remote_layer{layer=000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000020C8A71-00000000020CAF91}: layer file download finished
  INFO compaction_loop{tenant_id=$TENANT_ID}:compact_timeline{timeline=$TIMELINE_ID}:download_remote_layer{layer=000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000020C8A71-00000000020CAF91}: Rebuilt layer map. Did 9 insertions to process a batch of 1 updates.
  INFO compaction_loop{tenant_id=$TENANT_ID}:compact_timeline{timeline=$TIMELINE_ID}:download_remote_layer{layer=000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000020C8A71-00000000020CAF91}: on-demand download successful
```
2023-04-27 11:54:48 +02:00
Arseny Sher
0112a602e1 Add timeout on proxy -> compute connection establishment.
Otherwise we sit up to default tcp_syn_retries (about 2+ min) before gettings os
error 110 if compute has been migrated to another pod.
2023-04-27 09:50:52 +04:00
Anastasia Lubennikova
92214578af Fix proxy_io_bytes_per_client metric: use branch_id identifier properly. (#4084)
It fixes the miscalculation of the metric for projects that use multiple
branches for the same endpoint.
We were under billing users with such projects. So we need to
communicate the change in Release Notes.
2023-04-26 17:47:54 +03:00
Christian Schwarz
6861259be7 add global metric for unexpected on-demand downloads (#4069)
Until we have toned down the prod logs to zero WARN and ERROR, we want a
dedicated metric for which we can have a dedicated alert.

fixes https://github.com/neondatabase/neon/issues/3924
2023-04-26 15:18:26 +02:00
Sergey Melnikov
11df2ee5d7 Add safekeeper-3.us-east-2.aws.neon.build (#4085) 2023-04-26 14:40:36 +03:00
57 changed files with 1809 additions and 214 deletions

View File

@@ -50,5 +50,7 @@ storage:
ansible_host: i-027662bd552bf5db0
safekeeper-2.us-east-2.aws.neon.build:
ansible_host: i-0de0b03a51676a6ce
safekeeper-3.us-east-2.aws.neon.build:
ansible_host: i-05f8ba2cda243bd18
safekeeper-99.us-east-2.aws.neon.build:
ansible_host: i-0d61b6a2ea32028d5

View File

@@ -111,8 +111,21 @@ jobs:
- name: Get postgres headers
run: make postgres-headers -j$(nproc)
- name: Run cargo clippy
run: ./run_clippy.sh
# cargo hack runs the given cargo subcommand (clippy in this case) for all feature combinations.
# This will catch compiler & clippy warnings in all feature combinations.
# TODO: use cargo hack for build and test as well, but, that's quite expensive.
# NB: keep clippy args in sync with ./run_clippy.sh
- run: |
CLIPPY_COMMON_ARGS="$( source .neon_clippy_args; echo "$CLIPPY_COMMON_ARGS")"
if [ "$CLIPPY_COMMON_ARGS" = "" ]; then
echo "No clippy args found in .neon_clippy_args"
exit 1
fi
echo "CLIPPY_COMMON_ARGS=${CLIPPY_COMMON_ARGS}" >> $GITHUB_ENV
- name: Run cargo clippy (debug)
run: cargo hack --feature-powerset clippy $CLIPPY_COMMON_ARGS
- name: Run cargo clippy (release)
run: cargo hack --feature-powerset clippy --release $CLIPPY_COMMON_ARGS
# Use `${{ !cancelled() }}` to run quck tests after the longer clippy run
- name: Check formatting

4
.neon_clippy_args Normal file
View File

@@ -0,0 +1,4 @@
# * `-A unknown_lints` do not warn about unknown lint suppressions
# that people with newer toolchains might use
# * `-D warnings` - fail on any warnings (`cargo` returns non-zero exit status)
export CLIPPY_COMMON_ARGS="--locked --workspace --all-targets -- -A unknown_lints -D warnings"

11
Cargo.lock generated
View File

@@ -4629,6 +4629,16 @@ dependencies = [
"valuable",
]
[[package]]
name = "tracing-error"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e"
dependencies = [
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
@@ -4879,6 +4889,7 @@ dependencies = [
"thiserror",
"tokio",
"tracing",
"tracing-error",
"tracing-subscriber",
"url",
"uuid",

View File

@@ -110,6 +110,7 @@ toml = "0.7"
toml_edit = "0.19"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
tracing = "0.1"
tracing-error = "0.2.0"
tracing-opentelemetry = "0.18.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.2"

View File

@@ -249,18 +249,63 @@ impl ComputeNode {
/// safekeepers sync, basebackup, etc.
#[instrument(skip(self, compute_state))]
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
#[derive(Clone)]
enum Replication {
Primary,
Static { lsn: Lsn },
HotStandby,
}
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
let pgdata_path = Path::new(&self.pgdata);
let hot_replica = if let Some(option) = spec.cluster.settings.find_ref("hot_standby") {
if let Some(value) = &option.value {
anyhow::ensure!(option.vartype == "bool");
matches!(value.as_str(), "on" | "yes" | "true")
} else {
false
}
} else {
false
};
let replication = if hot_replica {
Replication::HotStandby
} else if let Some(lsn) = spec.cluster.settings.find("recovery_target_lsn") {
Replication::Static {
lsn: Lsn::from_str(&lsn)?,
}
} else {
Replication::Primary
};
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", lsn);
// Syncing safekeepers is only safe with primary nodes: if a primary
// is already connected it will be kicked out, so a secondary (standby)
// cannot sync safekeepers.
let lsn = match &replication {
Replication::Primary => {
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", lsn);
lsn
}
Replication::Static { lsn } => {
info!("Starting read-only node at static LSN {}", lsn);
*lsn
}
Replication::HotStandby => {
info!("Initializing standby from latest Pageserver LSN");
Lsn(0)
}
};
info!(
"getting basebackup@{} from pageserver {}",
@@ -276,6 +321,13 @@ impl ComputeNode {
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
match &replication {
Replication::Primary | Replication::Static { .. } => {}
Replication::HotStandby => {
add_standby_signal(pgdata_path)?;
}
}
Ok(())
}

View File

@@ -94,6 +94,7 @@ impl PgOptionsSerialize for GenericOptions {
pub trait GenericOptionsSearch {
fn find(&self, name: &str) -> Option<String>;
fn find_ref(&self, name: &str) -> Option<&GenericOption>;
}
impl GenericOptionsSearch for GenericOptions {
@@ -103,6 +104,12 @@ impl GenericOptionsSearch for GenericOptions {
let op = ops.iter().find(|s| s.name == name)?;
op.value.clone()
}
/// Lookup option by name, returning ref
fn find_ref(&self, name: &str) -> Option<&GenericOption> {
let ops = self.as_ref()?;
ops.iter().find(|s| s.name == name)
}
}
pub trait RoleExt {

View File

@@ -1,3 +1,4 @@
use std::fs::File;
use std::path::Path;
use std::str::FromStr;
@@ -145,6 +146,21 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
Ok(())
}
/// Create a standby.signal file
pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
// XXX: consider making it a part of spec.json
info!("adding standby.signal");
let signalfile = pgdata_path.join("standby.signal");
if !signalfile.exists() {
info!("created standby.signal");
File::create(signalfile)?;
} else {
info!("reused pre-existing standby.signal");
}
Ok(())
}
/// Given a cluster spec json and open transaction it handles roles creation,
/// deletion and update.
#[instrument(skip_all)]

View File

@@ -8,6 +8,7 @@
use anyhow::{anyhow, bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint::Replication;
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
@@ -474,7 +475,14 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
println!("Creating endpoint for imported timeline ...");
cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
cplane.new_endpoint(
tenant_id,
name,
timeline_id,
None,
pg_version,
Replication::Primary,
)?;
println!("Done");
}
Some(("branch", branch_match)) => {
@@ -560,20 +568,20 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.iter()
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
{
let lsn_str = match endpoint.lsn {
None => {
// -> primary endpoint
let lsn_str = match endpoint.replication {
Replication::Static(lsn) => {
// -> read-only endpoint
// Use the node's LSN.
lsn.to_string()
}
_ => {
// -> primary endpoint or hot replica
// Use the LSN at the end of the timeline.
timeline_infos
.get(&endpoint.timeline_id)
.map(|bi| bi.last_record_lsn.to_string())
.unwrap_or_else(|| "?".to_string())
}
Some(lsn) => {
// -> read-only endpoint
// Use the endpoint's LSN.
lsn.to_string()
}
};
let branch_name = timeline_name_mappings
@@ -619,7 +627,26 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.copied()
.context("Failed to parse postgres version from the argument string")?;
cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, lsn, port, pg_version)?;
let hot_standby = sub_args
.get_one::<bool>("hot-standby")
.copied()
.unwrap_or(false);
let replication = match (lsn, hot_standby) {
(Some(lsn), false) => Replication::Static(lsn),
(None, true) => Replication::Replica,
(None, false) => Replication::Primary,
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
};
cplane.new_endpoint(
tenant_id,
&endpoint_id,
timeline_id,
port,
pg_version,
replication,
)?;
}
"start" => {
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
@@ -637,7 +664,21 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
None
};
let hot_standby = sub_args
.get_one::<bool>("hot-standby")
.copied()
.unwrap_or(false);
if let Some(endpoint) = endpoint {
match (&endpoint.replication, hot_standby) {
(Replication::Static(_), true) => {
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
}
(Replication::Primary, true) => {
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
}
_ => {}
}
println!("Starting existing endpoint {endpoint_id}...");
endpoint.start(&auth_token)?;
} else {
@@ -659,6 +700,14 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.get_one::<u32>("pg-version")
.copied()
.context("Failed to `pg-version` from the argument string")?;
let replication = match (lsn, hot_standby) {
(Some(lsn), false) => Replication::Static(lsn),
(None, true) => Replication::Replica,
(None, false) => Replication::Primary,
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
};
// when used with custom port this results in non obvious behaviour
// port is remembered from first start command, i e
// start --port X
@@ -670,9 +719,9 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
tenant_id,
endpoint_id,
timeline_id,
lsn,
port,
pg_version,
replication,
)?;
ep.start(&auth_token)?;
}
@@ -928,6 +977,12 @@ fn cli() -> Command {
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
.required(false);
let hot_standby_arg = Arg::new("hot-standby")
.value_parser(value_parser!(bool))
.long("hot-standby")
.help("If set, the node will be a hot replica on the specified timeline")
.required(false);
Command::new("Neon CLI")
.arg_required_else_help(true)
.version(GIT_VERSION)
@@ -1052,6 +1107,7 @@ fn cli() -> Command {
.long("config-only")
.required(false))
.arg(pg_version_arg.clone())
.arg(hot_standby_arg.clone())
)
.subcommand(Command::new("start")
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
@@ -1062,6 +1118,7 @@ fn cli() -> Command {
.arg(lsn_arg)
.arg(port_arg)
.arg(pg_version_arg)
.arg(hot_standby_arg)
)
.subcommand(
Command::new("stop")

View File

@@ -68,18 +68,19 @@ impl ComputeControlPlane {
tenant_id: TenantId,
name: &str,
timeline_id: TimelineId,
lsn: Option<Lsn>,
port: Option<u16>,
pg_version: u32,
replication: Replication,
) -> Result<Arc<Endpoint>> {
let port = port.unwrap_or_else(|| self.get_port());
let ep = Arc::new(Endpoint {
name: name.to_owned(),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
env: self.env.clone(),
pageserver: Arc::clone(&self.pageserver),
timeline_id,
lsn,
replication,
tenant_id,
pg_version,
});
@@ -95,6 +96,18 @@ impl ComputeControlPlane {
///////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Replication {
// Regular read-write node
Primary,
// if recovery_target_lsn is provided, and we want to pin the node to a specific LSN
Static(Lsn),
// Hot standby; read-only replica.
// Future versions may want to distinguish between replicas with hot standby
// feedback and other kinds of replication configurations.
Replica,
}
#[derive(Debug)]
pub struct Endpoint {
/// used as the directory name
@@ -102,7 +115,7 @@ pub struct Endpoint {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
// Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary.
pub lsn: Option<Lsn>,
pub replication: Replication,
// port and address of the Postgres server
pub address: SocketAddr,
@@ -153,9 +166,17 @@ impl Endpoint {
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
let pg_version = u32::from_str(&pg_version_str)?;
// parse recovery_target_lsn, if any
let recovery_target_lsn: Option<Lsn> =
conf.parse_field_optional("recovery_target_lsn", &context)?;
// parse recovery_target_lsn and primary_conninfo into Recovery Target, if any
let replication = if let Some(lsn_str) = conf.get("recovery_target_lsn") {
Replication::Static(Lsn::from_str(lsn_str)?)
} else if let Some(slot_name) = conf.get("primary_slot_name") {
let slot_name = slot_name.to_string();
let prefix = format!("repl_{}_", timeline_id);
assert!(slot_name.starts_with(&prefix));
Replication::Replica
} else {
Replication::Primary
};
// ok now
Ok(Endpoint {
@@ -164,7 +185,7 @@ impl Endpoint {
env: env.clone(),
pageserver: Arc::clone(pageserver),
timeline_id,
lsn: recovery_target_lsn,
replication,
tenant_id,
pg_version,
})
@@ -299,50 +320,83 @@ impl Endpoint {
conf.append("neon.pageserver_connstring", &pageserver_connstr);
conf.append("neon.tenant_id", &self.tenant_id.to_string());
conf.append("neon.timeline_id", &self.timeline_id.to_string());
if let Some(lsn) = self.lsn {
conf.append("recovery_target_lsn", &lsn.to_string());
}
conf.append_line("");
// Configure backpressure
// - Replication write lag depends on how fast the walreceiver can process incoming WAL.
// This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
// so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
// Actually latency should be much smaller (better if < 1sec). But we assume that recently
// updates pages are not requested from pageserver.
// - Replication flush lag depends on speed of persisting data by checkpointer (creation of
// delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
// remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
// recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
// - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
// To be able to restore database in case of pageserver node crash, safekeeper should not
// remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
// (if they are not able to upload WAL to S3).
conf.append("max_replication_write_lag", "15MB");
conf.append("max_replication_flush_lag", "10GB");
// Replication-related configurations, such as WAL sending
match &self.replication {
Replication::Primary => {
// Configure backpressure
// - Replication write lag depends on how fast the walreceiver can process incoming WAL.
// This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
// so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
// Actually latency should be much smaller (better if < 1sec). But we assume that recently
// updates pages are not requested from pageserver.
// - Replication flush lag depends on speed of persisting data by checkpointer (creation of
// delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
// remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
// recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
// - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
// To be able to restore database in case of pageserver node crash, safekeeper should not
// remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
// (if they are not able to upload WAL to S3).
conf.append("max_replication_write_lag", "15MB");
conf.append("max_replication_flush_lag", "10GB");
if !self.env.safekeepers.is_empty() {
// Configure Postgres to connect to the safekeepers
conf.append("synchronous_standby_names", "walproposer");
if !self.env.safekeepers.is_empty() {
// Configure Postgres to connect to the safekeepers
conf.append("synchronous_standby_names", "walproposer");
let safekeepers = self
.env
.safekeepers
.iter()
.map(|sk| format!("localhost:{}", sk.pg_port))
.collect::<Vec<String>>()
.join(",");
conf.append("neon.safekeepers", &safekeepers);
} else {
// We only use setup without safekeepers for tests,
// and don't care about data durability on pageserver,
// so set more relaxed synchronous_commit.
conf.append("synchronous_commit", "remote_write");
let safekeepers = self
.env
.safekeepers
.iter()
.map(|sk| format!("localhost:{}", sk.pg_port))
.collect::<Vec<String>>()
.join(",");
conf.append("neon.safekeepers", &safekeepers);
} else {
// We only use setup without safekeepers for tests,
// and don't care about data durability on pageserver,
// so set more relaxed synchronous_commit.
conf.append("synchronous_commit", "remote_write");
// Configure the node to stream WAL directly to the pageserver
// This isn't really a supported configuration, but can be useful for
// testing.
conf.append("synchronous_standby_names", "pageserver");
// Configure the node to stream WAL directly to the pageserver
// This isn't really a supported configuration, but can be useful for
// testing.
conf.append("synchronous_standby_names", "pageserver");
}
}
Replication::Static(lsn) => {
conf.append("recovery_target_lsn", &lsn.to_string());
}
Replication::Replica => {
assert!(!self.env.safekeepers.is_empty());
// TODO: use future host field from safekeeper spec
// Pass the list of safekeepers to the replica so that it can connect to any of them,
// whichever is availiable.
let sk_ports = self
.env
.safekeepers
.iter()
.map(|x| x.pg_port.to_string())
.collect::<Vec<_>>()
.join(",");
let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
let connstr = format!(
"host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
sk_hosts,
sk_ports,
&self.timeline_id.to_string(),
&self.tenant_id.to_string(),
);
let slot_name = format!("repl_{}_", self.timeline_id);
conf.append("primary_conninfo", connstr.as_str());
conf.append("primary_slot_name", slot_name.as_str());
conf.append("hot_standby", "on");
}
}
let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
@@ -355,21 +409,27 @@ impl Endpoint {
}
fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
let backup_lsn = if let Some(lsn) = self.lsn {
Some(lsn)
} else if !self.env.safekeepers.is_empty() {
// LSN 0 means that it is bootstrap and we need to download just
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
// procedure evolves quite actively right now, so let's think about it again
// when things would be more stable (TODO).
let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
if lsn == Lsn(0) {
None
} else {
Some(lsn)
let backup_lsn = match &self.replication {
Replication::Primary => {
if !self.env.safekeepers.is_empty() {
// LSN 0 means that it is bootstrap and we need to download just
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
// procedure evolves quite actively right now, so let's think about it again
// when things would be more stable (TODO).
let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
if lsn == Lsn(0) {
None
} else {
Some(lsn)
}
} else {
None
}
}
Replication::Static(lsn) => Some(*lsn),
Replication::Replica => {
None // Take the latest snapshot available to start with
}
} else {
None
};
self.do_basebackup(backup_lsn)?;
@@ -466,7 +526,7 @@ impl Endpoint {
// 3. Load basebackup
self.load_basebackup(auth_token)?;
if self.lsn.is_some() {
if self.replication != Replication::Primary {
File::create(self.pgdata().join("standby.signal"))?;
}

View File

@@ -13,7 +13,7 @@ use std::io::BufRead;
use std::str::FromStr;
/// In-memory representation of a postgresql.conf file
#[derive(Default)]
#[derive(Default, Debug)]
pub struct PostgresConf {
lines: Vec<String>,
hash: HashMap<String, String>,

View File

@@ -28,11 +28,6 @@
"value": "replica",
"vartype": "enum"
},
{
"name": "hot_standby",
"value": "on",
"vartype": "bool"
},
{
"name": "wal_log_hints",
"value": "on",

View File

@@ -95,10 +95,13 @@ pub fn generate_wal_segment(
segno: u64,
system_id: u64,
pg_version: u32,
lsn: Lsn,
) -> Result<Bytes, SerializeError> {
assert_eq!(segno, lsn.segment_number(WAL_SEGMENT_SIZE));
match pg_version {
14 => v14::xlog_utils::generate_wal_segment(segno, system_id),
15 => v15::xlog_utils::generate_wal_segment(segno, system_id),
14 => v14::xlog_utils::generate_wal_segment(segno, system_id, lsn),
15 => v15::xlog_utils::generate_wal_segment(segno, system_id, lsn),
_ => Err(SerializeError::BadInput),
}
}

View File

@@ -195,6 +195,7 @@ pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
pub const XLP_LONG_HEADER: u16 = 0x0002;
/* From fsm_internals.h */

View File

@@ -270,6 +270,11 @@ impl XLogPageHeaderData {
use utils::bin_ser::LeSer;
XLogPageHeaderData::des_from(&mut buf.reader())
}
pub fn encode(&self) -> Result<Bytes, SerializeError> {
use utils::bin_ser::LeSer;
self.ser().map(|b| b.into())
}
}
impl XLogLongPageHeaderData {
@@ -328,22 +333,32 @@ impl CheckPoint {
}
}
//
// Generate new, empty WAL segment.
// We need this segment to start compute node.
//
pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, SerializeError> {
/// Generate new, empty WAL segment, with correct block headers at the first
/// page of the segment and the page that contains the given LSN.
/// We need this segment to start compute node.
pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);
let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
let page_off = lsn.block_offset();
let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
let first_page_only = seg_off < XLOG_BLCKSZ;
let (shdr_rem_len, infoflags) = if first_page_only {
(seg_off, pg_constants::XLP_FIRST_IS_CONTRECORD)
} else {
(0, 0)
};
let hdr = XLogLongPageHeaderData {
std: {
XLogPageHeaderData {
xlp_magic: XLOG_PAGE_MAGIC as u16,
xlp_info: pg_constants::XLP_LONG_HEADER,
xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
xlp_tli: PG_TLI,
xlp_pageaddr: pageaddr,
xlp_rem_len: 0,
xlp_rem_len: shdr_rem_len as u32,
..Default::default() // Put 0 in padding fields.
}
},
@@ -357,6 +372,33 @@ pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, Seriali
//zero out the rest of the file
seg_buf.resize(WAL_SEGMENT_SIZE, 0);
if !first_page_only {
let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
let header = XLogPageHeaderData {
xlp_magic: XLOG_PAGE_MAGIC as u16,
xlp_info: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
pg_constants::XLP_FIRST_IS_CONTRECORD
} else {
0
},
xlp_tli: PG_TLI,
xlp_pageaddr: lsn.page_lsn().0,
xlp_rem_len: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
page_off as u32
} else {
0u32
},
..Default::default() // Put 0 in padding fields.
};
let hdr_bytes = header.encode()?;
debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len());
debug_assert_ne!(block_offset, 0);
seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]);
}
Ok(seg_buf.freeze())
}

View File

@@ -99,7 +99,11 @@ struct S3WithTestBlobs {
#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledS3 {
async fn setup() -> Self {
utils::logging::init(utils::logging::LogFormat::Test).expect("logging init failed");
utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
)
.expect("logging init failed");
if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
info!(
"`{}` env variable is not set, skipping the test",

View File

@@ -27,7 +27,8 @@ signal-hook.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["json"] }
tracing-error.workspace = true
tracing-subscriber = { workspace = true, features = ["json", "registry"] }
rand.workspace = true
serde_with.workspace = true
strum.workspace = true

View File

@@ -54,6 +54,8 @@ pub mod measured_stream;
pub mod serde_percent;
pub mod serde_regex;
pub mod tracing_span_assert;
/// use with fail::cfg("$name", "return(2000)")
#[macro_export]
macro_rules! failpoint_sleep_millis_async {

View File

@@ -56,7 +56,20 @@ where
}
}
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
/// Whether to add the `tracing_error` crate's `ErrorLayer`
/// to the global tracing subscriber.
///
pub enum TracingErrorLayerEnablement {
/// Do not add the `ErrorLayer`.
Disabled,
/// Add the `ErrorLayer` with the filter specified by RUST_LOG, defaulting to `info` if `RUST_LOG` is unset.
EnableWithRustLogFilter,
}
pub fn init(
log_format: LogFormat,
tracing_error_layer_enablement: TracingErrorLayerEnablement,
) -> anyhow::Result<()> {
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
let rust_log_env_filter = || {
@@ -67,21 +80,26 @@ pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
// NB: the order of the with() calls does not matter.
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
use tracing_subscriber::prelude::*;
tracing_subscriber::registry()
.with({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
let log_layer = match log_format {
LogFormat::Json => log_layer.json().boxed(),
LogFormat::Plain => log_layer.boxed(),
LogFormat::Test => log_layer.with_test_writer().boxed(),
};
log_layer.with_filter(rust_log_env_filter())
})
.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()))
.init();
let r = tracing_subscriber::registry();
let r = r.with({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
let log_layer = match log_format {
LogFormat::Json => log_layer.json().boxed(),
LogFormat::Plain => log_layer.boxed(),
LogFormat::Test => log_layer.with_test_writer().boxed(),
};
log_layer.with_filter(rust_log_env_filter())
});
let r = r.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
match tracing_error_layer_enablement {
TracingErrorLayerEnablement::EnableWithRustLogFilter => r
.with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
.init(),
TracingErrorLayerEnablement::Disabled => r.init(),
}
Ok(())
}

View File

@@ -62,29 +62,48 @@ impl Lsn {
}
/// Compute the offset into a segment
#[inline]
pub fn segment_offset(self, seg_sz: usize) -> usize {
(self.0 % seg_sz as u64) as usize
}
/// Compute LSN of the segment start.
#[inline]
pub fn segment_lsn(self, seg_sz: usize) -> Lsn {
Lsn(self.0 - (self.0 % seg_sz as u64))
}
/// Compute the segment number
#[inline]
pub fn segment_number(self, seg_sz: usize) -> u64 {
self.0 / seg_sz as u64
}
/// Compute the offset into a block
#[inline]
pub fn block_offset(self) -> u64 {
const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
self.0 % BLCKSZ
}
/// Compute the block offset of the first byte of this Lsn within this
/// segment
#[inline]
pub fn page_lsn(self) -> Lsn {
Lsn(self.0 - self.block_offset())
}
/// Compute the block offset of the first byte of this Lsn within this
/// segment
#[inline]
pub fn page_offset_in_segment(self, seg_sz: usize) -> u64 {
(self.0 - self.block_offset()) - self.segment_lsn(seg_sz).0
}
/// Compute the bytes remaining in this block
///
/// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`.
#[inline]
pub fn remaining_in_block(self) -> u64 {
const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
BLCKSZ - (self.0 % BLCKSZ)

View File

@@ -0,0 +1,287 @@
//! Assert that the current [`tracing::Span`] has a given set of fields.
//!
//! # Usage
//!
//! ```
//! use tracing_subscriber::prelude::*;
//! let registry = tracing_subscriber::registry()
//! .with(tracing_error::ErrorLayer::default());
//!
//! // Register the registry as the global subscriber.
//! // In this example, we'll only use it as a thread-local subscriber.
//! let _guard = tracing::subscriber::set_default(registry);
//!
//! // Then, in the main code:
//!
//! let span = tracing::info_span!("TestSpan", test_id = 1);
//! let _guard = span.enter();
//!
//! // ... down the call stack
//!
//! use utils::tracing_span_assert::{check_fields_present, MultiNameExtractor};
//! let extractor = MultiNameExtractor::new("TestExtractor", ["test", "test_id"]);
//! match check_fields_present([&extractor]) {
//! Ok(()) => {},
//! Err(missing) => {
//! panic!("Missing fields: {:?}", missing.into_iter().map(|f| f.name() ).collect::<Vec<_>>());
//! }
//! }
//! ```
//!
//! Recommended reading: https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
//!
use std::{
collections::HashSet,
fmt::{self},
hash::{Hash, Hasher},
};
pub enum ExtractionResult {
Present,
Absent,
}
pub trait Extractor: Send + Sync + std::fmt::Debug {
fn name(&self) -> &str;
fn extract(&self, fields: &tracing::field::FieldSet) -> ExtractionResult;
}
#[derive(Debug)]
pub struct MultiNameExtractor<const L: usize> {
name: &'static str,
field_names: [&'static str; L],
}
impl<const L: usize> MultiNameExtractor<L> {
pub fn new(name: &'static str, field_names: [&'static str; L]) -> MultiNameExtractor<L> {
MultiNameExtractor { name, field_names }
}
}
impl<const L: usize> Extractor for MultiNameExtractor<L> {
fn name(&self) -> &str {
self.name
}
fn extract(&self, fields: &tracing::field::FieldSet) -> ExtractionResult {
if fields.iter().any(|f| self.field_names.contains(&f.name())) {
ExtractionResult::Present
} else {
ExtractionResult::Absent
}
}
}
struct MemoryIdentity<'a>(&'a dyn Extractor);
impl<'a> MemoryIdentity<'a> {
fn as_ptr(&self) -> *const () {
self.0 as *const _ as *const ()
}
}
impl<'a> PartialEq for MemoryIdentity<'a> {
fn eq(&self, other: &Self) -> bool {
self.as_ptr() == other.as_ptr()
}
}
impl<'a> Eq for MemoryIdentity<'a> {}
impl<'a> Hash for MemoryIdentity<'a> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.as_ptr().hash(state);
}
}
impl<'a> fmt::Debug for MemoryIdentity<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:p}: {}", self.as_ptr(), self.0.name())
}
}
/// The extractor names passed as keys to [`new`].
pub fn check_fields_present<const L: usize>(
must_be_present: [&dyn Extractor; L],
) -> Result<(), Vec<&dyn Extractor>> {
let mut missing: HashSet<MemoryIdentity> =
HashSet::from_iter(must_be_present.into_iter().map(|r| MemoryIdentity(r)));
let trace = tracing_error::SpanTrace::capture();
trace.with_spans(|md, _formatted_fields| {
missing.retain(|extractor| match extractor.0.extract(md.fields()) {
ExtractionResult::Present => false,
ExtractionResult::Absent => true,
});
!missing.is_empty() // continue walking up until we've found all missing
});
if missing.is_empty() {
Ok(())
} else {
Err(missing.into_iter().map(|mi| mi.0).collect())
}
}
#[cfg(test)]
mod tests {
use tracing_subscriber::prelude::*;
use super::*;
struct Setup {
_current_thread_subscriber_guard: tracing::subscriber::DefaultGuard,
tenant_extractor: MultiNameExtractor<2>,
timeline_extractor: MultiNameExtractor<2>,
}
fn setup_current_thread() -> Setup {
let tenant_extractor = MultiNameExtractor::new("TenantId", ["tenant_id", "tenant"]);
let timeline_extractor = MultiNameExtractor::new("TimelineId", ["timeline_id", "timeline"]);
let registry = tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(tracing_error::ErrorLayer::default());
let guard = tracing::subscriber::set_default(registry);
Setup {
_current_thread_subscriber_guard: guard,
tenant_extractor,
timeline_extractor,
}
}
fn assert_missing(missing: Vec<&dyn Extractor>, expected: Vec<&dyn Extractor>) {
let missing: HashSet<MemoryIdentity> =
HashSet::from_iter(missing.into_iter().map(MemoryIdentity));
let expected: HashSet<MemoryIdentity> =
HashSet::from_iter(expected.into_iter().map(MemoryIdentity));
assert_eq!(missing, expected);
}
#[test]
fn positive_one_level() {
let setup = setup_current_thread();
let span = tracing::info_span!("root", tenant_id = "tenant-1", timeline_id = "timeline-1");
let _guard = span.enter();
check_fields_present([&setup.tenant_extractor, &setup.timeline_extractor]).unwrap();
}
#[test]
fn negative_one_level() {
let setup = setup_current_thread();
let span = tracing::info_span!("root", timeline_id = "timeline-1");
let _guard = span.enter();
let missing =
check_fields_present([&setup.tenant_extractor, &setup.timeline_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
#[test]
fn positive_multiple_levels() {
let setup = setup_current_thread();
let span = tracing::info_span!("root");
let _guard = span.enter();
let span = tracing::info_span!("child", tenant_id = "tenant-1");
let _guard = span.enter();
let span = tracing::info_span!("grandchild", timeline_id = "timeline-1");
let _guard = span.enter();
check_fields_present([&setup.tenant_extractor, &setup.timeline_extractor]).unwrap();
}
#[test]
fn negative_multiple_levels() {
let setup = setup_current_thread();
let span = tracing::info_span!("root");
let _guard = span.enter();
let span = tracing::info_span!("child", timeline_id = "timeline-1");
let _guard = span.enter();
let missing = check_fields_present([&setup.tenant_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
#[test]
fn positive_subset_one_level() {
let setup = setup_current_thread();
let span = tracing::info_span!("root", tenant_id = "tenant-1", timeline_id = "timeline-1");
let _guard = span.enter();
check_fields_present([&setup.tenant_extractor]).unwrap();
}
#[test]
fn positive_subset_multiple_levels() {
let setup = setup_current_thread();
let span = tracing::info_span!("root");
let _guard = span.enter();
let span = tracing::info_span!("child", tenant_id = "tenant-1");
let _guard = span.enter();
let span = tracing::info_span!("grandchild", timeline_id = "timeline-1");
let _guard = span.enter();
check_fields_present([&setup.tenant_extractor]).unwrap();
}
#[test]
fn negative_subset_one_level() {
let setup = setup_current_thread();
let span = tracing::info_span!("root", timeline_id = "timeline-1");
let _guard = span.enter();
let missing = check_fields_present([&setup.tenant_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
#[test]
fn negative_subset_multiple_levels() {
let setup = setup_current_thread();
let span = tracing::info_span!("root");
let _guard = span.enter();
let span = tracing::info_span!("child", timeline_id = "timeline-1");
let _guard = span.enter();
let missing = check_fields_present([&setup.tenant_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
#[test]
fn tracing_error_subscriber_not_set_up() {
// no setup
let span = tracing::info_span!("foo", e = "some value");
let _guard = span.enter();
let extractor = MultiNameExtractor::new("E", ["e"]);
let missing = check_fields_present([&extractor]).unwrap_err();
assert_missing(missing, vec![&extractor]);
}
#[test]
#[should_panic]
fn panics_if_tracing_error_subscriber_has_wrong_filter() {
let r = tracing_subscriber::registry().with({
tracing_error::ErrorLayer::default().with_filter(
tracing_subscriber::filter::dynamic_filter_fn(|md, _| {
if md.is_span() && *md.level() == tracing::Level::INFO {
return false;
}
true
}),
)
});
let _guard = tracing::subscriber::set_default(r);
let span = tracing::info_span!("foo", e = "some value");
let _guard = span.enter();
let extractor = MultiNameExtractor::new("E", ["e"]);
let missing = check_fields_present([&extractor]).unwrap_err();
assert_missing(missing, vec![&extractor]);
}
}

View File

@@ -463,9 +463,13 @@ where
let wal_file_path = format!("pg_wal/{}", wal_file_name);
let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;
let wal_seg =
postgres_ffi::generate_wal_segment(segno, system_identifier, self.timeline.pg_version)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
let wal_seg = postgres_ffi::generate_wal_segment(
segno,
system_identifier,
self.timeline.pg_version,
self.lsn,
)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
self.ar.append(&header, &wal_seg[..]).await?;
Ok(())

View File

@@ -25,6 +25,7 @@ use pageserver::{
virtual_file,
};
use postgres_backend::AuthType;
use utils::logging::TracingErrorLayerEnablement;
use utils::signals::ShutdownSignals;
use utils::{
auth::JwtAuth, logging, project_git_version, sentry_init::init_sentry, signals::Signal,
@@ -86,8 +87,19 @@ fn main() -> anyhow::Result<()> {
}
};
// Initialize logging, which must be initialized before the custom panic hook is installed.
logging::init(conf.log_format)?;
// Initialize logging.
//
// It must be initialized before the custom panic hook is installed below.
//
// Regarding tracing_error enablement: at this time, we only use the
// tracing_error crate to debug_assert that log spans contain tenant and timeline ids.
// See `debug_assert_current_span_has_tenant_and_timeline_id` in the timeline module
let tracing_error_layer_enablement = if cfg!(debug_assertions) {
TracingErrorLayerEnablement::EnableWithRustLogFilter
} else {
TracingErrorLayerEnablement::Disabled
};
logging::init(conf.log_format, tracing_error_layer_enablement)?;
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
// disarming this hook on pageserver, because we never tear down tracing.
@@ -226,6 +238,7 @@ fn start_pageserver(
);
set_build_info_metric(GIT_VERSION);
set_launch_timestamp_metric(launch_ts);
pageserver::preinitialize_metrics();
// If any failpoints were set from FAILPOINTS environment variable,
// print them to the log for debugging purposes

View File

@@ -114,7 +114,7 @@ async fn import_rel(
path: &Path,
spcoid: Oid,
dboid: Oid,
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
reader: &mut (impl AsyncRead + Unpin),
len: usize,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -200,7 +200,7 @@ async fn import_slru(
modification: &mut DatadirModification<'_>,
slru: SlruKind,
path: &Path,
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
reader: &mut (impl AsyncRead + Unpin),
len: usize,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -612,8 +612,8 @@ async fn import_file(
Ok(None)
}
async fn read_all_bytes(reader: &mut (impl AsyncRead + Send + Sync + Unpin)) -> Result<Bytes> {
async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
let mut buf: Vec<u8> = vec![];
reader.read_to_end(&mut buf).await?;
Ok(Bytes::copy_from_slice(&buf[..]))
Ok(Bytes::from(buf))
}

View File

@@ -44,6 +44,8 @@ pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
pub use crate::metrics::preinitialize_metrics;
pub async fn shutdown_pageserver(exit_code: i32) {
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.

View File

@@ -205,6 +205,15 @@ static EVICTIONS_WITH_LOW_RESIDENCE_DURATION: Lazy<IntCounterVec> = Lazy::new(||
.expect("failed to define a metric")
});
pub static UNEXPECTED_ONDEMAND_DOWNLOADS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_unexpected_ondemand_downloads_count",
"Number of unexpected on-demand downloads. \
We log more context for each increment, so, forgo any labels in this metric.",
)
.expect("failed to define a metric")
});
/// Each [`Timeline`]'s [`EVICTIONS_WITH_LOW_RESIDENCE_DURATION`] metric.
#[derive(Debug)]
pub struct EvictionsWithLowResidenceDuration {
@@ -1132,3 +1141,10 @@ impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
poll_result
}
}
pub fn preinitialize_metrics() {
// We want to alert on this metric increasing.
// Initialize it eagerly, so that our alert rule can distinguish absence of the metric from metric value 0.
assert_eq!(UNEXPECTED_ONDEMAND_DOWNLOADS.get(), 0);
UNEXPECTED_ONDEMAND_DOWNLOADS.reset();
}

View File

@@ -20,7 +20,6 @@ use pageserver_api::models::{
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamNblocksRequest, PagestreamNblocksResponse,
};
use postgres_backend::PostgresBackendTCP;
use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
use pq_proto::framed::ConnectionError;
use pq_proto::FeStartupPacket;
@@ -32,6 +31,7 @@ use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
use tracing::*;
use utils::id::ConnectionId;
@@ -57,7 +57,10 @@ use crate::trace::Tracer;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<Bytes>> + '_ {
fn copyin_stream<IO>(pgb: &mut PostgresBackend<IO>) -> impl Stream<Item = io::Result<Bytes>> + '_
where
IO: AsyncRead + AsyncWrite + Unpin,
{
async_stream::try_stream! {
loop {
let msg = tokio::select! {
@@ -65,8 +68,8 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<
_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
let msg = "pageserver is shutting down".to_string();
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None));
let msg = "pageserver is shutting down";
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
Err(QueryError::Other(anyhow::anyhow!(msg)))
}
@@ -125,7 +128,7 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<
///
/// XXX: Currently, any trailing data after the EOF marker prints a warning.
/// Perhaps it should be a hard error?
async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow::Result<()> {
async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
use tokio::io::AsyncReadExt;
let mut buf = [0u8; 512];
@@ -245,12 +248,14 @@ async fn page_service_conn_main(
.set_nodelay(true)
.context("could not set TCP_NODELAY")?;
let peer_addr = socket.peer_addr().context("get peer address")?;
// XXX: pgbackend.run() should take the connection_ctx,
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(conf, auth, connection_ctx);
let pgbackend = PostgresBackend::new(socket, auth_type, None)?;
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend
.run(&mut conn_handler, task_mgr::shutdown_watcher)
@@ -332,13 +337,16 @@ impl PageServerHandler {
}
#[instrument(skip(self, pgb, ctx))]
async fn handle_pagerequests(
async fn handle_pagerequests<IO>(
&self,
pgb: &mut PostgresBackendTCP,
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
ctx: RequestContext,
) -> anyhow::Result<()> {
) -> anyhow::Result<()>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
// NOTE: pagerequests handler exits when connection is closed,
// so there is no need to reset the association
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
@@ -436,16 +444,19 @@ impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
#[instrument(skip(self, pgb, ctx))]
async fn handle_import_basebackup(
async fn handle_import_basebackup<IO>(
&self,
pgb: &mut PostgresBackendTCP,
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
base_lsn: Lsn,
_end_lsn: Lsn,
pg_version: u32,
ctx: RequestContext,
) -> Result<(), QueryError> {
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Create empty timeline
info!("creating new timeline");
@@ -486,15 +497,18 @@ impl PageServerHandler {
}
#[instrument(skip(self, pgb, ctx))]
async fn handle_import_wal(
async fn handle_import_wal<IO>(
&self,
pgb: &mut PostgresBackendTCP,
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
ctx: RequestContext,
) -> Result<(), QueryError> {
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
@@ -690,16 +704,19 @@ impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
#[instrument(skip(self, pgb, ctx))]
async fn handle_basebackup_request(
async fn handle_basebackup_request<IO>(
&mut self,
pgb: &mut PostgresBackendTCP,
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Option<Lsn>,
prev_lsn: Option<Lsn>,
full_backup: bool,
ctx: RequestContext,
) -> anyhow::Result<()> {
) -> anyhow::Result<()>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
let started = std::time::Instant::now();
// check that the timeline exists
@@ -770,10 +787,13 @@ impl PageServerHandler {
}
#[async_trait::async_trait]
impl postgres_backend::Handler<tokio::net::TcpStream> for PageServerHandler {
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
fn check_auth_jwt(
&mut self,
_pgb: &mut PostgresBackendTCP,
_pgb: &mut PostgresBackend<IO>,
jwt_response: &[u8],
) -> Result<(), QueryError> {
// this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
@@ -801,7 +821,7 @@ impl postgres_backend::Handler<tokio::net::TcpStream> for PageServerHandler {
fn startup(
&mut self,
_pgb: &mut PostgresBackendTCP,
_pgb: &mut PostgresBackend<IO>,
_sm: &FeStartupPacket,
) -> Result<(), QueryError> {
Ok(())
@@ -809,7 +829,7 @@ impl postgres_backend::Handler<tokio::net::TcpStream> for PageServerHandler {
async fn process_query(
&mut self,
pgb: &mut PostgresBackendTCP,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> Result<(), QueryError> {
let ctx = self.connection_ctx.attached_child();

View File

@@ -2886,7 +2886,13 @@ pub mod harness {
};
LOG_HANDLE.get_or_init(|| {
logging::init(logging::LogFormat::Test).expect("Failed to init test logging")
logging::init(
logging::LogFormat::Test,
// enable it in case in case the tests exercise code paths that use
// debug_assert_current_span_has_tenant_and_timeline_id
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
)
.expect("Failed to init test logging")
});
let repo_dir = PageServerConf::test_repo_dir(test_name);

View File

@@ -16,6 +16,7 @@ use tracing::{info, warn};
use crate::config::PageServerConf;
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::timeline::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
use remote_storage::{DownloadError, GenericRemoteStorage};
use utils::crashsafe::path_with_suffix_extension;
@@ -43,6 +44,8 @@ pub async fn download_layer_file<'a>(
layer_file_name: &'a LayerFileName,
layer_metadata: &'a LayerFileMetadata,
) -> Result<u64, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let timeline_path = conf.timeline_path(&timeline_id, &tenant_id);
let local_path = timeline_path.join(layer_file_name.file_name());
@@ -154,7 +157,7 @@ pub async fn download_layer_file<'a>(
.with_context(|| format!("Could not fsync layer file {}", local_path.display(),))
.map_err(DownloadError::Other)?;
tracing::info!("download complete: {}", local_path.display());
tracing::debug!("download complete: {}", local_path.display());
Ok(bytes_amount)
}

View File

@@ -4,6 +4,7 @@ pub mod delta_layer;
mod filename;
mod image_layer;
mod inmemory_layer;
pub(crate) mod layer_contents;
mod remote_layer;
use crate::config::PageServerConf;

View File

@@ -30,6 +30,9 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::layer_contents::virtual_value::{
VirtualValue, VirtualValueBuilder,
};
use crate::tenant::storage_layer::{
PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
@@ -279,12 +282,12 @@ impl Layer for DeltaLayer {
// A subroutine to dump a single blob
let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result<String> {
let buf = cursor.read_blob(blob_ref.pos())?;
let val = Value::des(&buf)?;
let val = VirtualValue::des(&buf)?;
let desc = match val {
Value::Image(img) => {
VirtualValue::NaturalImage(img) => {
format!(" img {} bytes", img.len())
}
Value::WalRecord(rec) => {
VirtualValue::NaturalWalRecord(rec) => {
let wal_desc = walrecord::describe_wal_record(&rec)?;
format!(
" rec {} bytes will_init: {} {}",
@@ -293,6 +296,31 @@ impl Layer for DeltaLayer {
wal_desc
)
}
VirtualValue::ClosedLineage { image, lsns, .. } => {
format!(
" lin(closed,img) {} bytes max_lsn {} {} bytes tail {} recs",
buf.len(),
lsns.last().unwrap(),
image.len(),
lsns.len(),
)
}
VirtualValue::ClosedRecLineage { lsns, .. } => {
format!(
" lin(closed,rec) {} bytes max_lsn {} {} recs",
buf.len(),
lsns.last().unwrap(),
lsns.len() + 1,
)
}
VirtualValue::OpenLineage { lsns, .. } => {
format!(
" lin(open) {} bytes max_lsn {} {} recs",
buf.len(),
lsns.last().unwrap(),
lsns.len(),
)
}
};
Ok(desc)
};
@@ -350,10 +378,12 @@ impl Layer for DeltaLayer {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
offsets.push((entry_lsn, blob_ref.pos()));
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
})?;
@@ -368,28 +398,94 @@ impl Layer for DeltaLayer {
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
let val = VirtualValue::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
)
})?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
VirtualValue::NaturalImage(img) => {
if lsn_range.contains(&entry_lsn) {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
}
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
VirtualValue::NaturalWalRecord(rec) => {
if lsn_range.contains(&entry_lsn) {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
VirtualValue::ClosedLineage {
image,
lsns,
records,
} => {
assert_eq!(lsns.len(), records.len());
for (lsn, rec) in
Iterator::zip(lsns.into_iter().rev(), records.into_iter().rev())
{
if lsn_range.contains(&lsn) {
reconstruct_state.records.push((lsn, rec));
}
}
if lsn_range.contains(&entry_lsn) {
reconstruct_state.img = Some((entry_lsn, image));
need_image = false;
break;
}
}
}
VirtualValue::ClosedRecLineage {
image_rec,
lsns,
records,
} => {
assert_eq!(lsns.len(), records.len());
for (lsn, rec) in
Iterator::zip(lsns.into_iter().rev(), records.into_iter().rev())
{
if lsn_range.contains(&lsn) {
reconstruct_state.records.push((lsn, rec));
}
}
if lsn_range.contains(&entry_lsn) {
reconstruct_state.records.push((entry_lsn, image_rec));
need_image = false;
break;
}
}
VirtualValue::OpenLineage {
mut lsns,
mut records,
} => {
while let Some(lsn) = lsns.pop() {
let rec = records.pop().unwrap();
if lsn_range.contains(&lsn) {
reconstruct_state.records.push((lsn, rec));
}
}
assert_eq!(records.len(), 1);
if lsn_range.contains(&entry_lsn) {
reconstruct_state
.records
.push((entry_lsn, records.pop().unwrap()));
}
}
};
}
// release metadata lock and close the file
}
@@ -682,6 +778,8 @@ struct DeltaLayerWriterInner {
timeline_id: TimelineId,
tenant_id: TenantId,
vvbuilder: Option<(Key, VirtualValueBuilder)>,
key_start: Key,
lsn_range: Range<Lsn>,
@@ -724,6 +822,7 @@ impl DeltaLayerWriterInner {
path,
timeline_id,
tenant_id,
vvbuilder: None,
key_start,
lsn_range,
tree: tree_builder,
@@ -736,8 +835,42 @@ impl DeltaLayerWriterInner {
///
/// The values must be appended in key, lsn order.
///
fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
fn put_value(&mut self, new_key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
assert!(new_key >= self.key_start);
assert!(self.lsn_range.contains(&lsn));
match self.vvbuilder.take() {
None => {
let mut builder = VirtualValueBuilder::new();
let res = builder.push(lsn, val);
assert!(res.is_none());
self.vvbuilder = Some((new_key, builder));
}
Some((key, mut builder)) => {
if key != new_key {
let (lsn, vvalue) = builder.finish().unwrap();
self.put_value_bytes(
key,
lsn,
&VirtualValue::ser(&vvalue)?,
vvalue.will_init(),
)?;
builder = VirtualValueBuilder::new();
}
if let Some((old_lsn, vvalue)) = builder.push(lsn, val) {
self.put_value_bytes(
new_key,
old_lsn,
&VirtualValue::ser(&vvalue)?,
vvalue.will_init(),
)?;
}
self.vvbuilder = Some((new_key, builder));
}
}
Ok(())
}
fn put_value_bytes(
@@ -766,7 +899,14 @@ impl DeltaLayerWriterInner {
///
/// Finish writing the delta layer.
///
fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
// first, flush the last key's vvalue to disk.
if let Some((key, builder)) = self.vvbuilder.take() {
if let Some((lsn, vvalue)) = builder.finish() {
self.put_value_bytes(key, lsn, &VirtualValue::ser(&vvalue)?, vvalue.will_init())?;
};
}
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -937,7 +1077,7 @@ impl Drop for DeltaLayerWriter {
}
///
/// Iterator over all key-value pairse stored in a delta layer
/// Iterator over all key-value pairs stored in a delta layer
///
/// FIXME: This creates a Vector to hold the offsets of all key value pairs.
/// That takes up quite a lot of memory. Should do this in a more streaming
@@ -947,6 +1087,7 @@ struct DeltaValueIter<'a> {
all_offsets: Vec<(DeltaKey, BlobRef)>,
next_idx: usize,
reader: BlockCursor<Adapter<'a>>,
decode_queue: Option<<Vec<(Lsn, Value)> as IntoIterator>::IntoIter>,
}
struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
@@ -990,12 +1131,21 @@ impl<'a> DeltaValueIter<'a> {
all_offsets,
next_idx: 0,
reader: BlockCursor::new(Adapter(inner)),
decode_queue: None,
};
Ok(iter)
}
fn next_res(&mut self) -> Result<Option<(Key, Lsn, Value)>> {
if let Some(data) = &mut self.decode_queue {
let res = data.next();
if let Some((lsn, value)) = res {
let key = self.all_offsets[self.next_idx - 1].0.key();
return Ok(Some((key, lsn, value)));
}
}
if self.next_idx < self.all_offsets.len() {
let (delta_key, blob_ref) = &self.all_offsets[self.next_idx];
@@ -1003,7 +1153,11 @@ impl<'a> DeltaValueIter<'a> {
let lsn = delta_key.lsn();
let buf = self.reader.read_blob(blob_ref.pos())?;
let val = Value::des(&buf)?;
let val_vec = VirtualValue::des(&buf)?.into_value_vec(lsn);
let mut iter = val_vec.into_iter();
let Some((lsn, val)) = iter.next() else { bail!("missing data in VirtualValue") };
self.decode_queue = Some(iter);
self.next_idx += 1;
Ok(Some((key, lsn, val)))
} else {

View File

@@ -0,0 +1 @@
pub(crate) mod virtual_value;

View File

@@ -0,0 +1,237 @@
use crate::repository::Value;
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use utils::lsn::Lsn;
/// VirtualValue is stored in some Layers instead of the normal repository::Value.
///
/// It describes one or more Values that are associated to one
/// repostiory::Key, containing zero or one base image with all WAL records
/// that need to apply to this preceding page image. In balanced tree-based
/// indexes this reduces the number of full Keys we need to store, thus
/// reducing the size of the layer's index and increasing cache efficiency.
///
/// Additionally, the abstraction paves the way to implement compression in the
/// layer file themselves, as we'd just need to add a new variant to the
/// VirtualValue type for compressed types. Examples of such optimizations
/// are bitpacked and delta-encoded LSNs in the Lineage variants of this enum.
///
/// NOTE: Once committed into a hosted branch, these variants _must_ remain
/// in this order, and cannot be removed - they are part of the specification
/// of the physical layout of the DeltaLayer file. Any reordering is going to
/// change the meaning of bytes in existing files and break the compatibility
/// with old layers; so make sure you don't reorder these, nor should you
/// update the layout of existing variants. You can update new variants as long
/// as no user data is written using these variants.
///
/// NOTE: The first two variants are cloned over from repository::Value, which
/// was the definition of the stored data in DeltaLayer before VirtualValue.
/// These variants have the same layout and index, so they should (de)serialize
/// into the same binary format, guaranteeing backwards compatibility.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum VirtualValue {
/// NaturalImage: A natural WAL image, picked from PostgreSQL WAL.
NaturalImage(Bytes),
/// NaturalWalRecord: A natural WAL record, picked from PostgreSQL WAL.
NaturalWalRecord(NeonWalRecord),
/// ClosedLineage: A page image, followed by a set of WAL records that are
/// applied to that page image.
ClosedLineage {
image: Bytes,
lsns: Vec<Lsn>,
records: Vec<NeonWalRecord>,
},
/// ClosedRecLineage: A will-init WAL record, followed by a set of WAL
/// records that are applied to the page image of the WAL record.
ClosedRecLineage {
image_rec: NeonWalRecord,
lsns: Vec<Lsn>,
records: Vec<NeonWalRecord>,
},
/// OpenLineage: A set of WAL records that are applied to the same page,
/// but that do not have a known page image in this Layer.
OpenLineage {
lsns: Vec<Lsn>,
records: Vec<NeonWalRecord>,
},
}
impl VirtualValue {
pub(crate) fn into_value_vec(self, lsn: Lsn) -> Vec<(Lsn, Value)> {
match self {
VirtualValue::NaturalImage(img) => vec![(lsn, Value::Image(img))],
VirtualValue::NaturalWalRecord(rec) => vec![(lsn, Value::WalRecord(rec))],
VirtualValue::ClosedLineage {
image,
lsns,
records,
} => {
let mut res = Vec::with_capacity(lsns.len() + 1);
res.push((lsn, Value::Image(image)));
for (lsn, rec) in Iterator::zip(lsns.into_iter(), records.into_iter()) {
res.push((lsn, Value::WalRecord(rec)));
}
res
}
VirtualValue::ClosedRecLineage {
image_rec,
lsns,
records,
} => {
let mut res = Vec::with_capacity(lsns.len() + 1);
res.push((lsn, Value::WalRecord(image_rec)));
for (lsn, rec) in Iterator::zip(lsns.into_iter(), records.into_iter()) {
res.push((lsn, Value::WalRecord(rec)));
}
res
}
VirtualValue::OpenLineage { lsns, mut records } => {
let mut res = Vec::with_capacity(lsns.len() + 1);
let first_record = records.remove(0);
res.push((lsn, Value::WalRecord(first_record)));
for (lsn, rec) in Iterator::zip(lsns.into_iter(), records.into_iter()) {
res.push((lsn, Value::WalRecord(rec)));
}
res
}
}
}
pub(crate) fn will_init(&self) -> bool {
match self {
VirtualValue::NaturalImage(_) => true,
VirtualValue::NaturalWalRecord(rec) => rec.will_init(),
VirtualValue::ClosedLineage { .. } => true,
VirtualValue::ClosedRecLineage { .. } => true,
VirtualValue::OpenLineage { .. } => false,
}
}
}
impl From<Value> for VirtualValue {
fn from(value: Value) -> Self {
match value {
Value::Image(img) => VirtualValue::NaturalImage(img),
Value::WalRecord(rec) => VirtualValue::NaturalWalRecord(rec),
}
}
}
#[must_use = "deconstruct the value using ::finish to make sure you don't lose intermediate values"]
pub struct VirtualValueBuilder {
state: Option<(Lsn, VirtualValue)>,
}
impl VirtualValueBuilder {
pub fn new() -> Self {
Self { state: None }
}
#[must_use = "intermediate emitted values should be stored"]
pub fn push(&mut self, new_lsn: Lsn, value: Value) -> Option<(Lsn, VirtualValue)> {
if let Some((lsn, _)) = &self.state {
assert!(new_lsn > *lsn);
}
match value {
Value::Image(img) => {
let res = self.state.take();
self.state = Some((new_lsn, VirtualValue::NaturalImage(img)));
res
}
Value::WalRecord(new_rec) => {
if new_rec.will_init() {
let res = self.state.take();
self.state = Some((new_lsn, VirtualValue::NaturalWalRecord(new_rec)));
return res;
}
match self.state.take() {
None => {
self.state = Some((new_lsn, VirtualValue::NaturalWalRecord(new_rec)));
None
}
Some((start_lsn, virtual_value)) => {
let new_vv = match virtual_value {
VirtualValue::NaturalImage(img) => VirtualValue::ClosedLineage {
image: img,
lsns: vec![new_lsn],
records: vec![new_rec],
},
VirtualValue::NaturalWalRecord(vv_start) => {
if vv_start.will_init() {
VirtualValue::ClosedRecLineage {
image_rec: vv_start,
lsns: vec![new_lsn],
records: vec![new_rec],
}
} else {
VirtualValue::OpenLineage {
lsns: vec![new_lsn],
records: vec![vv_start, new_rec],
}
}
}
VirtualValue::ClosedLineage {
image,
mut lsns,
mut records,
} => {
lsns.push(new_lsn);
records.push(new_rec);
VirtualValue::ClosedLineage {
image,
lsns,
records,
}
}
VirtualValue::ClosedRecLineage {
image_rec,
mut lsns,
mut records,
} => {
lsns.push(new_lsn);
records.push(new_rec);
VirtualValue::ClosedRecLineage {
image_rec,
lsns,
records,
}
}
VirtualValue::OpenLineage {
mut lsns,
mut records,
} => {
lsns.push(new_lsn);
records.push(new_rec);
VirtualValue::OpenLineage { lsns, records }
}
};
self.state = Some((start_lsn, new_vv));
None
}
}
}
}
}
#[must_use]
pub fn finish(mut self) -> Option<(Lsn, VirtualValue)> {
self.state.take()
}
}

View File

@@ -48,7 +48,7 @@ use crate::tenant::{
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::TimelineMetrics;
use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
@@ -936,6 +936,7 @@ impl Timeline {
}
}
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
@@ -2355,6 +2356,7 @@ impl Timeline {
id,
ctx.task_kind()
);
UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
timeline.download_remote_layer(remote_layer).await?;
continue 'layer_map_search;
}
@@ -3818,11 +3820,13 @@ impl Timeline {
/// If the caller has a deadline or needs a timeout, they can simply stop polling:
/// we're **cancellation-safe** because the download happens in a separate task_mgr task.
/// So, the current download attempt will run to completion even if we stop polling.
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%remote_layer.short_id()))]
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
pub async fn download_remote_layer(
&self,
remote_layer: Arc<RemoteLayer>,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
use std::sync::atomic::Ordering::Relaxed;
let permit = match Arc::clone(&remote_layer.ongoing_download)
@@ -3866,6 +3870,8 @@ impl Timeline {
.await;
if let Ok(size) = &result {
info!("layer file download finished");
// XXX the temp file is still around in Err() case
// and consumes space until we clean up upon pageserver restart.
self_clone.metrics.resident_physical_size_gauge.add(*size);
@@ -3937,6 +3943,8 @@ impl Timeline {
updates.flush();
drop(layers);
info!("on-demand download successful");
// Now that we've inserted the download into the layer map,
// close the semaphore. This will make other waiters for
// this download return Ok(()).
@@ -3944,7 +3952,7 @@ impl Timeline {
remote_layer.ongoing_download.close();
} else {
// Keep semaphore open. We'll drop the permit at the end of the function.
error!("on-demand download failed: {:?}", result.as_ref().unwrap_err());
error!("layer file download failed: {:?}", result.as_ref().unwrap_err());
}
// Don't treat it as an error if the task that triggered the download
@@ -4255,3 +4263,36 @@ fn rename_to_backup(path: &Path) -> anyhow::Result<()> {
bail!("couldn't find an unused backup number for {:?}", path)
}
#[cfg(not(debug_assertions))]
#[inline]
pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {}
#[cfg(debug_assertions)]
#[inline]
pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {
use utils::tracing_span_assert;
pub static TENANT_ID_EXTRACTOR: once_cell::sync::Lazy<
tracing_span_assert::MultiNameExtractor<2>,
> = once_cell::sync::Lazy::new(|| {
tracing_span_assert::MultiNameExtractor::new("TenantId", ["tenant_id", "tenant"])
});
pub static TIMELINE_ID_EXTRACTOR: once_cell::sync::Lazy<
tracing_span_assert::MultiNameExtractor<2>,
> = once_cell::sync::Lazy::new(|| {
tracing_span_assert::MultiNameExtractor::new("TimelineId", ["timeline_id", "timeline"])
});
match tracing_span_assert::check_fields_present([
&*TENANT_ID_EXTRACTOR,
&*TIMELINE_ID_EXTRACTOR,
]) {
Ok(()) => (),
Err(missing) => panic!(
"missing extractors: {:?}",
missing.into_iter().map(|e| e.name()).collect::<Vec<_>>()
),
}
}

View File

@@ -348,7 +348,7 @@ impl ConnectionManagerState {
.context("walreceiver connection handling failure")
}
.instrument(
info_span!("walreceiver_connection", id = %id, node_id = %new_sk.safekeeper_id),
info_span!("walreceiver_connection", tenant_id = %id.tenant_id, timeline_id = %id.timeline_id, node_id = %new_sk.safekeeper_id),
)
});

View File

@@ -370,6 +370,74 @@ lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
return found;
}
/*
* Evict a page (if present) from the local file cache
*/
void
lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
{
BufferTag tag;
FileCacheEntry* entry;
ssize_t rc;
bool found;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return;
INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);
if (!found)
{
/* nothing to do */
LWLockRelease(lfc_lock);
return;
}
/* remove the page from the cache */
entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1)));
/*
* If the chunk has no live entries, we can position the chunk to be
* recycled first.
*/
if (entry->bitmap[chunk_offs >> 5] == 0)
{
bool has_remaining_pages;
for (int i = 0; i < (BLOCKS_PER_CHUNK / 32); i++) {
if (entry->bitmap[i] != 0)
{
has_remaining_pages = true;
break;
}
}
/*
* Put the entry at the position that is first to be reclaimed when
* we have no cached pages remaining in the chunk
*/
if (!has_remaining_pages)
{
dlist_delete(&entry->lru_node);
dlist_push_head(&lfc_ctl->lru, &entry->lru_node);
}
}
/*
* Done: apart from empty chunks, we don't move chunks in the LRU when
* they're empty because eviction isn't usage.
*/
LWLockRelease(lfc_lock);
}
/*
* Try to read page from local cache.
* Returns true if page is found in local cache.
@@ -528,7 +596,6 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
LWLockRelease(lfc_lock);
}
/*
* Record structure holding the to be exposed cache data.
*/

View File

@@ -17,6 +17,8 @@
#include "pagestore_client.h"
#include "fmgr.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/buf_internals.h"
#include "libpq-fe.h"
#include "libpq/pqformat.h"
@@ -57,6 +59,8 @@ int n_unflushed_requests = 0;
int flush_every_n_requests = 8;
int readahead_buffer_size = 128;
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
static void pageserver_flush(void);
static bool
@@ -467,6 +471,8 @@ pg_init_libpagestore(void)
smgr_hook = smgr_neon;
smgr_init_hook = smgr_init_neon;
dbsize_hook = neon_dbsize;
old_redo_read_buffer_filter = redo_read_buffer_filter;
redo_read_buffer_filter = neon_redo_read_buffer_filter;
}
lfc_init();
}

View File

@@ -24,6 +24,7 @@
#include "neon.h"
#include "walproposer.h"
#include "pagestore_client.h"
PG_MODULE_MAGIC;
void _PG_init(void);

View File

@@ -11,6 +11,7 @@
#ifndef NEON_H
#define NEON_H
#include "access/xlogreader.h"
/* GUCs */
extern char *neon_auth_token;
@@ -20,4 +21,11 @@ extern char *neon_tenant;
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);
/*
* Returns true if we shouldn't do REDO on that block in record indicated by
* block_id; false otherwise.
*/
extern bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);
#endif /* NEON_H */

View File

@@ -207,6 +207,7 @@ extern void forget_cached_relsize(RelFileNode rnode, ForkNumber forknum);
extern void lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer);
extern bool lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer);
extern bool lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);

View File

@@ -189,6 +189,7 @@ typedef struct PrfHashEntry {
#define SH_DEFINE
#define SH_DECLARE
#include "lib/simplehash.h"
#include "neon.h"
/*
* PrefetchState maintains the state of (prefetch) getPage@LSN requests.
@@ -1209,6 +1210,9 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
if (ShutdownRequestPending)
return;
/* Don't log any pages if we're not allowed to do so. */
if (!XLogInsertAllowed())
return;
/*
* Whenever a VM or FSM page is evicted, WAL-log it. FSM and (some) VM
@@ -1375,8 +1379,18 @@ neon_get_request_lsn(bool *latest, RelFileNode rnode, ForkNumber forknum, BlockN
if (RecoveryInProgress())
{
/*
* We don't know if WAL has been generated but not yet replayed, so
* we're conservative in our estimates about latest pages.
*/
*latest = false;
lsn = GetXLogReplayRecPtr(NULL);
/*
* Get the last written LSN of this page.
*/
lsn = GetLastWrittenLSN(rnode, forknum, blkno);
lsn = nm_adjust_lsn(lsn);
elog(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
(uint32) ((lsn) >> 32), (uint32) (lsn));
}
@@ -1559,6 +1573,15 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
/*
* Newly created relation is empty, remember that in the relsize cache.
*
* Note that in REDO, this is called to make sure the relation fork exists,
* but it does not truncate the relation. So, we can only update the
* relsize if it didn't exist before.
*
* Also, in redo, we must make sure to update the cached size of the
* relation, as that is the primary source of truth for REDO's
* file length considerations, and as file extension isn't (perfectly)
* logged, we need to take care of that before we hit file size checks.
*
* FIXME: This is currently not just an optimization, but required for
* correctness. Postgres can call smgrnblocks() on the newly-created
* relation. Currently, we don't call SetLastWrittenLSN() when a new
@@ -1566,7 +1589,14 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
* cache, we might call smgrnblocks() on the newly-created relation before
* the creation WAL record hass been received by the page server.
*/
set_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
if (isRedo)
{
update_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
get_cached_relsize(reln->smgr_rnode.node, forkNum,
&reln->smgr_cached_nblocks[forkNum]);
}
else
set_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1831,6 +1861,26 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
.blockNum = blkno,
};
/*
* The redo process does not lock pages that it needs to replay but are
* not in the shared buffers, so a concurrent process may request the
* page after redo has decided it won't redo that page and updated the
* LwLSN for that page.
* If we're in hot standby we need to take care that we don't return
* until after REDO has finished replaying up to that LwLSN, as the page
* should have been locked up to that point.
*
* See also the description on neon_redo_read_buffer_filter below.
*
* NOTE: It is possible that the WAL redo process will still do IO due to
* concurrent failed read IOs. Those IOs should never have a request_lsn
* that is as large as the WAL record we're currently replaying, if it
* weren't for the behaviour of the LwLsn cache that uses the highest
* value of the LwLsn cache when the entry is not found.
*/
if (RecoveryInProgress() && !(MyBackendType == B_STARTUP))
XLogWaitForReplayOf(request_lsn);
/*
* Try to find prefetched page in the list of received pages.
*/
@@ -2584,3 +2634,143 @@ smgr_init_neon(void)
smgr_init_standard();
neon_init();
}
/*
* Return whether we can skip the redo for this block.
*
* The conditions for skipping the IO are:
*
* - The block is not in the shared buffers, and
* - The block is not in the local file cache
*
* ... because any subsequent read of the page requires us to read
* the new version of the page from the PageServer. We do not
* check the local file cache; we instead evict the page from LFC: it
* is cheaper than going through the FS calls to read the page, and
* limits the number of lock operations used in the REDO process.
*
* We have one exception to the rules for skipping IO: We always apply
* changes to shared catalogs' pages. Although this is mostly out of caution,
* catalog updates usually result in backends rebuilding their catalog snapshot,
* which means it's quite likely the modified page is going to be used soon.
*
* It is important to note that skipping WAL redo for a page also means
* the page isn't locked by the redo process, as there is no Buffer
* being returned, nor is there a buffer descriptor to lock.
* This means that any IO that wants to read this block needs to wait
* for the WAL REDO process to finish processing the WAL record before
* it allows the system to start reading the block, as releasing the
* block early could lead to phantom reads.
*
* For example, REDO for a WAL record that modifies 3 blocks could skip
* the first block, wait for a lock on the second, and then modify the
* third block. Without skipping, all blocks would be locked and phantom
* reads would not occur, but with skipping, a concurrent process could
* read block 1 with post-REDO contents and read block 3 with pre-REDO
* contents, where with REDO locking it would wait on block 1 and see
* block 3 with post-REDO contents only.
*/
bool
neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
{
XLogRecPtr end_recptr = record->EndRecPtr;
XLogRecPtr prev_end_recptr = record->ReadRecPtr - 1;
RelFileNode rnode;
ForkNumber forknum;
BlockNumber blkno;
BufferTag tag;
uint32 hash;
LWLock *partitionLock;
Buffer buffer;
bool no_redo_needed;
BlockNumber relsize;
if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
return true;
#if PG_VERSION_NUM < 150000
if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
elog(PANIC, "failed to locate backup block with ID %d", block_id);
#else
XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno);
#endif
/*
* Out of an abundance of caution, we always run redo on shared catalogs,
* regardless of whether the block is stored in shared buffers.
* See also this function's top comment.
*/
if (!OidIsValid(rnode.dbNode))
return false;
INIT_BUFFERTAG(tag, rnode, forknum, blkno);
hash = BufTableHashCode(&tag);
partitionLock = BufMappingPartitionLock(hash);
/*
* Lock the partition of shared_buffers so that it can't be updated
* concurrently.
*/
LWLockAcquire(partitionLock, LW_SHARED);
/* Try to find the relevant buffer */
buffer = BufTableLookup(&tag, hash);
no_redo_needed = buffer < 0;
/* we don't have the buffer in memory, update lwLsn past this record */
if (no_redo_needed)
{
SetLastWrittenLSNForBlock(end_recptr, rnode, forknum, blkno);
lfc_evict(rnode, forknum, blkno);
}
else
{
SetLastWrittenLSNForBlock(prev_end_recptr, rnode, forknum, blkno);
}
LWLockRelease(partitionLock);
/* Extend the relation if we know its size */
if (get_cached_relsize(rnode, forknum, &relsize))
{
if (relsize < blkno + 1)
update_cached_relsize(rnode, forknum, blkno + 1);
}
else
{
/*
* Size was not cached. We populate the cache now, with the size of the
* relation measured after this WAL record is applied.
*
* This length is later reused when we open the smgr to read the block,
* which is fine and expected.
*/
NeonResponse *response;
NeonNblocksResponse *nbresponse;
NeonNblocksRequest request = {
.req = (NeonRequest) {
.lsn = end_recptr,
.latest = false,
.tag = T_NeonNblocksRequest,
},
.rnode = rnode,
.forknum = forknum,
};
response = page_server_request(&request);
Assert(response->tag == T_NeonNblocksResponse);
nbresponse = (NeonNblocksResponse *) response;
Assert(nbresponse->n_blocks > blkno);
set_cached_relsize(rnode, forknum, nbresponse->n_blocks);
elog(SmgrTrace, "Set length to %d", nbresponse->n_blocks);
}
return no_redo_needed;
}

View File

@@ -1964,18 +1964,26 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback * hs)
{
if (safekeeper[i].appendResponse.hs.ts != 0)
{
if (FullTransactionIdPrecedes(safekeeper[i].appendResponse.hs.xmin, hs->xmin))
HotStandbyFeedback *skhs = &safekeeper[i].appendResponse.hs;
if (FullTransactionIdIsNormal(skhs->xmin)
&& FullTransactionIdPrecedes(skhs->xmin, hs->xmin))
{
hs->xmin = safekeeper[i].appendResponse.hs.xmin;
hs->ts = safekeeper[i].appendResponse.hs.ts;
hs->xmin = skhs->xmin;
hs->ts = skhs->ts;
}
if (FullTransactionIdPrecedes(safekeeper[i].appendResponse.hs.catalog_xmin, hs->catalog_xmin))
if (FullTransactionIdIsNormal(skhs->catalog_xmin)
&& FullTransactionIdPrecedes(skhs->catalog_xmin, hs->xmin))
{
hs->catalog_xmin = safekeeper[i].appendResponse.hs.catalog_xmin;
hs->ts = safekeeper[i].appendResponse.hs.ts;
hs->catalog_xmin = skhs->catalog_xmin;
hs->ts = skhs->ts;
}
}
}
if (hs->xmin.value == ~0)
hs->xmin = InvalidFullTransactionId;
if (hs->catalog_xmin.value == ~0)
hs->catalog_xmin = InvalidFullTransactionId;
}
/*

View File

@@ -1,8 +1,8 @@
use crate::{cancellation::CancelClosure, error::UserFacingError};
use futures::TryFutureExt;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
use std::{io, net::SocketAddr};
use std::{io, net::SocketAddr, time::Duration};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::NoTls;
@@ -130,9 +130,23 @@ impl ConnCfg {
async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream)> {
use tokio_postgres::config::Host;
// wrap TcpStream::connect with timeout
let connect_with_timeout = |host, port| {
let connection_timeout = Duration::from_millis(10000);
tokio::time::timeout(connection_timeout, TcpStream::connect((host, port))).map(
move |res| match res {
Ok(tcpstream_connect_res) => tcpstream_connect_res,
Err(_) => Err(io::Error::new(
io::ErrorKind::TimedOut,
format!("exceeded connection timeout {connection_timeout:?}"),
)),
},
)
};
let connect_once = |host, port| {
info!("trying to connect to compute node at {host}:{port}");
TcpStream::connect((host, port)).and_then(|socket| async {
connect_with_timeout(host, port).and_then(|socket| async {
let socket_addr = socket.peer_addr()?;
// This prevents load balancer from severing the connection.
socket2::SockRef::from(&socket).set_keepalive(true)?;
@@ -165,7 +179,6 @@ impl ConnCfg {
Host::Unix(_) => continue, // unix sockets are not welcome here
};
// TODO: maybe we should add a timeout.
match connect_once(host, *port).await {
Ok(socket) => return Ok(socket),
Err(err) => {

View File

@@ -95,7 +95,7 @@ fn gather_proxy_io_bytes_per_client() -> Vec<(Ids, (u64, DateTime<Utc>))> {
current_metrics.push((
Ids {
endpoint_id: endpoint_id.to_string(),
branch_id: "".to_string(),
branch_id: branch_id.to_string(),
},
(value, Utc::now()),
));

View File

@@ -95,9 +95,9 @@ pub async fn task_main(
handle_client(config, &cancel_map, session_id, socket).await
}
.unwrap_or_else(|e| {
.unwrap_or_else(move |e| {
// Acknowledge that the task has finished with an error.
error!("per-client task finished with an error: {e:#}");
error!(?session_id, "per-client task finished with an error: {e:#}");
}),
);
}

View File

@@ -1,4 +1,5 @@
#!/bin/bash
#!/usr/bin/env bash
set -euo pipefail
# If you save this in your path under the name "cargo-zclippy" (or whatever
# name you like), then you can run it as "cargo zclippy" from the shell prompt.
@@ -8,7 +9,11 @@
# warnings and errors right in the editor.
# In vscode, this setting is Rust-analyzer>Check On Save:Command
# * `-A unknown_lints` do not warn about unknown lint suppressions
# that people with newer toolchains might use
# * `-D warnings` - fail on any warnings (`cargo` returns non-zero exit status)
cargo clippy --locked --all --all-targets --all-features -- -A unknown_lints -D warnings
# NB: the CI runs the full feature powerset, so, it catches slightly more errors
# at the expense of longer runtime. This script is used by developers, so, don't
# do that here.
thisscript="${BASH_SOURCE[0]}"
thisscript_dir="$(dirname "$thisscript")"
CLIPPY_COMMON_ARGS="$( source .neon_clippy_args; echo "$CLIPPY_COMMON_ARGS")"
exec cargo clippy --all-features $CLIPPY_COMMON_ARGS

View File

@@ -134,7 +134,10 @@ fn main() -> anyhow::Result<()> {
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(LogFormat::from_config(&args.log_format)?)?;
logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();
info!("version: {GIT_VERSION}");

View File

@@ -3,6 +3,7 @@
use anyhow::Context;
use std::str;
use std::str::FromStr;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, info_span, Instrument};
@@ -49,12 +50,14 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
if cmd.starts_with("START_WAL_PUSH") {
Ok(SafekeeperPostgresCommand::StartWalPush)
} else if cmd.starts_with("START_REPLICATION") {
let re =
Regex::new(r"START_REPLICATION(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
let re = Regex::new(
r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)",
)
.unwrap();
let mut caps = re.captures_iter(cmd);
let start_lsn = caps
.next()
.map(|cap| cap[1].parse::<Lsn>())
.map(|cap| Lsn::from_str(&cap[1]))
.context("parse start LSN from START_REPLICATION command")??;
Ok(SafekeeperPostgresCommand::StartReplication { start_lsn })
} else if cmd.starts_with("IDENTIFY_SYSTEM") {

View File

@@ -18,6 +18,7 @@ use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogF
use postgres_ffi::{XLogSegNo, PG_TLI};
use std::cmp::{max, min};
use bytes::Bytes;
use std::fs::{self, remove_file, File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
@@ -36,6 +37,7 @@ use postgres_ffi::XLOG_BLCKSZ;
use postgres_ffi::waldecoder::WalStreamDecoder;
use pq_proto::SystemId;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
pub trait Storage {
@@ -478,6 +480,13 @@ pub struct WalReader {
// We don't have WAL locally if LSN is less than local_start_lsn
local_start_lsn: Lsn,
// We will respond with zero-ed bytes before this Lsn as long as
// pos is in the same segment as timeline_start_lsn.
timeline_start_lsn: Lsn,
// integer version number of PostgreSQL, e.g. 14; 15; 16
pg_version: u32,
system_id: SystemId,
timeline_start_segment: Option<Bytes>,
}
impl WalReader {
@@ -488,19 +497,27 @@ impl WalReader {
start_pos: Lsn,
enable_remote_read: bool,
) -> Result<Self> {
if start_pos < state.timeline_start_lsn {
if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
bail!("state uninitialized, no data to read");
}
// TODO: Upgrade to bail!() once we know this couldn't possibly happen
if state.timeline_start_lsn == Lsn(0) {
warn!("timeline_start_lsn uninitialized before initializing wal reader");
}
if start_pos
< state
.timeline_start_lsn
.segment_lsn(state.server.wal_seg_size as usize)
{
bail!(
"Requested streaming from {}, which is before the start of the timeline {}",
"Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
start_pos,
state.timeline_start_lsn
);
}
// TODO: add state.timeline_start_lsn == Lsn(0) check
if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
bail!("state uninitialized, no data to read");
}
Ok(Self {
workdir,
timeline_dir,
@@ -509,10 +526,65 @@ impl WalReader {
wal_segment: None,
enable_remote_read,
local_start_lsn: state.local_start_lsn,
timeline_start_lsn: state.timeline_start_lsn,
pg_version: state.server.pg_version / 10000,
system_id: state.server.system_id,
timeline_start_segment: None,
})
}
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
// If this timeline is new, we may not have a full segment yet, so
// we pad the first bytes of the timeline's first WAL segment with 0s
if self.pos < self.timeline_start_lsn {
debug_assert_eq!(
self.pos.segment_number(self.wal_seg_size),
self.timeline_start_lsn.segment_number(self.wal_seg_size)
);
// All bytes after timeline_start_lsn are in WAL, but those before
// are not, so we manually construct an empty segment for the bytes
// not available in this timeline.
if self.timeline_start_segment.is_none() {
let it = postgres_ffi::generate_wal_segment(
self.timeline_start_lsn.segment_number(self.wal_seg_size),
self.system_id,
self.pg_version,
self.timeline_start_lsn,
)?;
self.timeline_start_segment = Some(it);
}
assert!(self.timeline_start_segment.is_some());
let segment = self.timeline_start_segment.take().unwrap();
let seg_bytes = &segment[..];
// How much of the current segment have we already consumed?
let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size);
// How many bytes may we consume in total?
let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
debug_assert!(seg_bytes.len() > pos_seg_offset);
debug_assert!(seg_bytes.len() > tl_start_seg_offset);
// Copy as many bytes as possible into the buffer
let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
self.pos += len as u64;
// If we're done with the segment, we can release it's memory.
// However, if we're not yet done, store it so that we don't have to
// construct the segment the next time this function is called.
if self.pos < self.timeline_start_lsn {
self.timeline_start_segment = Some(segment);
}
return Ok(len);
}
let mut wal_segment = match self.wal_segment.take() {
Some(reader) => reader,
None => self.open_segment().await?,

View File

@@ -42,12 +42,16 @@ def main(args: argparse.Namespace):
res: DefaultDict[str, DefaultDict[str, Dict[str, bool]]]
res = defaultdict(lambda: defaultdict(dict))
logging.info("connecting to the database...")
with psycopg2.connect(connstr, connect_timeout=10) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
logging.info("fetching flaky tests...")
cur.execute(FLAKY_TESTS_QUERY, (interval_days,))
rows = cur.fetchall()
try:
logging.info("connecting to the database...")
with psycopg2.connect(connstr, connect_timeout=30) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
logging.info("fetching flaky tests...")
cur.execute(FLAKY_TESTS_QUERY, (interval_days,))
rows = cur.fetchall()
except psycopg2.OperationalError as exc:
logging.error("cannot fetch flaky tests from the DB due to an error", exc)
rows = []
for row in rows:
logging.info(f"\t{row['parent_suite'].replace('.', '/')}/{row['suite']}.py::{row['test']}")

View File

@@ -430,7 +430,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(LogFormat::from_config(&args.log_format)?)?;
logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();
// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);

View File

@@ -53,6 +53,7 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"pageserver_storage_operations_seconds_global_count",
"pageserver_storage_operations_seconds_global_sum",
"pageserver_storage_operations_seconds_global_bucket",
"pageserver_unexpected_ondemand_downloads_count_total",
"libmetrics_launch_timestamp",
"libmetrics_build_info",
"libmetrics_tracing_event_count_total",

View File

@@ -1451,6 +1451,7 @@ class NeonCli(AbstractNeonCli):
branch_name: str,
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
port: Optional[int] = None,
) -> "subprocess.CompletedProcess[str]":
@@ -1470,6 +1471,8 @@ class NeonCli(AbstractNeonCli):
args.extend(["--port", str(port)])
if endpoint_id is not None:
args.append(endpoint_id)
if hot_standby:
args.extend(["--hot-standby", "true"])
res = self.raw_cli(args)
res.check_returncode()
@@ -2206,6 +2209,7 @@ class Endpoint(PgProtocol):
super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres")
self.env = env
self.running = False
self.branch_name: Optional[str] = None # dubious
self.endpoint_id: Optional[str] = None # dubious, see asserts below
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
self.tenant_id = tenant_id
@@ -2217,6 +2221,7 @@ class Endpoint(PgProtocol):
self,
branch_name: str,
endpoint_id: Optional[str] = None,
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> "Endpoint":
@@ -2231,12 +2236,14 @@ class Endpoint(PgProtocol):
if endpoint_id is None:
endpoint_id = self.env.generate_endpoint_id()
self.endpoint_id = endpoint_id
self.branch_name = branch_name
self.env.neon_cli.endpoint_create(
branch_name,
endpoint_id=self.endpoint_id,
tenant_id=self.tenant_id,
lsn=lsn,
hot_standby=hot_standby,
port=self.port,
)
path = Path("endpoints") / self.endpoint_id / "pgdata"
@@ -2361,6 +2368,7 @@ class Endpoint(PgProtocol):
self,
branch_name: str,
endpoint_id: Optional[str] = None,
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> "Endpoint":
@@ -2375,6 +2383,7 @@ class Endpoint(PgProtocol):
branch_name=branch_name,
endpoint_id=endpoint_id,
config_lines=config_lines,
hot_standby=hot_standby,
lsn=lsn,
).start()
@@ -2408,6 +2417,7 @@ class EndpointFactory:
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
hot_standby: bool = False,
config_lines: Optional[List[str]] = None,
) -> Endpoint:
ep = Endpoint(
@@ -2421,6 +2431,7 @@ class EndpointFactory:
return ep.create_start(
branch_name=branch_name,
endpoint_id=endpoint_id,
hot_standby=hot_standby,
config_lines=config_lines,
lsn=lsn,
)
@@ -2431,6 +2442,7 @@ class EndpointFactory:
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
hot_standby: bool = False,
config_lines: Optional[List[str]] = None,
) -> Endpoint:
ep = Endpoint(
@@ -2449,6 +2461,7 @@ class EndpointFactory:
branch_name=branch_name,
endpoint_id=endpoint_id,
lsn=lsn,
hot_standby=hot_standby,
config_lines=config_lines,
)
@@ -2458,6 +2471,36 @@ class EndpointFactory:
return self
def new_replica(self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]]):
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None
return self.create(
branch_name=branch_name,
endpoint_id=endpoint_id,
tenant_id=origin.tenant_id,
lsn=None,
hot_standby=True,
config_lines=config_lines,
)
def new_replica_start(
self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
):
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None
return self.create_start(
branch_name=branch_name,
endpoint_id=endpoint_id,
tenant_id=origin.tenant_id,
lsn=None,
hot_standby=True,
config_lines=config_lines,
)
@dataclass
class SafekeeperPort:

View File

@@ -59,11 +59,6 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"value": "replica",
"vartype": "enum"
},
{
"name": "hot_standby",
"value": "on",
"vartype": "bool"
},
{
"name": "neon.safekeepers",
"value": """

View File

@@ -0,0 +1,79 @@
import pytest
from fixtures.neon_fixtures import NeonEnv
@pytest.mark.timeout(1800)
def test_hot_standby(neon_simple_env: NeonEnv):
env = neon_simple_env
with env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
) as primary:
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
primary_lsn = None
cought_up = False
queries = [
"SHOW neon.timeline_id",
"SHOW neon.tenant_id",
"SELECT relname FROM pg_class WHERE relnamespace = current_schema()::regnamespace::oid",
"SELECT COUNT(*), SUM(i) FROM test",
]
responses = dict()
with primary.connect() as p_con:
with p_con.cursor() as p_cur:
p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i")
# Explicit commit to make sure other connections (and replicas) can
# see the changes of this commit.
p_con.commit()
with p_con.cursor() as p_cur:
p_cur.execute("SELECT pg_current_wal_insert_lsn()::text")
res = p_cur.fetchone()
assert res is not None
(lsn,) = res
primary_lsn = lsn
# Explicit commit to make sure other connections (and replicas) can
# see the changes of this commit.
# Note that this may generate more WAL if the transaction has changed
# things, but we don't care about that.
p_con.commit()
for query in queries:
with p_con.cursor() as p_cur:
p_cur.execute(query)
res = p_cur.fetchone()
assert res is not None
response = res
responses[query] = response
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:
s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
res = s_cur.fetchone()
assert res is not None
while not cought_up:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
res = secondary_cursor.fetchone()
assert res is not None
(secondary_lsn,) = res
# There may be more changes on the primary after we got our LSN
# due to e.g. autovacuum, but that shouldn't impact the content
# of the tables, so we check whether we've replayed up to at
# least after the commit of the `test` table.
cought_up = secondary_lsn >= primary_lsn
# Explicit commit to flush any transient transaction-level state.
s_con.commit()
for query in queries:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute(query)
response = secondary_cursor.fetchone()
assert response is not None
assert response == responses[query]