Compare commits

..

14 Commits

Author SHA1 Message Date
Ivan Efremov
5d6501f4aa Set NEON_MOTD to null to fix regression tests
NEON_MOTD env variable is set for psql greetings to be printed
alogn with the session_id. Disable this for regression tests
2025-07-24 10:19:03 +03:00
Tristan Partin
9b2e6f862a Set an upper limit on PG backpressure throttling (#12675)
## Problem
Tenant split test revealed another bug with PG backpressure throttling
that under some cases PS may never report its progress back to SK (e.g.,
observed when aborting tenant shard where the old shard needs to
re-establish SK connection and re-ingest WALs from a much older LSN). In
this case, PG may get stuck forever.

## Summary of changes
As a general precaution that PS feedback mechanism may not always be
reliable, this PR uses the previously introduced WAL write rate limit
mechanism to slow down write rates instead of completely pausing it. The
idea is to introduce a new
`databricks_effective_max_wal_bytes_per_second`, which is set to
`databricks_max_wal_mb_per_second` when no PS back pressure and is set
to `10KB` when there is back pressure. This way, PG can still write to
SK, though at a very low speed.

The PR also fixes the problem that the current WAL rate limiting
mechanism is too coarse grained and cannot enforce limits < 1MB. This is
because it always resets the rate limiter after 1 second, even if PG
could have written more data in the past second. The fix is to introduce
a `batch_end_time_us` which records the expected end time of the current
batch. For example, if PG writes 10MB of data in a single batch, and max
WAL write rate is set as `1MB/s`, then `batch_end_time_us` will be set
as 10 seconds later.

## How is this tested?
Tweaked the existing test, and also did manual testing on dev. I set
`max_replication_flush_lag` as 1GB, and loaded 500GB pgbench tables.
It's expected to see PG gets throttled periodically because PS will
accumulate 4GB of data before flushing.

Results:
when PG is throttled:
```
9500000 of 3300000000 tuples (0%) done (elapsed 10.36 s, remaining 3587.62 s)
9600000 of 3300000000 tuples (0%) done (elapsed 124.07 s, remaining 42523.59 s)
9700000 of 3300000000 tuples (0%) done (elapsed 255.79 s, remaining 86763.97 s)
9800000 of 3300000000 tuples (0%) done (elapsed 315.89 s, remaining 106056.52 s)
9900000 of 3300000000 tuples (0%) done (elapsed 412.75 s, remaining 137170.58 s)
```

when PS just flushed:
```
18100000 of 3300000000 tuples (0%) done (elapsed 433.80 s, remaining 78655.96 s)
18200000 of 3300000000 tuples (0%) done (elapsed 433.85 s, remaining 78231.71 s)
18300000 of 3300000000 tuples (0%) done (elapsed 433.90 s, remaining 77810.62 s)
18400000 of 3300000000 tuples (0%) done (elapsed 433.96 s, remaining 77395.86 s)
18500000 of 3300000000 tuples (0%) done (elapsed 434.03 s, remaining 76987.27 s)
18600000 of 3300000000 tuples (0%) done (elapsed 434.08 s, remaining 76579.59 s)
18700000 of 3300000000 tuples (0%) done (elapsed 434.13 s, remaining 76177.12 s)
18800000 of 3300000000 tuples (0%) done (elapsed 434.19 s, remaining 75779.45 s)
18900000 of 3300000000 tuples (0%) done (elapsed 434.84 s, remaining 75489.40 s)
19000000 of 3300000000 tuples (0%) done (elapsed 434.89 s, remaining 75097.90 s)
19100000 of 3300000000 tuples (0%) done (elapsed 434.94 s, remaining 74712.56 s)
19200000 of 3300000000 tuples (0%) done (elapsed 498.93 s, remaining 85254.20 s)
19300000 of 3300000000 tuples (0%) done (elapsed 498.97 s, remaining 84817.95 s)
19400000 of 3300000000 tuples (0%) done (elapsed 623.80 s, remaining 105486.76 s)
19500000 of 3300000000 tuples (0%) done (elapsed 745.86 s, remaining 125476.51 s)
```

Co-authored-by: Chen Luo <chen.luo@databricks.com>
2025-07-23 22:37:27 +00:00
Tristan Partin
12e87d7a9f Add neon.lakebase_mode boolean GUC (#12714)
This GUC will become useful for temporarily disabling Lakebase-specific
features during the code merge.

Signed-off-by: Tristan Partin <tristan.partin@databricks.com>
2025-07-23 22:37:20 +00:00
Mikhail
a56afee269 Accept primary compute spec in /promote, promotion corner cases testing (#12574)
https://github.com/neondatabase/cloud/issues/19011
- Accept `ComputeSpec` in `/promote` instead of just passing safekeepers
and LSN. Update API spec
- Add corner case tests for promotion when promotion or perwarm fails
(using failpoints)
- Print root error for prewarm and promotion in status handlers
2025-07-23 20:11:34 +00:00
Alex Chi Z.
9e6ca2932f fix(test): convert bool to lowercase when invoking neon-cli (#12688)
## Problem

There has been some inconsistencies of providing tenant config via
`tenant_create` and via other tenant config APIs due to how the
properties are processed: in `tenant_create`, the test framework calls
neon-cli and therefore puts those properties in the cmdline. In other
cases, it's done via the HTTP API by directly serializing to a JSON.
When using the cmdline, the program only accepts serde bool that is
true/false.

## Summary of changes

Convert Python bool into `true`/`false` when using neon-cli.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-23 18:56:37 +00:00
HaoyuHuang
63ea4b0579 A few more compute_tool changes (#12687)
## Summary of changes
All changes are no-op except that the tracing-appender lib is upgraded
from 0.2.2 to 0.2.3
2025-07-23 18:30:33 +00:00
Folke Behrens
20881ef65e otel: Use blocking reqwest in dedicated thread (#12699)
## Problem

OTel 0.28+ by default uses blocking operations in a dedicated thread and
doesn't start a tokio runtime. Reqwest as currently configured wants to
spawn tokio tasks.

## Summary of changes

Use blocking reqwest.

This PR just mitigates the current issue.
2025-07-23 18:21:36 +00:00
Conrad Ludgate
a695713727 [sql-over-http] Reset session state between pooled connection re-use (#12681)
Session variables can be set during one sql-over-http query and observed
on another when that pooled connection is re-used. To address this we
can use `RESET ALL;` before re-using the connection. LKB-2495

To be on the safe side, we can opt for a full `DISCARD ALL;`, but that
might have performance regressions since it also clears any query plans.
See pgbouncer docs
https://www.pgbouncer.org/config.html#server_reset_query.

`DISCARD ALL` is currently defined as:
```
CLOSE ALL;
SET SESSION AUTHORIZATION DEFAULT;
RESET ALL;
DEALLOCATE ALL;
UNLISTEN *;
SELECT pg_advisory_unlock_all();
DISCARD PLANS;
DISCARD TEMP;
DISCARD SEQUENCES;
```

I've opted to keep everything here except the `DISCARD PLANS`. I've
modified the code so that this query is executed in the background when
a connection is returned to the pool, rather than when taken from the
pool.

This should marginally improve performance for Neon RLS by removing 1
(localhost) round trip. I don't believe that keeping query plans could
be a security concern. It's a potential side channel, but I can't
imagine what you could extract from it.

---

Thanks to
https://github.com/neondatabase/neon/pull/12659#discussion_r2219016205
for probing the idea in my head.
2025-07-23 17:43:43 +00:00
Alex Chi Z.
5c57e8a11b feat(pageserver): rework reldirv2 rollout (#12576)
## Problem

LKB-197, #9516 

To make sure the migration path is smooth.

The previous plan is to store new relations in new keyspace and old ones
in old keyspace until it gets dropped. This makes the migration path
hard as we can't validate v2 writes and can't rollback. This patch gives
us a more smooth migration path:

- The first time we enable reldirv2 for a tenant, we copy over
everything in the old keyspace to the new one. This might create a short
spike of latency for the create relation operation, but it's oneoff.
- After that, we have identical v1/v2 keyspace and read/write both of
them. We validate reads every time we list the reldirs.
- If we are in `migrating` mode, use v1 as source of truth and log a
warning for failed v2 operations. If we are in `migrated` mode, use v2
as source of truth and error when writes fail.
- One compatibility test uses dataset from the time where we enabled
reldirv2 (of the original rollout plan), which only has relations
written to the v2 keyspace instead of the v1 keyspace. We had to adjust
it accordingly.
- Add `migrated_at` in index_part to indicate the LSN where we did the
initialize.

TODOs:

- Test if relv1 can be read below the migrated_at LSN.
- Move the initialization process to L0 compaction instead of doing it
on the write path.
- Disable relcache in the relv2 test case so that all code path gets
fully tested.

## Summary of changes

- New behavior of reldirv2 migration flags as described above.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-23 16:12:46 +00:00
Alexander Bayandin
84a2556c9f compute-node.Dockerfile: update bullseye-backports backports url (#12700)
## Problem

> bullseye-backports has reached end-of-life and is no longer supported
or updated

From: https://backports.debian.org/Instructions/

This causes the compute-node image build to fail with the following
error:
```
0.099 Err:5 http://deb.debian.org/debian bullseye-backports Release
0.099   404  Not Found [IP: 146.75.122.132 80]
...
1.293 E: The repository 'http://deb.debian.org/debian bullseye-backports Release' does not have a Release file.
```

## Summary of changes
- Use archive version of `bullseye-backports`
2025-07-23 14:45:52 +00:00
Conrad Ludgate
761e9e0e1d [proxy] move read_info from the compute connection to be as late as possible (#12660)
Second attempt at #12130, now with a smaller diff.

This allows us to skip allocating for things like parameter status and
notices that we will either just forward untouched, or discard.

LKB-2494
2025-07-23 13:33:21 +00:00
Dmitrii Kovalkov
94cb9a79d9 safekeeper: generation aware timeline tombstones (#12482)
## Problem
With safekeeper migration in mind, we can now pull/exclude the timeline
multiple times within the same safekeeper. To avoid races between out of
order requests, we need to ignore the pull/exclude requests if we have
already seen a higher generation.

- Closes: https://github.com/neondatabase/neon/issues/12186
- Closes: [LKB-949](https://databricks.atlassian.net/browse/LKB-949)

## Summary of changes
- Annotate timeline tombstones in safekeeper with request generation.
- Replace `ignore_tombstone` option with `mconf` in
`PullTimelineRequest`
- Switch membership in `pull_timeline` if the existing/pulled timeline
has an older generation.
- Refuse to switch membership if the timeline is being deleted
(`is_canceled`).
- Refuse to switch membership in compute greeting request if the
safekeeper is not a member of `mconf`.
- Pass `mconf` in `PullTimelineRequest` in safekeeper_service

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2025-07-23 11:01:04 +00:00
Tristan Partin
fc242afcc2 PG ignore PageserverFeedback from unknown shards (#12671)
## Problem
When testing tenant splits, I found that PG can get backpressure
throttled indefinitely if the split is aborted afterwards. It turns out
that each PageServer activates new shard separately even before the
split is committed and they may start sending PageserverFeedback to PG
directly. As a result, if the split is aborted, no one resets the
pageserver feedback in PG, and thus PG will be backpressure throttled
forever unless it's restarted manually.

## Summary of changes
This PR fixes this problem by having
`walprop_pg_process_safekeeper_feedback` simply ignore all pageserver
feedback from unknown shards. The source of truth here is defined by the
shard map, which is guaranteed to be reloaded only after the split is
committed.

Co-authored-by: Chen Luo <chen.luo@databricks.com>
2025-07-22 21:41:56 +00:00
Suhas Thalanki
e275221aef add hadron-specific metrics (#12686) 2025-07-22 21:17:45 +00:00
89 changed files with 2521 additions and 1028 deletions

6
Cargo.lock generated
View File

@@ -1388,6 +1388,7 @@ dependencies = [
"tower-http",
"tower-otel",
"tracing",
"tracing-appender",
"tracing-opentelemetry",
"tracing-subscriber",
"tracing-utils",
@@ -7934,11 +7935,12 @@ dependencies = [
[[package]]
name = "tracing-appender"
version = "0.2.2"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e"
checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf"
dependencies = [
"crossbeam-channel",
"thiserror 1.0.69",
"time",
"tracing-subscriber",
]

View File

@@ -145,7 +145,7 @@ num-traits = "0.2.19"
once_cell = "1.13"
opentelemetry = "0.30"
opentelemetry_sdk = "0.30"
opentelemetry-otlp = { version = "0.30", default-features = false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-otlp = { version = "0.30", default-features = false, features = ["http-proto", "trace", "http", "reqwest-blocking-client"] }
opentelemetry-semantic-conventions = "0.30"
parking_lot = "0.12"
parquet = { version = "53", default-features = false, features = ["zstd"] }
@@ -222,6 +222,7 @@ tracing-log = "0.2"
tracing-opentelemetry = "0.31"
tracing-serde = "0.2.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
tracing-appender = "0.2.3"
try-lock = "0.2.5"
test-log = { version = "0.2.17", default-features = false, features = ["log"] }
twox-hash = { version = "1.6.3", default-features = false }

View File

@@ -133,7 +133,7 @@ RUN case $DEBIAN_VERSION in \
# Install newer version (3.25) from backports.
# libstdc++-10-dev is required for plv8
bullseye) \
echo "deb http://deb.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/bullseye-backports.list; \
echo "deb http://archive.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/bullseye-backports.list; \
VERSION_INSTALLS="cmake/bullseye-backports cmake-data/bullseye-backports libstdc++-10-dev"; \
;; \
# Version-specific installs for Bookworm (PG17):

View File

@@ -62,6 +62,7 @@ tokio-stream.workspace = true
tonic.workspace = true
tower-otel.workspace = true
tracing.workspace = true
tracing-appender.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
tracing-utils.workspace = true

View File

@@ -51,6 +51,7 @@ use compute_tools::compute::{
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::logger::*;
use compute_tools::params::*;
use compute_tools::pg_isready::get_pg_isready_bin;
use compute_tools::spec::*;
use rlimit::{Resource, setrlimit};
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
@@ -194,7 +195,12 @@ fn main() -> Result<()> {
.build()?;
let _rt_guard = runtime.enter();
let tracing_provider = init(cli.dev)?;
let mut log_dir = None;
if cli.lakebase_mode {
log_dir = std::env::var("COMPUTE_CTL_LOG_DIRECTORY").ok();
}
let (tracing_provider, _file_logs_guard) = init(cli.dev, log_dir)?;
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
@@ -226,6 +232,8 @@ fn main() -> Result<()> {
cli.installed_extensions_collection_interval,
)),
pg_init_timeout: cli.pg_init_timeout.map(Duration::from_secs),
pg_isready_bin: get_pg_isready_bin(&cli.pgbin),
instance_id: std::env::var("INSTANCE_ID").ok(),
lakebase_mode: cli.lakebase_mode,
},
config,
@@ -238,8 +246,14 @@ fn main() -> Result<()> {
deinit_and_exit(tracing_provider, exit_code);
}
fn init(dev_mode: bool) -> Result<Option<tracing_utils::Provider>> {
let provider = init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
fn init(
dev_mode: bool,
log_dir: Option<String>,
) -> Result<(
Option<tracing_utils::Provider>,
Option<tracing_appender::non_blocking::WorkerGuard>,
)> {
let (provider, file_logs_guard) = init_tracing_and_logging(DEFAULT_LOG_LEVEL, &log_dir)?;
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
thread::spawn(move || {
@@ -250,7 +264,7 @@ fn init(dev_mode: bool) -> Result<Option<tracing_utils::Provider>> {
info!("compute build_tag: {}", &BUILD_TAG.to_string());
Ok(provider)
Ok((provider, file_logs_guard))
}
fn get_config(cli: &Cli) -> Result<ComputeConfig> {

View File

@@ -113,10 +113,12 @@ pub struct ComputeNodeParams {
/// Interval for installed extensions collection
pub installed_extensions_collection_interval: Arc<AtomicU64>,
/// Hadron instance ID of the compute node.
pub instance_id: Option<String>,
/// Timeout of PG compute startup in the Init state.
pub pg_init_timeout: Option<Duration>,
// Path to the `pg_isready` binary.
pub pg_isready_bin: String,
pub lakebase_mode: bool,
}
@@ -486,6 +488,7 @@ impl ComputeNode {
port: this.params.external_http_port,
config: this.compute_ctl_config.clone(),
compute_id: this.params.compute_id.clone(),
instance_id: this.params.instance_id.clone(),
}
.launch(&this);
@@ -1785,6 +1788,34 @@ impl ComputeNode {
Ok::<(), anyhow::Error>(())
}
// Signal to the configurator to refresh the configuration by pulling a new spec from the HCC.
// Note that this merely triggers a notification on a condition variable the configurator thread
// waits on. The configurator thread (in configurator.rs) pulls the new spec from the HCC and
// applies it.
pub async fn signal_refresh_configuration(&self) -> Result<()> {
let states_allowing_configuration_refresh = [
ComputeStatus::Running,
ComputeStatus::Failed,
// ComputeStatus::RefreshConfigurationPending,
];
let state = self.state.lock().expect("state lock poisoned");
if states_allowing_configuration_refresh.contains(&state.status) {
// state.status = ComputeStatus::RefreshConfigurationPending;
self.state_changed.notify_all();
Ok(())
} else if state.status == ComputeStatus::Init {
// If the compute is in Init state, we can't refresh the configuration immediately,
// but we should be able to do that soon.
Ok(())
} else {
Err(anyhow::anyhow!(
"Cannot refresh compute configuration in state {:?}",
state.status
))
}
}
// Wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop.
#[instrument(skip_all)]

View File

@@ -90,6 +90,7 @@ impl ComputeNode {
}
/// If there is a prewarm request ongoing, return `false`, `true` otherwise.
/// Has a failpoint "compute-prewarm"
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
{
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
@@ -112,9 +113,8 @@ impl ComputeNode {
Err(err) => {
crate::metrics::LFC_PREWARM_ERRORS.inc();
error!(%err, "could not prewarm LFC");
LfcPrewarmState::Failed {
error: err.to_string(),
error: format!("{err:#}"),
}
}
};
@@ -135,16 +135,20 @@ impl ComputeNode {
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<bool> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
#[cfg(feature = "testing")]
fail::fail_point!("compute-prewarm", |_| {
bail!("prewarm configured to fail because of a failpoint")
});
info!(%url, "requesting LFC state from endpoint storage");
let request = Client::new().get(&url).bearer_auth(token);
let res = request.send().await.context("querying endpoint storage")?;
let status = res.status();
match status {
match res.status() {
StatusCode::OK => (),
StatusCode::NOT_FOUND => {
return Ok(false);
}
_ => bail!("{status} querying endpoint storage"),
status => bail!("{status} querying endpoint storage"),
}
let mut uncompressed = Vec::new();
@@ -205,7 +209,7 @@ impl ComputeNode {
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
error!(%err, "could not offload LFC state to endpoint storage");
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
error: err.to_string(),
error: format!("{err:#}"),
};
}
@@ -213,16 +217,22 @@ impl ComputeNode {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
info!(%url, "requesting LFC state from Postgres");
let mut compressed = Vec::new();
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select neon.get_local_cache_state()", &[])
.await
.context("querying LFC state")?
.try_get::<usize, &[u8]>(0)
.context("deserializing LFC state")
.map(ZstdEncoder::new)?
.context("querying LFC state")?;
let state = row
.try_get::<usize, Option<&[u8]>>(0)
.context("deserializing LFC state")?;
let Some(state) = state else {
info!(%url, "empty LFC state, not exporting");
return Ok(());
};
let mut compressed = Vec::new();
ZstdEncoder::new(state)
.read_to_end(&mut compressed)
.await
.context("compressing LFC state")?;

View File

@@ -1,11 +1,12 @@
use crate::compute::ComputeNode;
use anyhow::{Context, Result, bail};
use compute_api::{
responses::{LfcPrewarmState, PromoteState, SafekeepersLsn},
spec::ComputeMode,
};
use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
use compute_api::spec::ComputeMode;
use itertools::Itertools;
use std::collections::HashMap;
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
use tracing::info;
use utils::lsn::Lsn;
impl ComputeNode {
@@ -13,21 +14,22 @@ impl ComputeNode {
/// and http client disconnects, this does not stop promotion, and subsequent
/// calls block until promote finishes.
/// Called by control plane on secondary after primary endpoint is terminated
pub async fn promote(self: &Arc<Self>, safekeepers_lsn: SafekeepersLsn) -> PromoteState {
/// Has a failpoint "compute-promotion"
pub async fn promote(self: &Arc<Self>, cfg: PromoteConfig) -> PromoteState {
let cloned = self.clone();
let promote_fn = async move || {
let Err(err) = cloned.promote_impl(cfg).await else {
return PromoteState::Completed;
};
tracing::error!(%err, "promoting");
PromoteState::Failed {
error: format!("{err:#}"),
}
};
let start_promotion = || {
let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
tokio::spawn(async move {
tx.send(match cloned.promote_impl(safekeepers_lsn).await {
Ok(_) => PromoteState::Completed,
Err(err) => {
tracing::error!(%err, "promoting");
PromoteState::Failed {
error: err.to_string(),
}
}
})
});
tokio::spawn(async move { tx.send(promote_fn().await) });
rx
};
@@ -47,9 +49,7 @@ impl ComputeNode {
task.borrow().clone()
}
// Why do we have to supply safekeepers?
// For secondary we use primary_connection_conninfo so safekeepers field is empty
async fn promote_impl(&self, safekeepers_lsn: SafekeepersLsn) -> Result<()> {
async fn promote_impl(&self, mut cfg: PromoteConfig) -> Result<()> {
{
let state = self.state.lock().unwrap();
let mode = &state.pspec.as_ref().unwrap().spec.mode;
@@ -73,7 +73,7 @@ impl ComputeNode {
.await
.context("connecting to postgres")?;
let primary_lsn = safekeepers_lsn.wal_flush_lsn;
let primary_lsn = cfg.wal_flush_lsn;
let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
const RETRIES: i32 = 20;
for i in 0..=RETRIES {
@@ -86,7 +86,7 @@ impl ComputeNode {
if last_wal_replay_lsn >= primary_lsn {
break;
}
tracing::info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
sleep(Duration::from_secs(1)).await;
}
if last_wal_replay_lsn < primary_lsn {
@@ -96,7 +96,7 @@ impl ComputeNode {
// using $1 doesn't work with ALTER SYSTEM SET
let safekeepers_sql = format!(
"ALTER SYSTEM SET neon.safekeepers='{}'",
safekeepers_lsn.safekeepers
cfg.spec.safekeeper_connstrings.join(",")
);
client
.query(&safekeepers_sql, &[])
@@ -106,6 +106,12 @@ impl ComputeNode {
.query("SELECT pg_reload_conf()", &[])
.await
.context("reloading postgres config")?;
#[cfg(feature = "testing")]
fail::fail_point!("compute-promotion", |_| {
bail!("promotion configured to fail because of a failpoint")
});
let row = client
.query_one("SELECT * FROM pg_promote()", &[])
.await
@@ -125,8 +131,36 @@ impl ComputeNode {
bail!("replica in read only mode after promotion");
}
let mut state = self.state.lock().unwrap();
state.pspec.as_mut().unwrap().spec.mode = ComputeMode::Primary;
Ok(())
{
let mut state = self.state.lock().unwrap();
let spec = &mut state.pspec.as_mut().unwrap().spec;
spec.mode = ComputeMode::Primary;
let new_conf = cfg.spec.cluster.postgresql_conf.as_mut().unwrap();
let existing_conf = spec.cluster.postgresql_conf.as_ref().unwrap();
Self::merge_spec(new_conf, existing_conf);
}
info!("applied new spec, reconfiguring as primary");
self.reconfigure()
}
/// Merge old and new Postgres conf specs to apply on secondary.
/// Change new spec's port and safekeepers since they are supplied
/// differenly
fn merge_spec(new_conf: &mut String, existing_conf: &str) {
let mut new_conf_set: HashMap<&str, &str> = new_conf
.split_terminator('\n')
.map(|e| e.split_once("=").expect("invalid item"))
.collect();
new_conf_set.remove("neon.safekeepers");
let existing_conf_set: HashMap<&str, &str> = existing_conf
.split_terminator('\n')
.map(|e| e.split_once("=").expect("invalid item"))
.collect();
new_conf_set.insert("port", existing_conf_set["port"]);
*new_conf = new_conf_set
.iter()
.map(|(k, v)| format!("{k}={v}"))
.join("\n");
}
}

View File

@@ -1,16 +1,10 @@
use std::fs::File;
use std::sync::Arc;
use std::thread;
use std::{path::Path, sync::Arc};
use anyhow::Result;
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeSpec;
use tracing::{error, info, instrument};
use crate::{
compute::{ComputeNode, ParsedSpec},
spec::get_config_from_control_plane,
};
use crate::compute::ComputeNode;
#[instrument(skip_all)]
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
@@ -18,27 +12,12 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
loop {
let mut state = compute.state.lock().unwrap();
if compute.params.lakebase_mode {
/* BEGIN_HADRON */
// RefreshConfiguration should only be used inside the loop
assert_ne!(state.status, ComputeStatus::RefreshConfiguration);
/* END_HADRON */
while state.status != ComputeStatus::ConfigurationPending
&& state.status != ComputeStatus::RefreshConfigurationPending
&& state.status != ComputeStatus::Failed
{
info!("configurator: compute status: {:?}, sleeping", state.status);
state = compute.state_changed.wait(state).unwrap();
}
} else {
// We have to re-check the status after re-acquiring the lock because it could be that
// the status has changed while we were waiting for the lock, and we might not need to
// wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
// we are waiting for a condition variable that will never be signaled.
if state.status != ComputeStatus::ConfigurationPending {
state = compute.state_changed.wait(state).unwrap();
}
// We have to re-check the status after re-acquiring the lock because it could be that
// the status has changed while we were waiting for the lock, and we might not need to
// wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
// we are waiting for a condition variable that will never be signaled.
if state.status != ComputeStatus::ConfigurationPending {
state = compute.state_changed.wait(state).unwrap();
}
// Re-check the status after waking up
@@ -47,146 +26,17 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
state.set_status(ComputeStatus::Configuration, &compute.state_changed);
drop(state);
let mut _new_status = ComputeStatus::Failed;
let mut new_status = ComputeStatus::Failed;
if let Err(e) = compute.reconfigure() {
error!("could not configure compute node: {}", e);
// TODO(BRC-1726): Remove this panic once we fix the state machine to allow futher
// configuration attempts after a failed configuration attempt.
error!("Compute node exiting due to configuration failure.");
std::process::exit(1);
} else {
_new_status = ComputeStatus::Running;
new_status = ComputeStatus::Running;
info!("compute node configured");
}
// XXX: used to test that API is blocking
// std::thread::sleep(std::time::Duration::from_millis(10000));
compute.set_status(_new_status);
} else if state.status == ComputeStatus::RefreshConfigurationPending {
info!(
"compute node suspects its configuration is out of date, now refreshing configuration"
);
state.set_status(ComputeStatus::RefreshConfiguration, &compute.state_changed);
// Drop the lock guard here to avoid holding the lock while downloading spec from the control plane / HCC.
// This is the only thread that can move compute_ctl out of the `RefreshConfiguration` state, so it
// is safe to drop the lock like this.
drop(state);
let get_spec_result: anyhow::Result<Option<ComputeSpec>> =
if let Some(sp) = &compute.params.spec_path_test_only {
// This path is only to make testing easier. In production we always get the spec from the HCM.
info!("reloading spec.json from path: {:?}", sp);
let path = Path::new(sp);
if let Ok(file) = File::open(path) {
match serde_json::from_reader(file) {
Ok(spec) => Ok(Some(spec)),
Err(e) => {
error!("could not parse spec file: {}", e);
Err(anyhow::anyhow!("could not parse spec file: {}", e))
}
}
} else {
error!("could not open spec file at path: {:?}", sp);
Err(anyhow::anyhow!(
"could not open spec file at path: {:?}",
sp
))
}
} else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
get_config_from_control_plane(control_plane_uri, &compute.params.compute_id).map(
|(spec_opt, _)| {
info!("got spec from control plane: {:?}", spec_opt);
spec_opt
},
)
} else {
Err(anyhow::anyhow!("spec_path_test_only is not set"))
};
// Parse any received ComputeSpec and transpose the result into a Result<Option<ParsedSpec>>.
let parsed_spec_result: Result<Option<ParsedSpec>> = get_spec_result.and_then(|spec| {
if let Some(spec) = spec {
if let Ok(pspec) = ParsedSpec::try_from(spec) {
Ok(Some(pspec))
} else {
Err(anyhow::anyhow!("could not parse spec"))
}
} else {
Ok(None)
}
});
let new_status: ComputeStatus;
match parsed_spec_result {
// Control plane (HCM) returned a spec and we were able to parse it.
Ok(Some(pspec)) => {
{
let mut state = compute.state.lock().unwrap();
// Defensive programming to make sure this thread is indeed the only one that can move the compute
// node out of the `RefreshConfiguration` state. Would be nice if we can encode this invariant
// into the type system.
assert_eq!(state.status, ComputeStatus::RefreshConfiguration);
if state.pspec.as_ref().map(|ps| ps.pageserver_connstr.clone())
== Some(pspec.pageserver_connstr.clone())
{
info!(
"Refresh configuration: Retrieved spec is the same as the current spec. Waiting for control plane to update the spec before attempting reconfiguration."
);
state.status = ComputeStatus::Running;
compute.state_changed.notify_all();
drop(state);
std::thread::sleep(std::time::Duration::from_secs(5));
continue;
}
// state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire
// the compute.state lock again so we need to have the lock guard go out of scope here. We could add a
// "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner,
// but it's not worth forking the codebase too much for this minor point alone right now.
ComputeNode::set_spec(&compute.params, &mut state, pspec);
}
match compute.reconfigure() {
Ok(_) => {
info!("Refresh configuration: compute node configured");
new_status = ComputeStatus::Running;
}
Err(e) => {
error!(
"Refresh configuration: could not configure compute node: {}",
e
);
// Set the compute node back to the `RefreshConfigurationPending` state if the configuration
// was not successful. It should be okay to treat this situation the same as if the loop
// hasn't executed yet as long as the detection side keeps notifying.
new_status = ComputeStatus::RefreshConfigurationPending;
}
}
}
// Control plane (HCM)'s response does not contain a spec. This is the "Empty" attachment case.
Ok(None) => {
info!(
"Compute Manager signaled that this compute is no longer attached to any storage. Exiting."
);
// We just immediately terminate the whole compute_ctl in this case. It's not necessary to attempt a
// clean shutdown as Postgres is probably not responding anyway (which is why we are in this refresh
// configuration state).
std::process::exit(1);
}
// Various error cases:
// - The request to the control plane (HCM) either failed or returned a malformed spec.
// - compute_ctl itself is configured incorrectly (e.g., compute_id is not set).
Err(e) => {
error!(
"Refresh configuration: error getting a parsed spec: {:?}",
e
);
new_status = ComputeStatus::RefreshConfigurationPending;
// We may be dealing with an overloaded HCM if we end up in this path. Backoff 5 seconds before
// retrying to avoid hammering the HCM.
std::thread::sleep(std::time::Duration::from_secs(5));
}
}
compute.set_status(new_status);
} else if state.status == ComputeStatus::Failed {
info!("compute node is now in Failed state, exiting");

View File

@@ -0,0 +1,60 @@
use metrics::{
IntCounter, IntGaugeVec, core::Collector, proto::MetricFamily, register_int_counter,
register_int_gauge_vec,
};
use once_cell::sync::Lazy;
// Counter keeping track of the number of PageStream request errors reported by Postgres.
// An error is registered every time Postgres calls compute_ctl's /refresh_configuration API.
// Postgres will invoke this API if it detected trouble with PageStream requests (get_page@lsn,
// get_base_backup, etc.) it sends to any pageserver. An increase in this counter value typically
// indicates Postgres downtime, as PageStream requests are critical for Postgres to function.
pub static POSTGRES_PAGESTREAM_REQUEST_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pg_cctl_pagestream_request_errors_total",
"Number of PageStream request errors reported by the postgres process"
)
.expect("failed to define a metric")
});
// Counter keeping track of the number of compute configuration errors due to Postgres statement
// timeouts. An error is registered every time `ComputeNode::reconfigure()` fails due to Postgres
// error code 57014 (query cancelled). This statement timeout typically occurs when postgres is
// stuck in a problematic retry loop when the PS is reject its connection requests (usually due
// to PG pointing at the wrong PS). We should investigate the root cause when this counter value
// increases by checking PG and PS logs.
pub static COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pg_cctl_configure_statement_timeout_errors_total",
"Number of compute configuration errors due to Postgres statement timeouts."
)
.expect("failed to define a metric")
});
pub static COMPUTE_ATTACHED: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pg_cctl_attached",
"Compute node attached status (1 if attached)",
&[
"pg_compute_id",
"pg_instance_id",
"tenant_id",
"timeline_id"
]
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = Vec::new();
metrics.extend(POSTGRES_PAGESTREAM_REQUEST_ERRORS.collect());
metrics.extend(COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.collect());
metrics.extend(COMPUTE_ATTACHED.collect());
metrics
}
pub fn initialize_metrics() {
Lazy::force(&POSTGRES_PAGESTREAM_REQUEST_ERRORS);
Lazy::force(&COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS);
Lazy::force(&COMPUTE_ATTACHED);
}

View File

@@ -16,13 +16,29 @@ use crate::http::JsonResponse;
#[derive(Clone, Debug)]
pub(in crate::http) struct Authorize {
compute_id: String,
// BEGIN HADRON
// Hadron instance ID. Only set if it's a Lakebase V1 a.k.a. Hadron instance.
instance_id: Option<String>,
// END HADRON
jwks: JwkSet,
validation: Validation,
}
impl Authorize {
pub fn new(compute_id: String, jwks: JwkSet) -> Self {
pub fn new(compute_id: String, instance_id: Option<String>, jwks: JwkSet) -> Self {
let mut validation = Validation::new(Algorithm::EdDSA);
// BEGIN HADRON
let use_rsa = jwks.keys.iter().any(|jwk| {
jwk.common
.key_algorithm
.is_some_and(|alg| alg == jsonwebtoken::jwk::KeyAlgorithm::RS256)
});
if use_rsa {
validation = Validation::new(Algorithm::RS256);
}
// END HADRON
validation.validate_exp = true;
// Unused by the control plane
validation.validate_nbf = false;
@@ -34,6 +50,7 @@ impl Authorize {
Self {
compute_id,
instance_id,
jwks,
validation,
}
@@ -47,10 +64,20 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
fn authorize(&mut self, mut request: Request<Body>) -> Self::Future {
let compute_id = self.compute_id.clone();
let is_hadron_instance = self.instance_id.is_some();
let jwks = self.jwks.clone();
let validation = self.validation.clone();
Box::pin(async move {
// BEGIN HADRON
// In Hadron deployments the "external" HTTP endpoint on compute_ctl can only be
// accessed by trusted components (enforced by dblet network policy), so we can bypass
// all auth here.
if is_hadron_instance {
return Ok(request);
}
// END HADRON
let TypedHeader(Authorization(bearer)) = request
.extract_parts::<TypedHeader<Authorization<Bearer>>>()
.await

View File

@@ -96,7 +96,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/SafekeepersLsn"
$ref: "#/components/schemas/ComputeSchemaWithLsn"
responses:
200:
description: Promote succeeded or wasn't started
@@ -297,14 +297,7 @@ paths:
content:
application/json:
schema:
type: object
required:
- spec
properties:
spec:
# XXX: I don't want to explain current spec in the OpenAPI format,
# as it could be changed really soon. Consider doing it later.
type: object
$ref: "#/components/schemas/ComputeSchema"
responses:
200:
description: Compute configuration finished.
@@ -591,18 +584,25 @@ components:
type: string
example: "1.0.0"
SafekeepersLsn:
ComputeSchema:
type: object
required:
- safekeepers
- spec
properties:
spec:
type: object
ComputeSchemaWithLsn:
type: object
required:
- spec
- wal_flush_lsn
properties:
safekeepers:
description: Primary replica safekeepers
type: string
spec:
$ref: "#/components/schemas/ComputeState"
wal_flush_lsn:
description: Primary last WAL flush LSN
type: string
description: "last WAL flush LSN"
example: "0/028F10D8"
LfcPrewarmState:
type: object

View File

@@ -0,0 +1,34 @@
use crate::pg_isready::pg_isready;
use crate::{compute::ComputeNode, http::JsonResponse};
use axum::{extract::State, http::StatusCode, response::Response};
use std::sync::Arc;
/// NOTE: NOT ENABLED YET
/// Detect if the compute is alive.
/// Called by the liveness probe of the compute container.
pub(in crate::http) async fn hadron_liveness_probe(
State(compute): State<Arc<ComputeNode>>,
) -> Response {
let port = match compute.params.connstr.port() {
Some(port) => port,
None => {
return JsonResponse::error(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to get the port from the connection string",
);
}
};
match pg_isready(&compute.params.pg_isready_bin, port) {
Ok(_) => {
// The connection is successful, so the compute is alive.
// Return a 200 OK response.
JsonResponse::success(StatusCode::OK, "ok")
}
Err(e) => {
tracing::error!("Hadron liveness probe failed: {}", e);
// The connection failed, so the compute is not alive.
// Return a 500 Internal Server Error response.
JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e)
}
}
}

View File

@@ -10,11 +10,13 @@ pub(in crate::http) mod extension_server;
pub(in crate::http) mod extensions;
pub(in crate::http) mod failpoints;
pub(in crate::http) mod grants;
pub(in crate::http) mod hadron_liveness_probe;
pub(in crate::http) mod insights;
pub(in crate::http) mod lfc;
pub(in crate::http) mod metrics;
pub(in crate::http) mod metrics_json;
pub(in crate::http) mod promote;
pub(in crate::http) mod refresh_configuration;
pub(in crate::http) mod status;
pub(in crate::http) mod terminate;

View File

@@ -1,14 +1,14 @@
use crate::http::JsonResponse;
use axum::Form;
use axum::extract::Json;
use http::StatusCode;
pub(in crate::http) async fn promote(
compute: axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>,
Form(safekeepers_lsn): Form<compute_api::responses::SafekeepersLsn>,
Json(cfg): Json<compute_api::responses::PromoteConfig>,
) -> axum::response::Response {
let state = compute.promote(safekeepers_lsn).await;
if let compute_api::responses::PromoteState::Failed { error } = state {
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, error);
let state = compute.promote(cfg).await;
if let compute_api::responses::PromoteState::Failed { error: _ } = state {
return JsonResponse::create_response(StatusCode::INTERNAL_SERVER_ERROR, state);
}
JsonResponse::success(StatusCode::OK, state)
}

View File

@@ -0,0 +1,34 @@
// This file is added by Hadron
use std::sync::Arc;
use axum::{
extract::State,
response::{IntoResponse, Response},
};
use http::StatusCode;
use tracing::debug;
use crate::compute::ComputeNode;
// use crate::hadron_metrics::POSTGRES_PAGESTREAM_REQUEST_ERRORS;
use crate::http::JsonResponse;
// The /refresh_configuration POST method is used to nudge compute_ctl to pull a new spec
// from the HCC and attempt to reconfigure Postgres with the new spec. The method does not wait
// for the reconfiguration to complete. Rather, it simply delivers a signal that will cause
// configuration to be reloaded in a best effort manner. Invocation of this method does not
// guarantee that a reconfiguration will occur. The caller should consider keep sending this
// request while it believes that the compute configuration is out of date.
pub(in crate::http) async fn refresh_configuration(
State(compute): State<Arc<ComputeNode>>,
) -> Response {
debug!("serving /refresh_configuration POST request");
// POSTGRES_PAGESTREAM_REQUEST_ERRORS.inc();
match compute.signal_refresh_configuration().await {
Ok(_) => StatusCode::OK.into_response(),
Err(e) => {
tracing::error!("error handling /refresh_configuration request: {}", e);
JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e)
}
}
}

View File

@@ -27,6 +27,7 @@ use super::{
},
};
use crate::compute::ComputeNode;
use crate::http::routes::{hadron_liveness_probe, refresh_configuration};
/// `compute_ctl` has two servers: internal and external. The internal server
/// binds to the loopback interface and handles communication from clients on
@@ -43,6 +44,7 @@ pub enum Server {
port: u16,
config: ComputeCtlConfig,
compute_id: String,
instance_id: Option<String>,
},
}
@@ -67,7 +69,12 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
post(extension_server::download_extension),
)
.route("/extensions", post(extensions::install_extension))
.route("/grants", post(grants::add_grant));
.route("/grants", post(grants::add_grant))
// Hadron: Compute-initiated configuration refresh
.route(
"/refresh_configuration",
post(refresh_configuration::refresh_configuration),
);
// Add in any testing support
if cfg!(feature = "testing") {
@@ -79,7 +86,10 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
router
}
Server::External {
config, compute_id, ..
config,
compute_id,
instance_id,
..
} => {
let unauthenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/metrics", get(metrics::get_metrics))
@@ -100,8 +110,13 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
.route("/metrics.json", get(metrics_json::get_metrics))
.route("/status", get(status::get_status))
.route("/terminate", post(terminate::terminate))
.route(
"/hadron_liveness_probe",
get(hadron_liveness_probe::hadron_liveness_probe),
)
.layer(AsyncRequireAuthorizationLayer::new(Authorize::new(
compute_id.clone(),
instance_id.clone(),
config.jwks.clone(),
)));

View File

@@ -2,6 +2,7 @@ use std::collections::HashMap;
use anyhow::Result;
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use once_cell::sync::Lazy;
use tokio_postgres::error::Error as PostgresError;
use tokio_postgres::{Client, Config, NoTls};
@@ -119,3 +120,7 @@ pub async fn get_installed_extensions(
extensions: extensions_map.into_values().collect(),
})
}
pub fn initialize_metrics() {
Lazy::force(&INSTALLED_EXTENSIONS);
}

View File

@@ -16,6 +16,7 @@ pub mod compute_prewarm;
pub mod compute_promote;
pub mod disk_quota;
pub mod extension_server;
pub mod hadron_metrics;
pub mod installed_extensions;
pub mod local_proxy;
pub mod lsn_lease;
@@ -24,6 +25,7 @@ mod migration;
pub mod monitor;
pub mod params;
pub mod pg_helpers;
pub mod pg_isready;
pub mod pgbouncer;
pub mod rsyslog;
pub mod spec;

View File

@@ -1,7 +1,10 @@
use std::collections::HashMap;
use std::sync::{LazyLock, RwLock};
use tracing::Subscriber;
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
use tracing_appender;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, layer::SubscriberExt, registry::LookupSpan};
/// Initialize logging to stderr, and OpenTelemetry tracing and exporter.
///
@@ -15,16 +18,44 @@ use tracing_subscriber::prelude::*;
///
pub fn init_tracing_and_logging(
default_log_level: &str,
) -> anyhow::Result<Option<tracing_utils::Provider>> {
log_dir_opt: &Option<String>,
) -> anyhow::Result<(
Option<tracing_utils::Provider>,
Option<tracing_appender::non_blocking::WorkerGuard>,
)> {
// Initialize Logging
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_log_level));
// Standard output streams
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_target(false)
.with_writer(std::io::stderr);
// Logs with file rotation. Files in `$log_dir/pgcctl.yyyy-MM-dd`
let (json_to_file_layer, _file_logs_guard) = if let Some(log_dir) = log_dir_opt {
std::fs::create_dir_all(log_dir)?;
let file_logs_appender = tracing_appender::rolling::RollingFileAppender::builder()
.rotation(tracing_appender::rolling::Rotation::DAILY)
.filename_prefix("pgcctl")
// Lib appends to existing files, so we will keep files for up to 2 days even on restart loops.
// At minimum, log-daemon will have 1 day to detect and upload a file (if created right before midnight).
.max_log_files(2)
.build(log_dir)
.expect("Initializing rolling file appender should succeed");
let (file_logs_writer, _file_logs_guard) =
tracing_appender::non_blocking(file_logs_appender);
let json_to_file_layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_target(false)
.event_format(PgJsonLogShapeFormatter)
.with_writer(file_logs_writer);
(Some(json_to_file_layer), Some(_file_logs_guard))
} else {
(None, None)
};
// Initialize OpenTelemetry
let provider =
tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default());
@@ -35,12 +66,13 @@ pub fn init_tracing_and_logging(
.with(env_filter)
.with(otlp_layer)
.with(fmt_layer)
.with(json_to_file_layer)
.init();
tracing::info!("logging and tracing started");
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
Ok(provider)
Ok((provider, _file_logs_guard))
}
/// Replace all newline characters with a special character to make it
@@ -95,3 +127,157 @@ pub fn startup_context_from_env() -> Option<opentelemetry::Context> {
None
}
}
/// Track relevant id's
const UNKNOWN_IDS: &str = r#""pg_instance_id": "", "pg_compute_id": """#;
static IDS: LazyLock<RwLock<String>> = LazyLock::new(|| RwLock::new(UNKNOWN_IDS.to_string()));
pub fn update_ids(instance_id: &Option<String>, compute_id: &Option<String>) -> anyhow::Result<()> {
let ids = format!(
r#""pg_instance_id": "{}", "pg_compute_id": "{}""#,
instance_id.as_ref().map(|s| s.as_str()).unwrap_or_default(),
compute_id.as_ref().map(|s| s.as_str()).unwrap_or_default()
);
let mut guard = IDS
.write()
.map_err(|e| anyhow::anyhow!("Log set id's rwlock poisoned: {}", e))?;
*guard = ids;
Ok(())
}
/// Massage compute_ctl logs into PG json log shape so we can use the same Lumberjack setup.
struct PgJsonLogShapeFormatter;
impl<S, N> fmt::format::FormatEvent<S, N> for PgJsonLogShapeFormatter
where
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'a> fmt::format::FormatFields<'a> + 'static,
{
fn format_event(
&self,
ctx: &fmt::FmtContext<'_, S, N>,
mut writer: fmt::format::Writer<'_>,
event: &tracing::Event<'_>,
) -> std::fmt::Result {
// Format values from the event's metadata, and open message string
let metadata = event.metadata();
{
let ids_guard = IDS.read();
let ids = ids_guard
.as_ref()
.map(|guard| guard.as_str())
// Surpress so that we don't lose all uploaded/ file logs if something goes super wrong. We would notice the missing id's.
.unwrap_or(UNKNOWN_IDS);
write!(
&mut writer,
r#"{{"timestamp": "{}", "error_severity": "{}", "file_name": "{}", "backend_type": "compute_ctl_self", {}, "message": "#,
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f GMT"),
metadata.level(),
metadata.target(),
ids
)?;
}
let mut message = String::new();
let message_writer = fmt::format::Writer::new(&mut message);
// Gather the message
ctx.field_format().format_fields(message_writer, event)?;
// TODO: any better options than to copy-paste this OSS span formatter?
// impl<S, N, T> FormatEvent<S, N> for Format<Full, T>
// https://docs.rs/tracing-subscriber/latest/tracing_subscriber/fmt/trait.FormatEvent.html#impl-FormatEvent%3CS,+N%3E-for-Format%3CFull,+T%3E
// write message, close bracket, and new line
writeln!(writer, "{}}}", serde_json::to_string(&message).unwrap())
}
}
#[cfg(feature = "testing")]
#[cfg(test)]
mod test {
use super::*;
use std::{cell::RefCell, io};
// Use thread_local! instead of Mutex for test isolation
thread_local! {
static WRITER_OUTPUT: RefCell<String> = const { RefCell::new(String::new()) };
}
#[derive(Clone, Default)]
struct StaticStringWriter;
impl io::Write for StaticStringWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let output = String::from_utf8(buf.to_vec()).expect("Invalid UTF-8 in test output");
WRITER_OUTPUT.with(|s| s.borrow_mut().push_str(&output));
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl fmt::MakeWriter<'_> for StaticStringWriter {
type Writer = Self;
fn make_writer(&self) -> Self::Writer {
Self
}
}
#[test]
fn test_log_pg_json_shape_formatter() {
// Use a scoped subscriber to prevent global state pollution
let subscriber = tracing_subscriber::registry().with(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_target(false)
.event_format(PgJsonLogShapeFormatter)
.with_writer(StaticStringWriter),
);
let _ = update_ids(&Some("000".to_string()), &Some("111".to_string()));
// Clear any previous test state
WRITER_OUTPUT.with(|s| s.borrow_mut().clear());
let messages = [
"test message",
r#"json escape check: name="BatchSpanProcessor.Flush.ExportError" reason="Other(reqwest::Error { kind: Request, url: \"http://localhost:4318/v1/traces\", source: hyper_
util::client::legacy::Error(Connect, ConnectError(\"tcp connect error\", Os { code: 111, kind: ConnectionRefused, message: \"Connection refused\" })) })" Failed during the export process"#,
];
tracing::subscriber::with_default(subscriber, || {
for message in messages {
tracing::info!(message);
}
});
tracing::info!("not test message");
// Get captured output
let output = WRITER_OUTPUT.with(|s| s.borrow().clone());
let json_strings: Vec<&str> = output.lines().collect();
assert_eq!(
json_strings.len(),
messages.len(),
"Log didn't have the expected number of json strings."
);
let json_string_shape_regex = regex::Regex::new(
r#"\{"timestamp": "\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} GMT", "error_severity": "INFO", "file_name": ".+", "backend_type": "compute_ctl_self", "pg_instance_id": "000", "pg_compute_id": "111", "message": ".+"\}"#
).unwrap();
for (i, expected_message) in messages.iter().enumerate() {
let json_string = json_strings[i];
assert!(
json_string_shape_regex.is_match(json_string),
"Json log didn't match expected pattern:\n{json_string}",
);
let parsed_json: serde_json::Value = serde_json::from_str(json_string).unwrap();
let actual_message = parsed_json["message"].as_str().unwrap();
assert_eq!(*expected_message, actual_message);
}
}
}

View File

@@ -0,0 +1,30 @@
use anyhow::{Context, anyhow};
// Run `/usr/local/bin/pg_isready -p {port}`
// Check the connectivity of PG
// Success means PG is listening on the port and accepting connections
// Note that PG does not need to authenticate the connection, nor reserve a connection quota for it.
// See https://www.postgresql.org/docs/current/app-pg-isready.html
pub fn pg_isready(bin: &str, port: u16) -> anyhow::Result<()> {
let child_result = std::process::Command::new(bin)
.arg("-p")
.arg(port.to_string())
.spawn();
child_result
.context("spawn() failed")
.and_then(|mut child| child.wait().context("wait() failed"))
.and_then(|status| match status.success() {
true => Ok(()),
false => Err(anyhow!("process exited with {status}")),
})
// wrap any prior error with the overall context that we couldn't run the command
.with_context(|| format!("could not run `{bin} --port {port}`"))
}
// It's safe to assume pg_isready is under the same directory with postgres,
// because it is a PG util bin installed along with postgres
pub fn get_pg_isready_bin(pgbin: &str) -> String {
let split = pgbin.split("/").collect::<Vec<&str>>();
split[0..split.len() - 1].join("/") + "/pg_isready"
}

View File

@@ -1517,7 +1517,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
let endpoint = cplane
.endpoints
.get(endpoint_id.as_str())
.ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?;
.ok_or_else(|| anyhow!("endpoint {endpoint_id} not found"))?;
if !args.allow_multiple {
cplane.check_conflicting_endpoints(

View File

@@ -97,8 +97,6 @@ pub struct EndpointConf {
reconfigure_concurrency: usize,
drop_subscriptions_before_start: bool,
features: Vec<ComputeFeature>,
compute_id: String,
instance_id: Option<String>,
cluster: Option<Cluster>,
compute_ctl_config: ComputeCtlConfig,
privileged_role_name: Option<String>,
@@ -201,8 +199,6 @@ impl ComputeControlPlane {
mode: ComputeMode,
grpc: bool,
skip_pg_catalog_updates: bool,
compute_id: &str,
instance_id: Option<String>,
drop_subscriptions_before_start: bool,
privileged_role_name: Option<String>,
) -> Result<Arc<Endpoint>> {
@@ -240,8 +236,6 @@ impl ComputeControlPlane {
grpc,
reconfigure_concurrency: 1,
features: vec![],
compute_id: compute_id.to_owned(),
instance_id: instance_id.clone(),
cluster: None,
compute_ctl_config: compute_ctl_config.clone(),
privileged_role_name: privileged_role_name.clone(),
@@ -264,8 +258,6 @@ impl ComputeControlPlane {
drop_subscriptions_before_start,
reconfigure_concurrency: 1,
features: vec![],
compute_id: compute_id.to_string(),
instance_id: instance_id.clone(),
cluster: None,
compute_ctl_config,
privileged_role_name,
@@ -339,13 +331,6 @@ pub struct Endpoint {
reconfigure_concurrency: usize,
// Feature flags
features: Vec<ComputeFeature>,
// The compute_id is used to identify the compute node in the cloud.
compute_id: String,
// Hadron database instance id used for PG authentication and logs
instance_id: Option<String>,
// Cluster settings
cluster: Option<Cluster>,
@@ -410,7 +395,6 @@ pub struct EndpointStartArgs {
pub autoprewarm: bool,
pub offload_lfc_interval_seconds: Option<std::num::NonZeroU64>,
pub dev: bool,
pub pg_init_timeout: Option<Duration>,
}
impl Endpoint {
@@ -453,8 +437,6 @@ impl Endpoint {
reconfigure_concurrency: conf.reconfigure_concurrency,
drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
features: conf.features,
compute_id: conf.compute_id,
instance_id: conf.instance_id,
cluster: conf.cluster,
compute_ctl_config: conf.compute_ctl_config,
privileged_role_name: conf.privileged_role_name,
@@ -499,7 +481,7 @@ impl Endpoint {
conf.append("restart_after_crash", "off");
// Load the 'neon' extension
conf.append("shared_preload_libraries", "neon, databricks_auth");
conf.append("shared_preload_libraries", "neon");
conf.append_line("");
// Replication-related configurations, such as WAL sending
@@ -803,7 +785,6 @@ impl Endpoint {
shard_stripe_size: Some(args.shard_stripe_size),
local_proxy_config: None,
reconfigure_concurrency: self.reconfigure_concurrency,
databricks_settings: None,
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
audit_log_level: ComputeAudit::Disabled,
logs_export_host: None::<String>,

View File

@@ -108,11 +108,10 @@ pub enum PromoteState {
Failed { error: String },
}
#[derive(Deserialize, Serialize, Default, Debug, Clone)]
#[derive(Deserialize, Default, Debug)]
#[serde(rename_all = "snake_case")]
/// Result of /safekeepers_lsn
pub struct SafekeepersLsn {
pub safekeepers: String,
pub struct PromoteConfig {
pub spec: ComputeSpec,
pub wal_flush_lsn: utils::lsn::Lsn,
}
@@ -173,11 +172,6 @@ pub enum ComputeStatus {
TerminationPendingImmediate,
// Terminated Postgres
Terminated,
// A spec refresh is being requested
RefreshConfigurationPending,
// A spec refresh is being applied. We cannot refresh configuration again until the current
// refresh is done, i.e., signal_refresh_configuration() will return 500 error.
RefreshConfiguration,
}
#[derive(Deserialize, Serialize)]
@@ -190,10 +184,6 @@ impl Display for ComputeStatus {
match self {
ComputeStatus::Empty => f.write_str("empty"),
ComputeStatus::ConfigurationPending => f.write_str("configuration-pending"),
ComputeStatus::RefreshConfigurationPending => {
f.write_str("refresh-configuration-pending")
}
ComputeStatus::RefreshConfiguration => f.write_str("refresh-configuration"),
ComputeStatus::Init => f.write_str("init"),
ComputeStatus::Running => f.write_str("running"),
ComputeStatus::Configuration => f.write_str("configuration"),
@@ -294,15 +284,10 @@ pub struct TlsConfig {
}
/// Response of the `/computes/{compute_id}/spec` control-plane API.
/// This is not actually a compute API response, so consider moving
/// to a different place.
#[derive(Deserialize, Debug)]
pub struct ControlPlaneConfigResponse {
pub spec: Option<ComputeSpec>,
pub status: ControlPlaneComputeStatus,
// Hadron: Deserialize this field into a harmless default if
// compute_ctl_config is not present for compatibility.
#[serde(default)]
pub compute_ctl_config: ComputeCtlConfig,
}

View File

@@ -1500,6 +1500,7 @@ pub struct TimelineArchivalConfigRequest {
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct TimelinePatchIndexPartRequest {
pub rel_size_migration: Option<RelSizeMigration>,
pub rel_size_migrated_at: Option<Lsn>,
pub gc_compaction_last_completed_lsn: Option<Lsn>,
pub applied_gc_cutoff_lsn: Option<Lsn>,
#[serde(default)]
@@ -1533,10 +1534,10 @@ pub enum RelSizeMigration {
/// `None` is the same as `Some(RelSizeMigration::Legacy)`.
Legacy,
/// The tenant is migrating to the new rel_size format. Both old and new rel_size format are
/// persisted in the index part. The read path will read both formats and merge them.
/// persisted in the storage. The read path will read both formats and validate them.
Migrating,
/// The tenant has migrated to the new rel_size format. Only the new rel_size format is persisted
/// in the index part, and the read path will not read the old format.
/// in the storage, and the read path will not read the old format.
Migrated,
}
@@ -1619,6 +1620,7 @@ pub struct TimelineInfo {
/// The status of the rel_size migration.
pub rel_size_migration: Option<RelSizeMigration>,
pub rel_size_migrated_at: Option<Lsn>,
/// Whether the timeline is invisible in synthetic size calculations.
pub is_invisible: Option<bool>,

View File

@@ -292,8 +292,32 @@ impl Client {
simple_query::batch_execute(self.inner_mut(), query).await
}
pub async fn discard_all(&mut self) -> Result<ReadyForQueryStatus, Error> {
self.batch_execute("discard all").await
/// Similar to `discard_all`, but it does not clear any query plans
///
/// This runs in the background, so it can be executed without `await`ing.
pub fn reset_session_background(&mut self) -> Result<(), Error> {
// "CLOSE ALL": closes any cursors
// "SET SESSION AUTHORIZATION DEFAULT": resets the current_user back to the session_user
// "RESET ALL": resets any GUCs back to their session defaults.
// "DEALLOCATE ALL": deallocates any prepared statements
// "UNLISTEN *": stops listening on all channels
// "SELECT pg_advisory_unlock_all();": unlocks all advisory locks
// "DISCARD TEMP;": drops all temporary tables
// "DISCARD SEQUENCES;": deallocates all cached sequence state
let _responses = self.inner_mut().send_simple_query(
"ROLLBACK;
CLOSE ALL;
SET SESSION AUTHORIZATION DEFAULT;
RESET ALL;
DEALLOCATE ALL;
UNLISTEN *;
SELECT pg_advisory_unlock_all();
DISCARD TEMP;
DISCARD SEQUENCES;",
)?;
Ok(())
}
/// Begins a new database transaction.

View File

@@ -11,9 +11,8 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use crate::connect::connect;
use crate::connect_raw::{RawConnection, connect_raw};
use crate::connect_raw::{self, StartupStream};
use crate::connect_tls::connect_tls;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::{MakeTlsConnect, TlsConnect, TlsStream};
use crate::{Client, Connection, Error};
@@ -244,24 +243,26 @@ impl Config {
&self,
stream: S,
tls: T,
) -> Result<RawConnection<S, T::Stream>, Error>
) -> Result<StartupStream<S, T::Stream>, Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: TlsConnect<S>,
{
let stream = connect_tls(stream, self.ssl_mode, tls).await?;
connect_raw(stream, self).await
let mut stream = StartupStream::new(stream);
connect_raw::startup(&mut stream, self).await?;
connect_raw::authenticate(&mut stream, self).await?;
Ok(stream)
}
pub async fn authenticate<S, T>(
&self,
stream: MaybeTlsStream<S, T>,
) -> Result<RawConnection<S, T>, Error>
pub async fn authenticate<S, T>(&self, stream: &mut StartupStream<S, T>) -> Result<(), Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: TlsStream + Unpin,
{
connect_raw(stream, self).await
connect_raw::startup(stream, self).await?;
connect_raw::authenticate(stream, self).await
}
}

View File

@@ -1,15 +1,17 @@
use std::net::IpAddr;
use futures_util::TryStreamExt;
use postgres_protocol2::message::backend::Message;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use crate::client::SocketConfig;
use crate::config::Host;
use crate::connect_raw::connect_raw;
use crate::connect_raw::StartupStream;
use crate::connect_socket::connect_socket;
use crate::connect_tls::connect_tls;
use crate::tls::{MakeTlsConnect, TlsConnect};
use crate::{Client, Config, Connection, Error, RawConnection};
use crate::{Client, Config, Connection, Error};
pub async fn connect<T>(
tls: &T,
@@ -43,14 +45,8 @@ where
T: TlsConnect<TcpStream>,
{
let socket = connect_socket(host_addr, host, port, config.connect_timeout).await?;
let stream = connect_tls(socket, config.ssl_mode, tls).await?;
let RawConnection {
stream,
parameters: _,
delayed_notice: _,
process_id,
secret_key,
} = connect_raw(stream, config).await?;
let mut stream = config.tls_and_authenticate(socket, tls).await?;
let (process_id, secret_key) = wait_until_ready(&mut stream).await?;
let socket_config = SocketConfig {
host_addr,
@@ -70,7 +66,32 @@ where
secret_key,
);
let stream = stream.into_framed();
let connection = Connection::new(stream, conn_tx, conn_rx);
Ok((client, connection))
}
async fn wait_until_ready<S, T>(stream: &mut StartupStream<S, T>) -> Result<(i32, i32), Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: AsyncRead + AsyncWrite + Unpin,
{
let mut process_id = 0;
let mut secret_key = 0;
loop {
match stream.try_next().await.map_err(Error::io)? {
Some(Message::BackendKeyData(body)) => {
process_id = body.process_id();
secret_key = body.secret_key();
}
// These values are currently not used by `Client`/`Connection`. Ignore them.
Some(Message::ParameterStatus(_)) | Some(Message::NoticeResponse(_)) => {}
Some(Message::ReadyForQuery(_)) => return Ok((process_id, secret_key)),
Some(Message::ErrorResponse(body)) => return Err(Error::db(body)),
Some(_) => return Err(Error::unexpected_message()),
None => return Err(Error::closed()),
}
}
}

View File

@@ -1,28 +1,26 @@
use std::collections::HashMap;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{Context, Poll, ready};
use bytes::{Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{Sink, SinkExt, Stream, TryStreamExt, ready};
use futures_util::{Sink, SinkExt, Stream, TryStreamExt};
use postgres_protocol2::authentication::sasl;
use postgres_protocol2::authentication::sasl::ScramSha256;
use postgres_protocol2::message::backend::{AuthenticationSaslBody, Message, NoticeResponseBody};
use postgres_protocol2::message::backend::{AuthenticationSaslBody, Message};
use postgres_protocol2::message::frontend;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_util::codec::{Framed, FramedParts, FramedWrite};
use crate::Error;
use crate::codec::{BackendMessage, BackendMessages, PostgresCodec};
use crate::codec::PostgresCodec;
use crate::config::{self, AuthKeys, Config};
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::TlsStream;
pub struct StartupStream<S, T> {
inner: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
buf: BackendMessages,
delayed_notice: Vec<NoticeResponseBody>,
inner: FramedWrite<MaybeTlsStream<S, T>, PostgresCodec>,
read_buf: BytesMut,
}
impl<S, T> Sink<Bytes> for StartupStream<S, T>
@@ -56,63 +54,93 @@ where
{
type Item = io::Result<Message>;
fn poll_next(
mut self: Pin<&mut Self>,
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// read 1 byte tag, 4 bytes length.
let header = ready!(self.as_mut().poll_fill_buf_exact(cx, 5)?);
let len = u32::from_be_bytes(header[1..5].try_into().unwrap());
if len < 4 {
return Poll::Ready(Some(Err(std::io::Error::other(
"postgres message too small",
))));
}
if len >= 65536 {
return Poll::Ready(Some(Err(std::io::Error::other(
"postgres message too large",
))));
}
// the tag is an additional byte.
let _message = ready!(self.as_mut().poll_fill_buf_exact(cx, len as usize + 1)?);
// Message::parse will remove the all the bytes from the buffer.
Poll::Ready(Message::parse(&mut self.read_buf).transpose())
}
}
impl<S, T> StartupStream<S, T>
where
S: AsyncRead + AsyncWrite + Unpin,
T: AsyncRead + AsyncWrite + Unpin,
{
/// Fill the buffer until it's the exact length provided. No additional data will be read from the socket.
///
/// If the current buffer length is greater, nothing happens.
fn poll_fill_buf_exact(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<io::Result<Message>>> {
loop {
match self.buf.next() {
Ok(Some(message)) => return Poll::Ready(Some(Ok(message))),
Ok(None) => {}
Err(e) => return Poll::Ready(Some(Err(e))),
len: usize,
) -> Poll<Result<&[u8], std::io::Error>> {
let this = self.get_mut();
let mut stream = Pin::new(this.inner.get_mut());
let mut n = this.read_buf.len();
while n < len {
this.read_buf.resize(len, 0);
let mut buf = ReadBuf::new(&mut this.read_buf[..]);
buf.set_filled(n);
if stream.as_mut().poll_read(cx, &mut buf)?.is_pending() {
this.read_buf.truncate(n);
return Poll::Pending;
}
match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
Some(Ok(BackendMessage::Normal { messages, .. })) => self.buf = messages,
Some(Ok(BackendMessage::Async(message))) => return Poll::Ready(Some(Ok(message))),
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => return Poll::Ready(None),
if buf.filled().len() == n {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"early eof",
)));
}
n = buf.filled().len();
this.read_buf.truncate(n);
}
Poll::Ready(Ok(&this.read_buf[..len]))
}
pub fn into_framed(mut self) -> Framed<MaybeTlsStream<S, T>, PostgresCodec> {
let write_buf = std::mem::take(self.inner.write_buffer_mut());
let io = self.inner.into_inner();
let mut parts = FramedParts::new(io, PostgresCodec);
parts.read_buf = self.read_buf;
parts.write_buf = write_buf;
Framed::from_parts(parts)
}
pub fn new(io: MaybeTlsStream<S, T>) -> Self {
Self {
inner: FramedWrite::new(io, PostgresCodec),
read_buf: BytesMut::new(),
}
}
}
pub struct RawConnection<S, T> {
pub stream: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
pub parameters: HashMap<String, String>,
pub delayed_notice: Vec<NoticeResponseBody>,
pub process_id: i32,
pub secret_key: i32,
}
pub async fn connect_raw<S, T>(
stream: MaybeTlsStream<S, T>,
pub(crate) async fn startup<S, T>(
stream: &mut StartupStream<S, T>,
config: &Config,
) -> Result<RawConnection<S, T>, Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: TlsStream + Unpin,
{
let mut stream = StartupStream {
inner: Framed::new(stream, PostgresCodec),
buf: BackendMessages::empty(),
delayed_notice: Vec::new(),
};
startup(&mut stream, config).await?;
authenticate(&mut stream, config).await?;
let (process_id, secret_key, parameters) = read_info(&mut stream).await?;
Ok(RawConnection {
stream: stream.inner,
parameters,
delayed_notice: stream.delayed_notice,
process_id,
secret_key,
})
}
async fn startup<S, T>(stream: &mut StartupStream<S, T>, config: &Config) -> Result<(), Error>
) -> Result<(), Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: AsyncRead + AsyncWrite + Unpin,
@@ -123,7 +151,10 @@ where
stream.send(buf.freeze()).await.map_err(Error::io)
}
async fn authenticate<S, T>(stream: &mut StartupStream<S, T>, config: &Config) -> Result<(), Error>
pub(crate) async fn authenticate<S, T>(
stream: &mut StartupStream<S, T>,
config: &Config,
) -> Result<(), Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: TlsStream + Unpin,
@@ -278,35 +309,3 @@ where
Ok(())
}
async fn read_info<S, T>(
stream: &mut StartupStream<S, T>,
) -> Result<(i32, i32, HashMap<String, String>), Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: AsyncRead + AsyncWrite + Unpin,
{
let mut process_id = 0;
let mut secret_key = 0;
let mut parameters = HashMap::new();
loop {
match stream.try_next().await.map_err(Error::io)? {
Some(Message::BackendKeyData(body)) => {
process_id = body.process_id();
secret_key = body.secret_key();
}
Some(Message::ParameterStatus(body)) => {
parameters.insert(
body.name().map_err(Error::parse)?.to_string(),
body.value().map_err(Error::parse)?.to_string(),
);
}
Some(Message::NoticeResponse(body)) => stream.delayed_notice.push(body),
Some(Message::ReadyForQuery(_)) => return Ok((process_id, secret_key, parameters)),
Some(Message::ErrorResponse(body)) => return Err(Error::db(body)),
Some(_) => return Err(Error::unexpected_message()),
None => return Err(Error::closed()),
}
}
}

View File

@@ -452,16 +452,16 @@ impl Error {
Error(Box::new(ErrorInner { kind, cause }))
}
pub(crate) fn closed() -> Error {
pub fn closed() -> Error {
Error::new(Kind::Closed, None)
}
pub(crate) fn unexpected_message() -> Error {
pub fn unexpected_message() -> Error {
Error::new(Kind::UnexpectedMessage, None)
}
#[allow(clippy::needless_pass_by_value)]
pub(crate) fn db(error: ErrorResponseBody) -> Error {
pub fn db(error: ErrorResponseBody) -> Error {
match DbError::parse(&mut error.fields()) {
Ok(e) => Error::new(Kind::Db, Some(Box::new(e))),
Err(e) => Error::new(Kind::Parse, Some(Box::new(e))),
@@ -493,7 +493,7 @@ impl Error {
Error::new(Kind::Tls, Some(e))
}
pub(crate) fn io(e: io::Error) -> Error {
pub fn io(e: io::Error) -> Error {
Error::new(Kind::Io, Some(Box::new(e)))
}

View File

@@ -6,7 +6,6 @@ use postgres_protocol2::message::backend::ReadyForQueryBody;
pub use crate::cancel_token::{CancelToken, RawCancelToken};
pub use crate::client::{Client, SocketConfig};
pub use crate::config::Config;
pub use crate::connect_raw::RawConnection;
pub use crate::connection::Connection;
pub use crate::error::Error;
pub use crate::generic_client::GenericClient;
@@ -50,7 +49,7 @@ mod client;
mod codec;
pub mod config;
mod connect;
mod connect_raw;
pub mod connect_raw;
mod connect_socket;
mod connect_tls;
mod connection;

View File

@@ -301,7 +301,12 @@ pub struct PullTimelineRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub http_hosts: Vec<String>,
pub ignore_tombstone: Option<bool>,
/// Membership configuration to switch to after pull.
/// It guarantees that if pull_timeline returns successfully, the timeline will
/// not be deleted by request with an older generation.
/// Storage controller always sets this field.
/// None is only allowed for manual pull_timeline requests.
pub mconf: Option<Configuration>,
}
#[derive(Debug, Serialize, Deserialize)]

View File

@@ -8,7 +8,7 @@ license.workspace = true
hyper0.workspace = true
opentelemetry = { workspace = true, features = ["trace"] }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
opentelemetry-otlp = { workspace = true, default-features = false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-otlp = { workspace = true, default-features = false, features = ["http-proto", "trace", "http", "reqwest-blocking-client"] }
opentelemetry-semantic-conventions.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tracing.workspace = true

View File

@@ -429,9 +429,11 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
};
let empty_wal_rate_limiter = crate::bindings::WalRateLimiter {
effective_max_wal_bytes_per_second: crate::bindings::pg_atomic_uint32 { value: 0 },
should_limit: crate::bindings::pg_atomic_uint32 { value: 0 },
sent_bytes: 0,
last_recorded_time_us: crate::bindings::pg_atomic_uint64 { value: 0 },
batch_start_time_us: crate::bindings::pg_atomic_uint64 { value: 0 },
batch_end_time_us: crate::bindings::pg_atomic_uint64 { value: 0 },
};
crate::bindings::WalproposerShmemState {

View File

@@ -484,6 +484,8 @@ async fn build_timeline_info_common(
*timeline.get_applied_gc_cutoff_lsn(),
);
let (rel_size_migration, rel_size_migrated_at) = timeline.get_rel_size_v2_status();
let info = TimelineInfo {
tenant_id: timeline.tenant_shard_id,
timeline_id: timeline.timeline_id,
@@ -515,7 +517,8 @@ async fn build_timeline_info_common(
state,
is_archived: Some(is_archived),
rel_size_migration: Some(timeline.get_rel_size_v2_status()),
rel_size_migration: Some(rel_size_migration),
rel_size_migrated_at,
is_invisible: Some(is_invisible),
walreceiver_status,
@@ -930,9 +933,16 @@ async fn timeline_patch_index_part_handler(
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
if request_data.rel_size_migration.is_none() && request_data.rel_size_migrated_at.is_some()
{
return Err(ApiError::BadRequest(anyhow!(
"updating rel_size_migrated_at without rel_size_migration is not allowed"
)));
}
if let Some(rel_size_migration) = request_data.rel_size_migration {
timeline
.update_rel_size_v2_status(rel_size_migration)
.update_rel_size_v2_status(rel_size_migration, request_data.rel_size_migrated_at)
.map_err(ApiError::InternalServerError)?;
}

View File

@@ -57,7 +57,7 @@ pub async fn import_timeline_from_postgres_datadir(
// TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
// Then fishing out pg_control would be unnecessary
let mut modification = tline.begin_modification(pgdata_lsn);
let mut modification = tline.begin_modification_for_import(pgdata_lsn);
modification.init_empty()?;
// Import all but pg_wal
@@ -309,7 +309,7 @@ async fn import_wal(
waldecoder.feed_bytes(&buf);
let mut nrecords = 0;
let mut modification = tline.begin_modification(last_lsn);
let mut modification = tline.begin_modification_for_import(last_lsn);
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let interpreted = InterpretedWalRecord::from_bytes_filtered(
@@ -357,7 +357,7 @@ pub async fn import_basebackup_from_tar(
ctx: &RequestContext,
) -> Result<()> {
info!("importing base at {base_lsn}");
let mut modification = tline.begin_modification(base_lsn);
let mut modification = tline.begin_modification_for_import(base_lsn);
modification.init_empty()?;
let mut pg_control: Option<ControlFileData> = None;
@@ -457,7 +457,7 @@ pub async fn import_wal_from_tar(
waldecoder.feed_bytes(&bytes[offset..]);
let mut modification = tline.begin_modification(last_lsn);
let mut modification = tline.begin_modification_for_import(last_lsn);
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let interpreted = InterpretedWalRecord::from_bytes_filtered(

View File

@@ -6,7 +6,7 @@
//! walingest.rs handles a few things like implicit relation creation and extension.
//! Clarify that)
//!
use std::collections::{HashMap, HashSet, hash_map};
use std::collections::{BTreeSet, HashMap, HashSet, hash_map};
use std::ops::{ControlFlow, Range};
use std::sync::Arc;
@@ -227,6 +227,25 @@ impl Timeline {
pending_nblocks: 0,
pending_directory_entries: Vec::new(),
pending_metadata_bytes: 0,
is_importing_pgdata: false,
lsn,
}
}
pub fn begin_modification_for_import(&self, lsn: Lsn) -> DatadirModification
where
Self: Sized,
{
DatadirModification {
tline: self,
pending_lsns: Vec::new(),
pending_metadata_pages: HashMap::new(),
pending_data_batch: None,
pending_deletions: Vec::new(),
pending_nblocks: 0,
pending_directory_entries: Vec::new(),
pending_metadata_bytes: 0,
is_importing_pgdata: true,
lsn,
}
}
@@ -596,6 +615,50 @@ impl Timeline {
self.get_rel_exists_in_reldir(tag, version, None, ctx).await
}
async fn get_rel_exists_in_reldir_v1(
&self,
tag: RelTag,
version: Version<'_>,
deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
if let Some((cached_key, dir)) = deserialized_reldir_v1 {
if cached_key == key {
return Ok(dir.rels.contains(&(tag.relnode, tag.forknum)));
} else if cfg!(test) || cfg!(feature = "testing") {
panic!("cached reldir key mismatch: {cached_key} != {key}");
} else {
warn!("cached reldir key mismatch: {cached_key} != {key}");
}
// Fallback to reading the directory from the datadir.
}
let buf = version.get(self, key, ctx).await?;
let dir = RelDirectory::des(&buf)?;
Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
}
async fn get_rel_exists_in_reldir_v2(
&self,
tag: RelTag,
version: Version<'_>,
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?).map_err(
|_| {
PageReconstructError::Other(anyhow::anyhow!(
"invalid reldir key: decode failed, {}",
key
))
},
)?;
let exists_v2 = buf == RelDirExists::Exists;
Ok(exists_v2)
}
/// Does the relation exist? With a cached deserialized `RelDirectory`.
///
/// There are some cases where the caller loops across all relations. In that specific case,
@@ -627,45 +690,134 @@ impl Timeline {
return Ok(false);
}
// Read path: first read the new reldir keyspace. Early return if the relation exists.
// Otherwise, read the old reldir keyspace.
// TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
let (v2_status, migrated_lsn) = self.get_rel_size_v2_status();
if let RelSizeMigration::Migrated | RelSizeMigration::Migrating =
self.get_rel_size_v2_status()
{
// fetch directory listing (new)
let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
.map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
let exists_v2 = buf == RelDirExists::Exists;
// Fast path: if the relation exists in the new format, return true.
// TODO: we should have a verification mode that checks both keyspaces
// to ensure the relation only exists in one of them.
if exists_v2 {
return Ok(true);
match v2_status {
RelSizeMigration::Legacy => {
let v1_exists = self
.get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
.await?;
Ok(v1_exists)
}
RelSizeMigration::Migrating | RelSizeMigration::Migrated
if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) =>
{
// For requests below the migrated LSN, we still use the v1 read path.
let v1_exists = self
.get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
.await?;
Ok(v1_exists)
}
RelSizeMigration::Migrating => {
let v1_exists = self
.get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
.await?;
let v2_exists_res = self.get_rel_exists_in_reldir_v2(tag, version, ctx).await;
match v2_exists_res {
Ok(v2_exists) if v1_exists == v2_exists => {}
Ok(v2_exists) => {
tracing::warn!(
"inconsistent v1/v2 reldir keyspace for rel {}: v1_exists={}, v2_exists={}",
tag,
v1_exists,
v2_exists
);
}
Err(e) => {
tracing::warn!("failed to get rel exists in v2: {e}");
}
}
Ok(v1_exists)
}
RelSizeMigration::Migrated => {
let v2_exists = self.get_rel_exists_in_reldir_v2(tag, version, ctx).await?;
Ok(v2_exists)
}
}
}
// fetch directory listing (old)
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
if let Some((cached_key, dir)) = deserialized_reldir_v1 {
if cached_key == key {
return Ok(dir.rels.contains(&(tag.relnode, tag.forknum)));
} else if cfg!(test) || cfg!(feature = "testing") {
panic!("cached reldir key mismatch: {cached_key} != {key}");
} else {
warn!("cached reldir key mismatch: {cached_key} != {key}");
}
// Fallback to reading the directory from the datadir.
}
async fn list_rels_v1(
&self,
spcnode: Oid,
dbnode: Oid,
version: Version<'_>,
ctx: &RequestContext,
) -> Result<HashSet<RelTag>, PageReconstructError> {
let key = rel_dir_to_key(spcnode, dbnode);
let buf = version.get(self, key, ctx).await?;
let dir = RelDirectory::des(&buf)?;
let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
Ok(exists_v1)
let rels_v1: HashSet<RelTag> =
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
spcnode,
dbnode,
relnode: *relnode,
forknum: *forknum,
}));
Ok(rels_v1)
}
async fn list_rels_v2(
&self,
spcnode: Oid,
dbnode: Oid,
version: Version<'_>,
ctx: &RequestContext,
) -> Result<HashSet<RelTag>, PageReconstructError> {
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
);
let results = self
.scan(
KeySpace::single(key_range),
version.get_lsn(),
ctx,
io_concurrency,
)
.await?;
let mut rels = HashSet::new();
for (key, val) in results {
let val = RelDirExists::decode(&val?).map_err(|_| {
PageReconstructError::Other(anyhow::anyhow!(
"invalid reldir key: decode failed, {}",
key
))
})?;
if key.field6 != 1 {
return Err(PageReconstructError::Other(anyhow::anyhow!(
"invalid reldir key: field6 != 1, {}",
key
)));
}
if key.field2 != spcnode {
return Err(PageReconstructError::Other(anyhow::anyhow!(
"invalid reldir key: field2 != spcnode, {}",
key
)));
}
if key.field3 != dbnode {
return Err(PageReconstructError::Other(anyhow::anyhow!(
"invalid reldir key: field3 != dbnode, {}",
key
)));
}
let tag = RelTag {
spcnode,
dbnode,
relnode: key.field4,
forknum: key.field5,
};
if val == RelDirExists::Removed {
debug_assert!(!rels.contains(&tag), "removed reltag in v2");
continue;
}
let did_not_contain = rels.insert(tag);
debug_assert!(did_not_contain, "duplicate reltag in v2");
}
Ok(rels)
}
/// Get a list of all existing relations in given tablespace and database.
@@ -683,60 +835,45 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<HashSet<RelTag>, PageReconstructError> {
// fetch directory listing (old)
let key = rel_dir_to_key(spcnode, dbnode);
let buf = version.get(self, key, ctx).await?;
let (v2_status, migrated_lsn) = self.get_rel_size_v2_status();
let dir = RelDirectory::des(&buf)?;
let rels_v1: HashSet<RelTag> =
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
spcnode,
dbnode,
relnode: *relnode,
forknum: *forknum,
}));
if let RelSizeMigration::Legacy = self.get_rel_size_v2_status() {
return Ok(rels_v1);
}
// scan directory listing (new), merge with the old results
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
);
let results = self
.scan(
KeySpace::single(key_range),
version.get_lsn(),
ctx,
io_concurrency,
)
.await?;
let mut rels = rels_v1;
for (key, val) in results {
let val = RelDirExists::decode(&val?)
.map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
assert_eq!(key.field6, 1);
assert_eq!(key.field2, spcnode);
assert_eq!(key.field3, dbnode);
let tag = RelTag {
spcnode,
dbnode,
relnode: key.field4,
forknum: key.field5,
};
if val == RelDirExists::Removed {
debug_assert!(!rels.contains(&tag), "removed reltag in v2");
continue;
match v2_status {
RelSizeMigration::Legacy => {
let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
Ok(rels_v1)
}
RelSizeMigration::Migrating | RelSizeMigration::Migrated
if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) =>
{
// For requests below the migrated LSN, we still use the v1 read path.
let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
Ok(rels_v1)
}
RelSizeMigration::Migrating => {
let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
let rels_v2_res = self.list_rels_v2(spcnode, dbnode, version, ctx).await;
match rels_v2_res {
Ok(rels_v2) if rels_v1 == rels_v2 => {}
Ok(rels_v2) => {
tracing::warn!(
"inconsistent v1/v2 reldir keyspace for db {} {}: v1_rels.len()={}, v2_rels.len()={}",
spcnode,
dbnode,
rels_v1.len(),
rels_v2.len()
);
}
Err(e) => {
tracing::warn!("failed to list rels in v2: {e}");
}
}
Ok(rels_v1)
}
RelSizeMigration::Migrated => {
let rels_v2 = self.list_rels_v2(spcnode, dbnode, version, ctx).await?;
Ok(rels_v2)
}
let did_not_contain = rels.insert(tag);
debug_assert!(did_not_contain, "duplicate reltag in v2");
}
Ok(rels)
}
/// Get the whole SLRU segment
@@ -1258,10 +1395,10 @@ impl Timeline {
let mut dbdir_cnt = 0;
let mut rel_cnt = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for &(spcnode, dbnode) in dbdir.dbdirs.keys() {
dbdir_cnt += 1;
for rel in self
.list_rels(*spcnode, *dbnode, Version::at(lsn), ctx)
.list_rels(spcnode, dbnode, Version::at(lsn), ctx)
.await?
{
rel_cnt += 1;
@@ -1566,6 +1703,9 @@ pub struct DatadirModification<'a> {
/// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
pending_metadata_bytes: usize,
/// Whether we are importing a pgdata directory.
is_importing_pgdata: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -1578,6 +1718,14 @@ pub enum MetricsUpdate {
Sub(u64),
}
/// Controls the behavior of the reldir keyspace.
pub struct RelDirMode {
// Whether we can read the v2 keyspace or not.
current_status: RelSizeMigration,
// Whether we should initialize the v2 keyspace or not.
initialize: bool,
}
impl DatadirModification<'_> {
// When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
// contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
@@ -1933,30 +2081,49 @@ impl DatadirModification<'_> {
}
/// Returns `true` if the rel_size_v2 write path is enabled. If it is the first time that
/// we enable it, we also need to persist it in `index_part.json`.
pub fn maybe_enable_rel_size_v2(&mut self) -> anyhow::Result<bool> {
let status = self.tline.get_rel_size_v2_status();
/// we enable it, we also need to persist it in `index_part.json` (initialize is true).
///
/// As this function is only used on the write path, we do not need to read the migrated_at
/// field.
pub fn maybe_enable_rel_size_v2(&mut self, is_create: bool) -> anyhow::Result<RelDirMode> {
// TODO: define the behavior of the tenant-level config flag and use feature flag to enable this feature
let (status, _) = self.tline.get_rel_size_v2_status();
let config = self.tline.get_rel_size_v2_enabled();
match (config, status) {
(false, RelSizeMigration::Legacy) => {
// tenant config didn't enable it and we didn't write any reldir_v2 key yet
Ok(false)
Ok(RelDirMode {
current_status: RelSizeMigration::Legacy,
initialize: false,
})
}
(false, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
(false, status @ RelSizeMigration::Migrating | status @ RelSizeMigration::Migrated) => {
// index_part already persisted that the timeline has enabled rel_size_v2
Ok(true)
Ok(RelDirMode {
current_status: status,
initialize: false,
})
}
(true, RelSizeMigration::Legacy) => {
// The first time we enable it, we need to persist it in `index_part.json`
self.tline
.update_rel_size_v2_status(RelSizeMigration::Migrating)?;
tracing::info!("enabled rel_size_v2");
Ok(true)
// The caller should update the reldir status once the initialization is done.
//
// Only initialize the v2 keyspace on new relation creation. No initialization
// during `timeline_create` (TODO: fix this, we should allow, but currently it
// hits consistency issues).
Ok(RelDirMode {
current_status: RelSizeMigration::Legacy,
initialize: is_create && !self.is_importing_pgdata,
})
}
(true, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
(true, status @ RelSizeMigration::Migrating | status @ RelSizeMigration::Migrated) => {
// index_part already persisted that the timeline has enabled rel_size_v2
// and we don't need to do anything
Ok(true)
Ok(RelDirMode {
current_status: status,
initialize: false,
})
}
}
}
@@ -1969,8 +2136,8 @@ impl DatadirModification<'_> {
img: Bytes,
ctx: &RequestContext,
) -> Result<(), WalIngestError> {
let v2_enabled = self
.maybe_enable_rel_size_v2()
let v2_mode = self
.maybe_enable_rel_size_v2(false)
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
// Add it to the directory (if it doesn't exist already)
@@ -1986,17 +2153,19 @@ impl DatadirModification<'_> {
self.put(DBDIR_KEY, Value::Image(buf.into()));
}
if r.is_none() {
// Create RelDirectory
// TODO: if we have fully migrated to v2, no need to create this directory
if v2_mode.current_status != RelSizeMigration::Legacy {
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
}
// Create RelDirectory in v1 keyspace. TODO: if we have fully migrated to v2, no need to create this directory.
// Some code path relies on this directory to be present. We should remove it once we starts to set tenants to
// `RelSizeMigration::Migrated` state (currently we don't, all tenants will have `RelSizeMigration::Migrating`).
let buf = RelDirectory::ser(&RelDirectory {
rels: HashSet::new(),
})?;
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
if v2_enabled {
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
}
self.put(
rel_dir_to_key(spcnode, dbnode),
Value::Image(Bytes::from(buf)),
@@ -2103,6 +2272,109 @@ impl DatadirModification<'_> {
Ok(())
}
async fn initialize_rel_size_v2_keyspace(
&mut self,
ctx: &RequestContext,
dbdir: &DbDirectory,
) -> Result<(), WalIngestError> {
// Copy everything from relv1 to relv2; TODO: check if there's any key in the v2 keyspace, if so, abort.
tracing::info!("initializing rel_size_v2 keyspace");
let mut rel_cnt = 0;
// relmap_exists (the value of dbdirs hashmap) does not affect the migration: we need to copy things over anyways
for &(spcnode, dbnode) in dbdir.dbdirs.keys() {
let rel_dir_key = rel_dir_to_key(spcnode, dbnode);
let rel_dir = RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?;
for (relnode, forknum) in rel_dir.rels {
let sparse_rel_dir_key = rel_tag_sparse_key(spcnode, dbnode, relnode, forknum);
self.put(
sparse_rel_dir_key,
Value::Image(RelDirExists::Exists.encode()),
);
tracing::info!(
"migrated rel_size_v2: {}",
RelTag {
spcnode,
dbnode,
relnode,
forknum
}
);
rel_cnt += 1;
}
}
tracing::info!(
"initialized rel_size_v2 keyspace at lsn {}: migrated {} relations",
self.lsn,
rel_cnt
);
self.tline
.update_rel_size_v2_status(RelSizeMigration::Migrating, Some(self.lsn))
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
Ok::<_, WalIngestError>(())
}
async fn put_rel_creation_v1(
&mut self,
rel: RelTag,
dbdir_exists: bool,
ctx: &RequestContext,
) -> Result<(), WalIngestError> {
// Reldir v1 write path
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir = if !dbdir_exists {
// Create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
};
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
}
if !dbdir_exists {
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
}
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
self.put(
rel_dir_key,
Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
);
Ok(())
}
async fn put_rel_creation_v2(
&mut self,
rel: RelTag,
dbdir_exists: bool,
ctx: &RequestContext,
) -> Result<(), WalIngestError> {
// Reldir v2 write path
let sparse_rel_dir_key =
rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
// check if the rel_dir_key exists in v2
let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
let val = RelDirExists::decode_option(val)
.map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
if val == RelDirExists::Exists {
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
}
self.put(
sparse_rel_dir_key,
Value::Image(RelDirExists::Exists.encode()),
);
if !dbdir_exists {
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
}
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
Ok(())
}
/// Create a relation fork.
///
/// 'nblocks' is the initial size.
@@ -2136,66 +2408,31 @@ impl DatadirModification<'_> {
true
};
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir = if !dbdir_exists {
// Create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
};
let v2_enabled = self
.maybe_enable_rel_size_v2()
let mut v2_mode = self
.maybe_enable_rel_size_v2(true)
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
if v2_enabled {
if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
if v2_mode.initialize {
if let Err(e) = self.initialize_rel_size_v2_keyspace(ctx, &dbdir).await {
tracing::warn!("error initializing rel_size_v2 keyspace: {}", e);
// TODO: circuit breaker so that it won't retry forever
} else {
v2_mode.current_status = RelSizeMigration::Migrating;
}
let sparse_rel_dir_key =
rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
// check if the rel_dir_key exists in v2
let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
let val = RelDirExists::decode_option(val)
.map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
if val == RelDirExists::Exists {
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
}
if v2_mode.current_status != RelSizeMigration::Migrated {
self.put_rel_creation_v1(rel, dbdir_exists, ctx).await?;
}
if v2_mode.current_status != RelSizeMigration::Legacy {
let write_v2_res = self.put_rel_creation_v2(rel, dbdir_exists, ctx).await;
if let Err(e) = write_v2_res {
if v2_mode.current_status == RelSizeMigration::Migrated {
return Err(e);
}
tracing::warn!("error writing rel_size_v2 keyspace: {}", e);
}
self.put(
sparse_rel_dir_key,
Value::Image(RelDirExists::Exists.encode()),
);
if !dbdir_exists {
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
// We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
// TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
// will be key not found errors if we don't create an empty one for rel_size_v2.
self.put(
rel_dir_key,
Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
);
}
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
} else {
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
}
if !dbdir_exists {
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
}
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
self.put(
rel_dir_key,
Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
);
}
// Put size
@@ -2270,15 +2507,12 @@ impl DatadirModification<'_> {
Ok(())
}
/// Drop some relations
pub(crate) async fn put_rel_drops(
async fn put_rel_drop_v1(
&mut self,
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
ctx: &RequestContext,
) -> Result<(), WalIngestError> {
let v2_enabled = self
.maybe_enable_rel_size_v2()
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
) -> Result<BTreeSet<RelTag>, WalIngestError> {
let mut dropped_rels = BTreeSet::new();
for ((spc_node, db_node), rel_tags) in drop_relations {
let dir_key = rel_dir_to_key(spc_node, db_node);
let buf = self.get(dir_key, ctx).await?;
@@ -2290,25 +2524,8 @@ impl DatadirModification<'_> {
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
dirty = true;
dropped_rels.insert(rel_tag);
true
} else if v2_enabled {
// The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
// Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
// logic).
let key =
rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
.map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
if val == RelDirExists::Exists {
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
// put tombstone
self.put(key, Value::Image(RelDirExists::Removed.encode()));
// no need to set dirty to true
true
} else {
false
}
} else {
false
};
@@ -2331,7 +2548,67 @@ impl DatadirModification<'_> {
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
}
}
Ok(dropped_rels)
}
async fn put_rel_drop_v2(
&mut self,
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
ctx: &RequestContext,
) -> Result<BTreeSet<RelTag>, WalIngestError> {
let mut dropped_rels = BTreeSet::new();
for ((spc_node, db_node), rel_tags) in drop_relations {
for rel_tag in rel_tags {
let key = rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
.map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
if val == RelDirExists::Exists {
dropped_rels.insert(rel_tag);
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
// put tombstone
self.put(key, Value::Image(RelDirExists::Removed.encode()));
}
}
}
Ok(dropped_rels)
}
/// Drop some relations
pub(crate) async fn put_rel_drops(
&mut self,
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
ctx: &RequestContext,
) -> Result<(), WalIngestError> {
let v2_mode = self
.maybe_enable_rel_size_v2(false)
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
match v2_mode.current_status {
RelSizeMigration::Legacy => {
self.put_rel_drop_v1(drop_relations, ctx).await?;
}
RelSizeMigration::Migrating => {
let dropped_rels_v1 = self.put_rel_drop_v1(drop_relations.clone(), ctx).await?;
let dropped_rels_v2_res = self.put_rel_drop_v2(drop_relations, ctx).await;
match dropped_rels_v2_res {
Ok(dropped_rels_v2) => {
if dropped_rels_v1 != dropped_rels_v2 {
tracing::warn!(
"inconsistent v1/v2 rel drop: dropped_rels_v1.len()={}, dropped_rels_v2.len()={}",
dropped_rels_v1.len(),
dropped_rels_v2.len()
);
}
}
Err(e) => {
tracing::warn!("error dropping rels: {}", e);
}
}
}
RelSizeMigration::Migrated => {
self.put_rel_drop_v2(drop_relations, ctx).await?;
}
}
Ok(())
}

View File

@@ -1205,6 +1205,7 @@ impl TenantShard {
idempotency.clone(),
index_part.gc_compaction.clone(),
index_part.rel_size_migration.clone(),
index_part.rel_size_migrated_at,
ctx,
)?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
@@ -2584,6 +2585,7 @@ impl TenantShard {
initdb_lsn,
None,
None,
None,
ctx,
)
.await
@@ -2913,6 +2915,7 @@ impl TenantShard {
initdb_lsn,
None,
None,
None,
ctx,
)
.await
@@ -4342,6 +4345,7 @@ impl TenantShard {
create_idempotency: CreateTimelineIdempotency,
gc_compaction_state: Option<GcCompactionState>,
rel_size_v2_status: Option<RelSizeMigration>,
rel_size_migrated_at: Option<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<(Arc<Timeline>, RequestContext)> {
let state = match cause {
@@ -4376,6 +4380,7 @@ impl TenantShard {
create_idempotency,
gc_compaction_state,
rel_size_v2_status,
rel_size_migrated_at,
self.cancel.child_token(),
);
@@ -5085,6 +5090,7 @@ impl TenantShard {
src_timeline.pg_version,
);
let (rel_size_v2_status, rel_size_migrated_at) = src_timeline.get_rel_size_v2_status();
let (uninitialized_timeline, _timeline_ctx) = self
.prepare_new_timeline(
dst_id,
@@ -5092,7 +5098,8 @@ impl TenantShard {
timeline_create_guard,
start_lsn + 1,
Some(Arc::clone(src_timeline)),
Some(src_timeline.get_rel_size_v2_status()),
Some(rel_size_v2_status),
rel_size_migrated_at,
ctx,
)
.await?;
@@ -5379,6 +5386,7 @@ impl TenantShard {
pgdata_lsn,
None,
None,
None,
ctx,
)
.await?;
@@ -5462,14 +5470,17 @@ impl TenantShard {
start_lsn: Lsn,
ancestor: Option<Arc<Timeline>>,
rel_size_v2_status: Option<RelSizeMigration>,
rel_size_migrated_at: Option<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<(UninitializedTimeline<'a>, RequestContext)> {
let tenant_shard_id = self.tenant_shard_id;
let resources = self.build_timeline_resources(new_timeline_id);
resources
.remote_client
.init_upload_queue_for_empty_remote(new_metadata, rel_size_v2_status.clone())?;
resources.remote_client.init_upload_queue_for_empty_remote(
new_metadata,
rel_size_v2_status.clone(),
rel_size_migrated_at,
)?;
let (timeline_struct, timeline_ctx) = self
.create_timeline_struct(
@@ -5482,6 +5493,7 @@ impl TenantShard {
create_guard.idempotency.clone(),
None,
rel_size_v2_status,
rel_size_migrated_at,
ctx,
)
.context("Failed to create timeline data structure")?;

View File

@@ -443,7 +443,8 @@ impl RemoteTimelineClient {
pub fn init_upload_queue_for_empty_remote(
&self,
local_metadata: &TimelineMetadata,
rel_size_v2_status: Option<RelSizeMigration>,
rel_size_v2_migration: Option<RelSizeMigration>,
rel_size_migrated_at: Option<Lsn>,
) -> anyhow::Result<()> {
// Set the maximum number of inprogress tasks to the remote storage concurrency. There's
// certainly no point in starting more upload tasks than this.
@@ -455,7 +456,8 @@ impl RemoteTimelineClient {
let mut upload_queue = self.upload_queue.lock().unwrap();
let initialized_queue =
upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
initialized_queue.dirty.rel_size_migration = rel_size_v2_status;
initialized_queue.dirty.rel_size_migration = rel_size_v2_migration;
initialized_queue.dirty.rel_size_migrated_at = rel_size_migrated_at;
self.update_remote_physical_size_gauge(None);
info!("initialized upload queue as empty");
Ok(())
@@ -994,10 +996,12 @@ impl RemoteTimelineClient {
pub(crate) fn schedule_index_upload_for_rel_size_v2_status_update(
self: &Arc<Self>,
rel_size_v2_status: RelSizeMigration,
rel_size_migrated_at: Option<Lsn>,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
upload_queue.dirty.rel_size_migration = Some(rel_size_v2_status);
upload_queue.dirty.rel_size_migrated_at = rel_size_migrated_at;
// TODO: allow this operation to bypass the validation check because we might upload the index part
// with no layers but the flag updated. For now, we just modify the index part in memory and the next
// upload will include the flag.

View File

@@ -114,6 +114,11 @@ pub struct IndexPart {
/// The timestamp when the timeline was marked invisible in synthetic size calculations.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) marked_invisible_at: Option<NaiveDateTime>,
/// The LSN at which we started the rel size migration. Accesses below this LSN should be
/// processed with the v1 read path. Usually this LSN should be set together with `rel_size_migration`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) rel_size_migrated_at: Option<Lsn>,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
@@ -142,10 +147,12 @@ impl IndexPart {
/// - 12: +l2_lsn
/// - 13: +gc_compaction
/// - 14: +marked_invisible_at
const LATEST_VERSION: usize = 14;
/// - 15: +rel_size_migrated_at
const LATEST_VERSION: usize = 15;
// Versions we may see when reading from a bucket.
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14];
pub const KNOWN_VERSIONS: &'static [usize] =
&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
pub const FILE_NAME: &'static str = "index_part.json";
@@ -165,6 +172,7 @@ impl IndexPart {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
rel_size_migrated_at: None,
}
}
@@ -475,6 +483,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
rel_size_migrated_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -524,6 +533,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
rel_size_migrated_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -574,6 +584,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
rel_size_migrated_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -627,6 +638,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
rel_size_migrated_at: None,
};
let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap();
@@ -675,6 +687,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
rel_size_migrated_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -726,6 +739,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
rel_size_migrated_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -782,6 +796,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
rel_size_migrated_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -843,6 +858,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
rel_size_migrated_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -905,6 +921,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
rel_size_migrated_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -972,6 +989,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
rel_size_migrated_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1052,6 +1070,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
rel_size_migrated_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1133,6 +1152,7 @@ mod tests {
l2_lsn: None,
gc_compaction: None,
marked_invisible_at: None,
rel_size_migrated_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1220,6 +1240,7 @@ mod tests {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: None,
rel_size_migrated_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1308,6 +1329,97 @@ mod tests {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
rel_size_migrated_at: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
#[test]
fn v15_rel_size_migrated_at_is_parsed() {
let example = r#"{
"version": 15,
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
},
"disk_consistent_lsn":"0/16960E8",
"metadata": {
"disk_consistent_lsn": "0/16960E8",
"prev_record_lsn": "0/1696070",
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
"ancestor_lsn": "0/0",
"latest_gc_cutoff_lsn": "0/1696070",
"initdb_lsn": "0/1696070",
"pg_version": 14
},
"gc_blocking": {
"started_at": "2024-07-19T09:00:00.123",
"reasons": ["DetachAncestor"]
},
"import_pgdata": {
"V1": {
"Done": {
"idempotency_key": "specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5",
"started_at": "2024-11-13T09:23:42.123",
"finished_at": "2024-11-13T09:42:23.123"
}
}
},
"rel_size_migration": "legacy",
"l2_lsn": "0/16960E8",
"gc_compaction": {
"last_completed_lsn": "0/16960E8"
},
"marked_invisible_at": "2023-07-31T09:00:00.123",
"rel_size_migrated_at": "0/16960E8"
}"#;
let expected = IndexPart {
version: 15,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::new(
Lsn::from_str("0/16960E8").unwrap(),
Some(Lsn::from_str("0/1696070").unwrap()),
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
PgMajorVersion::PG14,
).with_recalculated_checksum().unwrap(),
deleted_at: None,
lineage: Default::default(),
gc_blocking: Some(GcBlocking {
started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]),
}),
last_aux_file_policy: Default::default(),
archived_at: None,
import_pgdata: Some(import_pgdata::index_part_format::Root::V1(import_pgdata::index_part_format::V1::Done(import_pgdata::index_part_format::Done{
started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"),
finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"),
idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()),
}))),
rel_size_migration: Some(RelSizeMigration::Legacy),
l2_lsn: Some("0/16960E8".parse::<Lsn>().unwrap()),
gc_compaction: Some(GcCompactionState {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
marked_invisible_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
rel_size_migrated_at: Some("0/16960E8".parse::<Lsn>().unwrap()),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();

View File

@@ -441,7 +441,7 @@ pub struct Timeline {
/// heatmap on demand.
heatmap_layers_downloader: Mutex<Option<heatmap_layers_downloader::HeatmapLayersDownloader>>,
pub(crate) rel_size_v2_status: ArcSwapOption<RelSizeMigration>,
pub(crate) rel_size_v2_status: ArcSwap<(Option<RelSizeMigration>, Option<Lsn>)>,
wait_lsn_log_slow: tokio::sync::Semaphore,
@@ -2894,12 +2894,9 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled)
}
pub(crate) fn get_rel_size_v2_status(&self) -> RelSizeMigration {
self.rel_size_v2_status
.load()
.as_ref()
.map(|s| s.as_ref().clone())
.unwrap_or(RelSizeMigration::Legacy)
pub(crate) fn get_rel_size_v2_status(&self) -> (RelSizeMigration, Option<Lsn>) {
let (status, migrated_at) = self.rel_size_v2_status.load().as_ref().clone();
(status.unwrap_or(RelSizeMigration::Legacy), migrated_at)
}
fn get_compaction_upper_limit(&self) -> usize {
@@ -3174,6 +3171,7 @@ impl Timeline {
create_idempotency: crate::tenant::CreateTimelineIdempotency,
gc_compaction_state: Option<GcCompactionState>,
rel_size_v2_status: Option<RelSizeMigration>,
rel_size_migrated_at: Option<Lsn>,
cancel: CancellationToken,
) -> Arc<Self> {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
@@ -3338,7 +3336,10 @@ impl Timeline {
heatmap_layers_downloader: Mutex::new(None),
rel_size_v2_status: ArcSwapOption::from_pointee(rel_size_v2_status),
rel_size_v2_status: ArcSwap::from_pointee((
rel_size_v2_status,
rel_size_migrated_at,
)),
wait_lsn_log_slow: tokio::sync::Semaphore::new(1),
@@ -3426,11 +3427,17 @@ impl Timeline {
pub(crate) fn update_rel_size_v2_status(
&self,
rel_size_v2_status: RelSizeMigration,
rel_size_migrated_at: Option<Lsn>,
) -> anyhow::Result<()> {
self.rel_size_v2_status
.store(Some(Arc::new(rel_size_v2_status.clone())));
self.rel_size_v2_status.store(Arc::new((
Some(rel_size_v2_status.clone()),
rel_size_migrated_at,
)));
self.remote_client
.schedule_index_upload_for_rel_size_v2_status_update(rel_size_v2_status)
.schedule_index_upload_for_rel_size_v2_status_update(
rel_size_v2_status,
rel_size_migrated_at,
)
}
pub(crate) fn get_gc_compaction_state(&self) -> Option<GcCompactionState> {

View File

@@ -332,6 +332,7 @@ impl DeleteTimelineFlow {
crate::tenant::CreateTimelineIdempotency::FailWithConflict, // doesn't matter what we put here
None, // doesn't matter what we put here
None, // doesn't matter what we put here
None, // doesn't matter what we put here
ctx,
)
.context("create_timeline_struct")?;

View File

@@ -178,6 +178,8 @@ static PageServer page_servers[MAX_SHARDS];
static bool pageserver_flush(shardno_t shard_no);
static void pageserver_disconnect(shardno_t shard_no);
static void pageserver_disconnect_shard(shardno_t shard_no);
// HADRON
shardno_t get_num_shards(void);
static bool
PagestoreShmemIsValid(void)
@@ -286,6 +288,22 @@ AssignPageserverConnstring(const char *newval, void *extra)
}
}
/* BEGIN_HADRON */
/**
* Return the total number of shards seen in the shard map.
*/
shardno_t get_num_shards(void)
{
const ShardMap *shard_map;
Assert(pagestore_shared);
shard_map = &pagestore_shared->shard_map;
Assert(shard_map != NULL);
return shard_map->num_shards;
}
/* END_HADRON */
/*
* Get the current number of shards, and/or the connection string for a
* particular shard from the shard map in shared memory.

View File

@@ -48,6 +48,7 @@
PG_MODULE_MAGIC;
void _PG_init(void);
bool lakebase_mode = false;
static int running_xacts_overflow_policy;
static bool monitor_query_exec_time = false;
@@ -583,6 +584,16 @@ _PG_init(void)
"neon_superuser",
PGC_POSTMASTER, 0, NULL, NULL, NULL);
DefineCustomBoolVariable(
"neon.lakebase_mode",
"Is neon running in Lakebase?",
NULL,
&lakebase_mode,
false,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
/*
* Important: This must happen after other parts of the extension are
* loaded, otherwise any settings to GUCs that were set before the

View File

@@ -21,6 +21,7 @@ extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connection_timeout;
extern int readahead_getpage_pull_timeout_ms;
extern bool disable_wal_prev_lsn_checks;
extern bool lakebase_mode;
extern bool AmPrewarmWorker;

View File

@@ -389,12 +389,21 @@ typedef struct PageserverFeedback
*/
typedef struct WalRateLimiter
{
/* If the value is 1, PG backends will hit backpressure. */
/* The effective wal write rate. Could be changed dynamically
based on whether PG has backpressure or not.*/
pg_atomic_uint32 effective_max_wal_bytes_per_second;
/* If the value is 1, PG backends will hit backpressure until the time has past batch_end_time_us. */
pg_atomic_uint32 should_limit;
/* The number of bytes sent in the current second. */
uint64 sent_bytes;
/* The last recorded time in microsecond. */
pg_atomic_uint64 last_recorded_time_us;
/* The timestamp when the write starts in the current batch. A batch is a time interval (e.g., )that we
track and throttle writes. Most times a batch is 1s, but it could become larger if the PG overwrites the WALs
and we will adjust the batch accordingly to compensate (e.g., if PG writes 10MB at once and max WAL write rate
is 1MB/s, then the current batch will become 10s). */
pg_atomic_uint64 batch_start_time_us;
/* The timestamp (in the future) that the current batch should end and accept more writes
(after should_limit is set to 1). */
pg_atomic_uint64 batch_end_time_us;
} WalRateLimiter;
/* END_HADRON */

View File

@@ -68,6 +68,14 @@ int safekeeper_proto_version = 3;
char *safekeeper_conninfo_options = "";
/* BEGIN_HADRON */
int databricks_max_wal_mb_per_second = -1;
// during throttling, we will limit the effective WAL write rate to 10KB.
// PG can still push some WAL to SK, but at a very low rate.
int databricks_throttled_max_wal_bytes_per_second = 10 * 1024;
// The max sleep time of a batch. This is to make sure the rate limiter does not
// overshoot too much and block PG for a very long time.
// This is set as 5 minuetes for now. PG can send as much as 10MB of WALs to SK in one batch,
// so this effectively caps the write rate to ~30KB/s in the worst case.
static uint64 kRateLimitMaxBatchUSecs = 300 * USECS_PER_SEC;
/* END_HADRON */
/* Set to true in the walproposer bgw. */
@@ -86,6 +94,7 @@ static HotStandbyFeedback agg_hs_feedback;
static void nwp_register_gucs(void);
static void assign_neon_safekeepers(const char *newval, void *extra);
static uint64 backpressure_lag_impl(void);
static uint64 hadron_backpressure_lag_impl(void);
static uint64 startup_backpressure_wrap(void);
static bool backpressure_throttling_impl(void);
static void walprop_register_bgworker(void);
@@ -110,6 +119,22 @@ static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk);
static void CheckGracefulShutdown(WalProposer *wp);
/* BEGIN_HADRON */
shardno_t get_num_shards(void);
static int positive_mb_to_bytes(int mb)
{
if (mb <= 0)
{
return mb;
}
else
{
return mb * 1024 * 1024;
}
}
/* END_HADRON */
static void
init_walprop_config(bool syncSafekeepers)
{
@@ -257,6 +282,16 @@ nwp_register_gucs(void)
PGC_SUSET,
GUC_UNIT_MB,
NULL, NULL, NULL);
DefineCustomIntVariable(
"databricks.throttled_max_wal_bytes_per_second",
"The maximum WAL bytes per second when PG is being throttled.",
NULL,
&databricks_throttled_max_wal_bytes_per_second,
10 * 1024, 0, INT_MAX,
PGC_SUSET,
GUC_UNIT_BYTE,
NULL, NULL, NULL);
/* END_HADRON */
}
@@ -395,19 +430,65 @@ assign_neon_safekeepers(const char *newval, void *extra)
pfree(oldval);
}
/* Check if we need to suspend inserts because of lagging replication. */
static uint64
backpressure_lag_impl(void)
/* BEGIN_HADRON */
static uint64 hadron_backpressure_lag_impl(void)
{
struct WalproposerShmemState* state = NULL;
uint64 lag = 0;
/* BEGIN_HADRON */
if(max_cluster_size < 0){
// if max cluster size is not set, then we don't apply backpressure because we're reconfiguring PG
return 0;
}
/* END_HADRON */
lag = backpressure_lag_impl();
state = GetWalpropShmemState();
if ( state != NULL && databricks_max_wal_mb_per_second != -1 )
{
int old_limit = pg_atomic_read_u32(&state->wal_rate_limiter.effective_max_wal_bytes_per_second);
int new_limit = (lag == 0)? positive_mb_to_bytes(databricks_max_wal_mb_per_second) : databricks_throttled_max_wal_bytes_per_second;
if( old_limit != new_limit )
{
uint64 batch_start_time = pg_atomic_read_u64(&state->wal_rate_limiter.batch_start_time_us);
uint64 batch_end_time = pg_atomic_read_u64(&state->wal_rate_limiter.batch_end_time_us);
// the rate limit has changed, we need to reset the rate limiter's batch end time
pg_atomic_write_u32(&state->wal_rate_limiter.effective_max_wal_bytes_per_second, new_limit);
pg_atomic_write_u64(&state->wal_rate_limiter.batch_end_time_us, Min(batch_start_time + USECS_PER_SEC, batch_end_time));
}
if( new_limit == -1 )
{
return 0;
}
if (pg_atomic_read_u32(&state->wal_rate_limiter.should_limit) == true)
{
TimestampTz now = GetCurrentTimestamp();
struct WalRateLimiter *limiter = &state->wal_rate_limiter;
uint64 batch_end_time = pg_atomic_read_u64(&limiter->batch_end_time_us);
if ( now >= batch_end_time )
{
/*
* The backend has past the batch end time and it's time to push more WALs.
* If the backends are pushing WALs too fast, the wal proposer will rate limit them again.
*/
uint32 expected = true;
pg_atomic_compare_exchange_u32(&state->wal_rate_limiter.should_limit, &expected, false);
return 0;
}
return Max(lag, 1);
}
// rate limiter decides to not throttle, then return 0.
return 0;
}
return lag;
}
/* END_HADRON */
/* Check if we need to suspend inserts because of lagging replication. */
static uint64
backpressure_lag_impl(void)
{
if (max_replication_apply_lag > 0 || max_replication_flush_lag > 0 || max_replication_write_lag > 0)
{
XLogRecPtr writePtr;
@@ -441,30 +522,6 @@ backpressure_lag_impl(void)
return (myFlushLsn - applyPtr - max_replication_apply_lag * MB);
}
}
/* BEGIN_HADRON */
if (databricks_max_wal_mb_per_second == -1) {
return 0;
}
state = GetWalpropShmemState();
if (state != NULL && !!pg_atomic_read_u32(&state->wal_rate_limiter.should_limit))
{
TimestampTz now = GetCurrentTimestamp();
struct WalRateLimiter *limiter = &state->wal_rate_limiter;
uint64 last_recorded_time = pg_atomic_read_u64(&limiter->last_recorded_time_us);
if (now - last_recorded_time > USECS_PER_SEC)
{
/*
* The backend has past 1 second since the last recorded time and it's time to push more WALs.
* If the backends are pushing WALs too fast, the wal proposer will rate limit them again.
*/
uint32 expected = true;
pg_atomic_compare_exchange_u32(&state->wal_rate_limiter.should_limit, &expected, false);
}
return 1;
}
/* END_HADRON */
return 0;
}
@@ -479,9 +536,9 @@ startup_backpressure_wrap(void)
if (AmStartupProcess() || !IsUnderPostmaster)
return 0;
delay_backend_us = &backpressure_lag_impl;
delay_backend_us = &hadron_backpressure_lag_impl;
return backpressure_lag_impl();
return hadron_backpressure_lag_impl();
}
/*
@@ -511,8 +568,10 @@ WalproposerShmemInit(void)
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0);
/* BEGIN_HADRON */
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.effective_max_wal_bytes_per_second, -1);
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.last_recorded_time_us, 0);
pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.batch_start_time_us, 0);
pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.batch_end_time_us, 0);
/* END_HADRON */
}
}
@@ -527,8 +586,10 @@ WalproposerShmemInit_SyncSafekeeper(void)
pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0);
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
/* BEGIN_HADRON */
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.effective_max_wal_bytes_per_second, -1);
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.last_recorded_time_us, 0);
pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.batch_start_time_us, 0);
pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.batch_end_time_us, 0);
/* END_HADRON */
}
@@ -560,7 +621,7 @@ backpressure_throttling_impl(void)
return retry;
/* Calculate replicas lag */
lag = backpressure_lag_impl();
lag = hadron_backpressure_lag_impl();
if (lag == 0)
return retry;
@@ -646,18 +707,19 @@ walprop_pg_get_shmem_state(WalProposer *wp)
* Record new ps_feedback in the array with shards and update min_feedback.
*/
static PageserverFeedback
record_pageserver_feedback(PageserverFeedback *ps_feedback)
record_pageserver_feedback(PageserverFeedback *ps_feedback, shardno_t num_shards)
{
PageserverFeedback min_feedback;
Assert(ps_feedback->present);
Assert(ps_feedback->shard_number < MAX_SHARDS);
Assert(ps_feedback->shard_number < num_shards);
SpinLockAcquire(&walprop_shared->mutex);
/* Update the number of shards */
if (ps_feedback->shard_number + 1 > walprop_shared->num_shards)
walprop_shared->num_shards = ps_feedback->shard_number + 1;
// Hadron: Update the num_shards from the source-of-truth (shard map) lazily when we receive
// a new pageserver feedback.
walprop_shared->num_shards = Max(walprop_shared->num_shards, num_shards);
/* Update the feedback */
memcpy(&walprop_shared->shard_ps_feedback[ps_feedback->shard_number], ps_feedback, sizeof(PageserverFeedback));
@@ -1475,6 +1537,7 @@ XLogBroadcastWalProposer(WalProposer *wp)
XLogRecPtr endptr;
struct WalproposerShmemState *state = NULL;
TimestampTz now = 0;
int effective_max_wal_bytes_per_second = 0;
/* Start from the last sent position */
startptr = sentPtr;
@@ -1529,22 +1592,36 @@ XLogBroadcastWalProposer(WalProposer *wp)
/* BEGIN_HADRON */
state = GetWalpropShmemState();
if (databricks_max_wal_mb_per_second != -1 && state != NULL)
effective_max_wal_bytes_per_second = pg_atomic_read_u32(&state->wal_rate_limiter.effective_max_wal_bytes_per_second);
if (effective_max_wal_bytes_per_second != -1 && state != NULL)
{
uint64 max_wal_bytes = (uint64) databricks_max_wal_mb_per_second * 1024 * 1024;
struct WalRateLimiter *limiter = &state->wal_rate_limiter;
uint64 last_recorded_time = pg_atomic_read_u64(&limiter->last_recorded_time_us);
if (now - last_recorded_time > USECS_PER_SEC)
uint64 batch_end_time = pg_atomic_read_u64(&limiter->batch_end_time_us);
if ( now >= batch_end_time )
{
/* Reset the rate limiter */
// Reset the rate limiter to start a new batch
limiter->sent_bytes = 0;
pg_atomic_write_u64(&limiter->last_recorded_time_us, now);
pg_atomic_write_u32(&limiter->should_limit, false);
pg_atomic_write_u64(&limiter->batch_start_time_us, now);
/* tentatively assign the batch end time as 1s from now. This could result in one of the following cases:
1. If sent_bytes does not reach effective_max_wal_bytes_per_second in 1s,
then we will reset the current batch and clear sent_bytes. No throttling happens.
2. Otherwise, we will recompute the end time (below) based on how many bytes are actually written,
and throttle PG until the batch end time. */
pg_atomic_write_u64(&limiter->batch_end_time_us, now + USECS_PER_SEC);
}
limiter->sent_bytes += (endptr - startptr);
if (limiter->sent_bytes > max_wal_bytes)
if (limiter->sent_bytes > effective_max_wal_bytes_per_second)
{
uint64_t batch_start_time = pg_atomic_read_u64(&limiter->batch_start_time_us);
uint64 throttle_usecs = USECS_PER_SEC * limiter->sent_bytes / Max(effective_max_wal_bytes_per_second, 1);
if (throttle_usecs > kRateLimitMaxBatchUSecs){
elog(LOG, "throttle_usecs %lu is too large, limiting to %lu", throttle_usecs, kRateLimitMaxBatchUSecs);
throttle_usecs = kRateLimitMaxBatchUSecs;
}
pg_atomic_write_u32(&limiter->should_limit, true);
pg_atomic_write_u64(&limiter->batch_end_time_us, batch_start_time + throttle_usecs);
}
}
/* END_HADRON */
@@ -2023,19 +2100,43 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
if (wp->config->syncSafekeepers)
return;
/* handle fresh ps_feedback */
if (sk->appendResponse.ps_feedback.present)
{
PageserverFeedback min_feedback = record_pageserver_feedback(&sk->appendResponse.ps_feedback);
shardno_t num_shards = get_num_shards();
/* Only one main shard sends non-zero currentClusterSize */
if (sk->appendResponse.ps_feedback.currentClusterSize > 0)
SetNeonCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize);
if (min_feedback.disk_consistent_lsn != standby_apply_lsn)
// During shard split, we receive ps_feedback from child shards before
// the split commits and our shard map GUC has been updated. We must
// filter out such feedback here because record_pageserver_feedback()
// doesn't do it.
//
// NB: what we would actually want to happen is that we only receive
// ps_feedback from the parent shards when the split is committed, then
// apply the split to our set of tracked feedback and from here on only
// receive ps_feedback from child shards. This filter condition doesn't
// do that: if we split from N parent to 2N child shards, the first N
// child shards' feedback messages will pass this condition, even before
// the split is committed. That's a bit sloppy, but OK for now.
if (sk->appendResponse.ps_feedback.shard_number < num_shards)
{
standby_apply_lsn = min_feedback.disk_consistent_lsn;
needToAdvanceSlot = true;
PageserverFeedback min_feedback = record_pageserver_feedback(&sk->appendResponse.ps_feedback, num_shards);
/* Only one main shard sends non-zero currentClusterSize */
if (sk->appendResponse.ps_feedback.currentClusterSize > 0)
SetNeonCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize);
if (min_feedback.disk_consistent_lsn != standby_apply_lsn)
{
standby_apply_lsn = min_feedback.disk_consistent_lsn;
needToAdvanceSlot = true;
}
}
else
{
// HADRON
elog(DEBUG2, "Ignoring pageserver feedback for unknown shard %d (current shard number %d)",
sk->appendResponse.ps_feedback.shard_number, num_shards);
}
}

View File

@@ -429,26 +429,13 @@ impl CancellationHandler {
/// (we'd need something like `#![feature(type_alias_impl_trait)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CancelClosure {
socket_addr: SocketAddr,
cancel_token: RawCancelToken,
hostname: String, // for pg_sni router
user_info: ComputeUserInfo,
pub socket_addr: SocketAddr,
pub cancel_token: RawCancelToken,
pub hostname: String, // for pg_sni router
pub user_info: ComputeUserInfo,
}
impl CancelClosure {
pub(crate) fn new(
socket_addr: SocketAddr,
cancel_token: RawCancelToken,
hostname: String,
user_info: ComputeUserInfo,
) -> Self {
Self {
socket_addr,
cancel_token,
hostname,
user_info,
}
}
/// Cancels the query running on user's compute node.
pub(crate) async fn try_cancel_query(
&self,

View File

@@ -7,17 +7,15 @@ use std::net::{IpAddr, SocketAddr};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use postgres_client::config::{AuthKeys, ChannelBinding, SslMode};
use postgres_client::connect_raw::StartupStream;
use postgres_client::maybe_tls_stream::MaybeTlsStream;
use postgres_client::tls::MakeTlsConnect;
use postgres_client::{NoTls, RawCancelToken, RawConnection};
use postgres_protocol::message::backend::NoticeResponseBody;
use thiserror::Error;
use tokio::net::{TcpStream, lookup_host};
use tracing::{debug, error, info, warn};
use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
use crate::auth::backend::ComputeCredentialKeys;
use crate::auth::parse_endpoint_param;
use crate::cancellation::CancelClosure;
use crate::compute::tls::TlsError;
use crate::config::ComputeConfig;
use crate::context::RequestContext;
@@ -236,8 +234,7 @@ impl AuthInfo {
&self,
ctx: &RequestContext,
compute: &mut ComputeConnection,
user_info: &ComputeUserInfo,
) -> Result<PostgresSettings, PostgresError> {
) -> Result<(), PostgresError> {
// client config with stubbed connect info.
// TODO(conrad): should we rewrite this to bypass tokio-postgres2 entirely,
// utilising pqproto.rs.
@@ -247,39 +244,10 @@ impl AuthInfo {
let tmp_config = self.enrich(tmp_config);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let connection = tmp_config
.tls_and_authenticate(&mut compute.stream, NoTls)
.await?;
tmp_config.authenticate(&mut compute.stream).await?;
drop(pause);
let RawConnection {
stream: _,
parameters,
delayed_notice,
process_id,
secret_key,
} = connection;
tracing::Span::current().record("pid", tracing::field::display(process_id));
// NB: CancelToken is supposed to hold socket_addr, but we use connect_raw.
// Yet another reason to rework the connection establishing code.
let cancel_closure = CancelClosure::new(
compute.socket_addr,
RawCancelToken {
ssl_mode: compute.ssl_mode,
process_id,
secret_key,
},
compute.hostname.to_string(),
user_info.clone(),
);
Ok(PostgresSettings {
params: parameters,
cancel_closure,
delayed_notice,
})
Ok(())
}
}
@@ -343,21 +311,9 @@ impl ConnectInfo {
pub type RustlsStream = <ComputeConfig as MakeTlsConnect<tokio::net::TcpStream>>::Stream;
pub type MaybeRustlsStream = MaybeTlsStream<tokio::net::TcpStream, RustlsStream>;
// TODO(conrad): we don't need to parse these.
// These are just immediately forwarded back to the client.
// We could instead stream them out instead of reading them into memory.
pub struct PostgresSettings {
/// PostgreSQL connection parameters.
pub params: std::collections::HashMap<String, String>,
/// Query cancellation token.
pub cancel_closure: CancelClosure,
/// Notices received from compute after authenticating
pub delayed_notice: Vec<NoticeResponseBody>,
}
pub struct ComputeConnection {
/// Socket connected to a compute node.
pub stream: MaybeTlsStream<tokio::net::TcpStream, RustlsStream>,
pub stream: StartupStream<tokio::net::TcpStream, RustlsStream>,
/// Labels for proxy's metrics.
pub aux: MetricsAuxInfo,
pub hostname: Host,
@@ -390,6 +346,7 @@ impl ConnectInfo {
ctx.get_testodrome_id().unwrap_or_default(),
);
let stream = StartupStream::new(stream);
let connection = ComputeConnection {
stream,
socket_addr,

View File

@@ -1,12 +1,13 @@
use std::sync::Arc;
use futures::{FutureExt, TryFutureExt};
use postgres_client::RawCancelToken;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info};
use crate::auth::backend::ConsoleRedirectBackend;
use crate::cancellation::CancellationHandler;
use crate::cancellation::{CancelClosure, CancellationHandler};
use crate::config::{ProxyConfig, ProxyProtocolV2};
use crate::context::RequestContext;
use crate::error::ReportableError;
@@ -16,7 +17,7 @@ use crate::pglb::handshake::{HandshakeData, handshake};
use crate::pglb::passthrough::ProxyPassthrough;
use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol};
use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
use crate::proxy::{ErrorSource, finish_client_init};
use crate::proxy::{ErrorSource, forward_compute_params_to_client, send_client_greeting};
use crate::util::run_until_cancelled;
pub async fn task_main(
@@ -226,21 +227,19 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
.or_else(|e| async { Err(stream.throw_error(e, Some(ctx)).await) })
.await?;
let pg_settings = auth_info
.authenticate(ctx, &mut node, &user_info)
auth_info
.authenticate(ctx, &mut node)
.or_else(|e| async { Err(stream.throw_error(e, Some(ctx)).await) })
.await?;
send_client_greeting(ctx, &config.greetings, &mut stream);
let session = cancellation_handler.get_key();
finish_client_init(
ctx,
&pg_settings,
*session.key(),
&mut stream,
&config.greetings,
);
let (process_id, secret_key) =
forward_compute_params_to_client(ctx, *session.key(), &mut stream, &mut node.stream)
.await?;
let stream = stream.flush_and_into_inner().await?;
let hostname = node.hostname.to_string();
let session_id = ctx.session_id();
let (cancel_on_shutdown, cancel) = tokio::sync::oneshot::channel();
@@ -249,7 +248,16 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
.maintain_cancel_key(
session_id,
cancel,
&pg_settings.cancel_closure,
&CancelClosure {
socket_addr: node.socket_addr,
cancel_token: RawCancelToken {
ssl_mode: node.ssl_mode,
process_id,
secret_key,
},
hostname,
user_info,
},
&config.connect_to_compute,
)
.await;
@@ -257,7 +265,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
Ok(Some(ProxyPassthrough {
client: stream,
compute: node.stream,
compute: node.stream.into_framed().into_inner(),
aux: node.aux,
private_link_id: None,

View File

@@ -319,7 +319,7 @@ pub(crate) async fn handle_connection<S: AsyncRead + AsyncWrite + Unpin + Send>(
Ok(Some(ProxyPassthrough {
client,
compute: node.stream,
compute: node.stream.into_framed().into_inner(),
aux: node.aux,
private_link_id,

View File

@@ -313,6 +313,14 @@ impl WriteBuf {
self.0.set_position(0);
}
/// Shrinks the buffer if efficient to do so, and returns the remaining size.
pub fn occupied_len(&mut self) -> usize {
if self.should_shrink() {
self.shrink();
}
self.0.get_mut().len()
}
/// Write a raw message to the internal buffer.
///
/// The size_hint value is only a hint for reserving space. It's ok if it's incorrect, since

View File

@@ -9,18 +9,23 @@ use std::collections::HashSet;
use std::convert::Infallible;
use std::sync::Arc;
use futures::TryStreamExt;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use postgres_client::RawCancelToken;
use postgres_client::connect_raw::StartupStream;
use postgres_protocol::message::backend::Message;
use regex::Regex;
use serde::{Deserialize, Serialize};
use smol_str::{SmolStr, format_smolstr};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio::sync::oneshot;
use tracing::Instrument;
use crate::cache::Cache;
use crate::cancellation::CancellationHandler;
use crate::compute::ComputeConnection;
use crate::cancellation::{CancelClosure, CancellationHandler};
use crate::compute::{ComputeConnection, PostgresError, RustlsStream};
use crate::config::ProxyConfig;
use crate::context::RequestContext;
use crate::control_plane::client::ControlPlaneClient;
@@ -105,7 +110,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
// the compute was cached, and we connected, but the compute cache was actually stale
// and is associated with the wrong endpoint. We detect this when the **authentication** fails.
// As such, we retry once here if the `authenticate` function fails and the error is valid to retry.
let pg_settings = loop {
loop {
attempt += 1;
// TODO: callback to pglb
@@ -127,9 +132,12 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
unreachable!("ensured above");
};
let res = auth_info.authenticate(ctx, &mut node, user_info).await;
let res = auth_info.authenticate(ctx, &mut node).await;
match res {
Ok(pg_settings) => break pg_settings,
Ok(()) => {
send_client_greeting(ctx, &config.greetings, client);
break;
}
Err(e) if attempt < 2 && e.should_retry_wake_compute() => {
tracing::warn!(error = ?e, "retrying wake compute");
@@ -141,11 +149,17 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
}
Err(e) => Err(client.throw_error(e, Some(ctx)).await)?,
}
}
let auth::Backend::ControlPlane(_, user_info) = backend else {
unreachable!("ensured above");
};
let session = cancellation_handler.get_key();
finish_client_init(ctx, &pg_settings, *session.key(), client, &config.greetings);
let (process_id, secret_key) =
forward_compute_params_to_client(ctx, *session.key(), client, &mut node.stream).await?;
let hostname = node.hostname.to_string();
let session_id = ctx.session_id();
let (cancel_on_shutdown, cancel) = oneshot::channel();
@@ -154,7 +168,16 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
.maintain_cancel_key(
session_id,
cancel,
&pg_settings.cancel_closure,
&CancelClosure {
socket_addr: node.socket_addr,
cancel_token: RawCancelToken {
ssl_mode: node.ssl_mode,
process_id,
secret_key,
},
hostname,
user_info,
},
&config.connect_to_compute,
)
.await;
@@ -163,35 +186,18 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
Ok((node, cancel_on_shutdown))
}
/// Finish client connection initialization: confirm auth success, send params, etc.
pub(crate) fn finish_client_init(
/// Greet the client with any useful information.
pub(crate) fn send_client_greeting(
ctx: &RequestContext,
settings: &compute::PostgresSettings,
cancel_key_data: CancelKeyData,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
greetings: &String,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) {
// Forward all deferred notices to the client.
for notice in &settings.delayed_notice {
client.write_raw(notice.as_bytes().len(), b'N', |buf| {
buf.extend_from_slice(notice.as_bytes());
});
}
// Expose session_id to clients if we have a greeting message.
if !greetings.is_empty() {
let session_msg = format!("{}, session_id: {}", greetings, ctx.session_id());
client.write_message(BeMessage::NoticeResponse(session_msg.as_str()));
}
// Forward all postgres connection params to the client.
for (name, value) in &settings.params {
client.write_message(BeMessage::ParameterStatus {
name: name.as_bytes(),
value: value.as_bytes(),
});
}
// Forward recorded latencies for probing requests
if let Some(testodrome_id) = ctx.get_testodrome_id() {
client.write_message(BeMessage::ParameterStatus {
@@ -221,9 +227,63 @@ pub(crate) fn finish_client_init(
value: latency_measured.retry.as_micros().to_string().as_bytes(),
});
}
}
client.write_message(BeMessage::BackendKeyData(cancel_key_data));
client.write_message(BeMessage::ReadyForQuery);
pub(crate) async fn forward_compute_params_to_client(
ctx: &RequestContext,
cancel_key_data: CancelKeyData,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
compute: &mut StartupStream<TcpStream, RustlsStream>,
) -> Result<(i32, i32), ClientRequestError> {
let mut process_id = 0;
let mut secret_key = 0;
let err = loop {
// if the client buffer is too large, let's write out some bytes now to save some space
client.write_if_full().await?;
let msg = match compute.try_next().await {
Ok(msg) => msg,
Err(e) => break postgres_client::Error::io(e),
};
match msg {
// Send our cancellation key data instead.
Some(Message::BackendKeyData(body)) => {
client.write_message(BeMessage::BackendKeyData(cancel_key_data));
process_id = body.process_id();
secret_key = body.secret_key();
}
// Forward all postgres connection params to the client.
Some(Message::ParameterStatus(body)) => {
if let Ok(name) = body.name()
&& let Ok(value) = body.value()
{
client.write_message(BeMessage::ParameterStatus {
name: name.as_bytes(),
value: value.as_bytes(),
});
}
}
// Forward all notices to the client.
Some(Message::NoticeResponse(notice)) => {
client.write_raw(notice.as_bytes().len(), b'N', |buf| {
buf.extend_from_slice(notice.as_bytes());
});
}
Some(Message::ReadyForQuery(_)) => {
client.write_message(BeMessage::ReadyForQuery);
return Ok((process_id, secret_key));
}
Some(Message::ErrorResponse(body)) => break postgres_client::Error::db(body),
Some(_) => break postgres_client::Error::unexpected_message(),
None => break postgres_client::Error::closed(),
}
};
Err(client
.throw_error(PostgresError::Postgres(err), Some(ctx))
.await)?
}
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]

View File

@@ -18,7 +18,7 @@ use tracing::{debug, info};
use super::AsyncRW;
use super::conn_pool::poll_client;
use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
use super::http_conn_pool::{self, HttpConnPool, Send, poll_http2_client};
use super::http_conn_pool::{self, HttpConnPool, LocalProxyClient, poll_http2_client};
use super::local_conn_pool::{self, EXT_NAME, EXT_SCHEMA, EXT_VERSION, LocalConnPool};
use crate::auth::backend::local::StaticAuthRules;
use crate::auth::backend::{ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo};
@@ -40,7 +40,8 @@ use crate::rate_limiter::EndpointRateLimiter;
use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
pub(crate) struct PoolingBackend {
pub(crate) http_conn_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
pub(crate) http_conn_pool:
Arc<GlobalConnPool<LocalProxyClient, HttpConnPool<LocalProxyClient>>>,
pub(crate) local_pool: Arc<LocalConnPool<postgres_client::Client>>,
pub(crate) pool:
Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
@@ -210,7 +211,7 @@ impl PoolingBackend {
&self,
ctx: &RequestContext,
conn_info: ConnInfo,
) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
) -> Result<http_conn_pool::Client<LocalProxyClient>, HttpConnError> {
debug!("pool: looking for an existing connection");
if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) {
return Ok(client);
@@ -568,7 +569,7 @@ impl ConnectMechanism for TokioMechanism {
}
struct HyperMechanism {
pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
pool: Arc<GlobalConnPool<LocalProxyClient, HttpConnPool<LocalProxyClient>>>,
conn_info: ConnInfo,
conn_id: uuid::Uuid,
@@ -578,7 +579,7 @@ struct HyperMechanism {
#[async_trait]
impl ConnectMechanism for HyperMechanism {
type Connection = http_conn_pool::Client<Send>;
type Connection = http_conn_pool::Client<LocalProxyClient>;
type ConnectError = HttpConnError;
type Error = HttpConnError;
@@ -632,7 +633,13 @@ async fn connect_http2(
port: u16,
timeout: Duration,
tls: Option<&Arc<rustls::ClientConfig>>,
) -> Result<(http_conn_pool::Send, http_conn_pool::Connect), LocalProxyConnError> {
) -> Result<
(
http_conn_pool::LocalProxyClient,
http_conn_pool::LocalProxyConnection,
),
LocalProxyConnError,
> {
let addrs = match host_addr {
Some(addr) => vec![SocketAddr::new(addr, port)],
None => lookup_host((host, port))

View File

@@ -190,6 +190,9 @@ mod tests {
fn get_process_id(&self) -> i32 {
0
}
fn reset(&mut self) -> Result<(), postgres_client::Error> {
Ok(())
}
}
fn create_inner() -> ClientInnerCommon<MockClient> {

View File

@@ -7,10 +7,9 @@ use std::time::Duration;
use clashmap::ClashMap;
use parking_lot::RwLock;
use postgres_client::ReadyForQueryStatus;
use rand::Rng;
use smol_str::ToSmolStr;
use tracing::{Span, debug, info};
use tracing::{Span, debug, info, warn};
use super::backend::HttpConnError;
use super::conn_pool::ClientDataRemote;
@@ -188,7 +187,7 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
self.pools.get_mut(&db_user)
}
pub(crate) fn put(pool: &RwLock<Self>, conn_info: &ConnInfo, client: ClientInnerCommon<C>) {
pub(crate) fn put(pool: &RwLock<Self>, conn_info: &ConnInfo, mut client: ClientInnerCommon<C>) {
let conn_id = client.get_conn_id();
let (max_conn, conn_count, pool_name) = {
let pool = pool.read();
@@ -201,12 +200,17 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
};
if client.inner.is_closed() {
info!(%conn_id, "{}: throwing away connection '{conn_info}' because connection is closed", pool_name);
info!(%conn_id, "{pool_name}: throwing away connection '{conn_info}' because connection is closed");
return;
}
if let Err(error) = client.inner.reset() {
warn!(?error, %conn_id, "{pool_name}: throwing away connection '{conn_info}' because connection could not be reset");
return;
}
if conn_count >= max_conn {
info!(%conn_id, "{}: throwing away connection '{conn_info}' because pool is full", pool_name);
info!(%conn_id, "{pool_name}: throwing away connection '{conn_info}' because pool is full");
return;
}
@@ -691,6 +695,7 @@ impl<C: ClientInnerExt> Deref for Client<C> {
pub(crate) trait ClientInnerExt: Sync + Send + 'static {
fn is_closed(&self) -> bool;
fn get_process_id(&self) -> i32;
fn reset(&mut self) -> Result<(), postgres_client::Error>;
}
impl ClientInnerExt for postgres_client::Client {
@@ -701,15 +706,13 @@ impl ClientInnerExt for postgres_client::Client {
fn get_process_id(&self) -> i32 {
self.get_process_id()
}
fn reset(&mut self) -> Result<(), postgres_client::Error> {
self.reset_session_background()
}
}
impl<C: ClientInnerExt> Discard<'_, C> {
pub(crate) fn check_idle(&mut self, status: ReadyForQueryStatus) {
let conn_info = &self.conn_info;
if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 {
info!("pool: throwing away connection '{conn_info}' because connection is not idle");
}
}
pub(crate) fn discard(&mut self) {
let conn_info = &self.conn_info;
if std::mem::take(self.pool).strong_count() > 0 {

View File

@@ -23,8 +23,8 @@ use crate::protocol2::ConnectionInfoExtra;
use crate::types::EndpointCacheKey;
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
pub(crate) type Send = http2::SendRequest<BoxBody<Bytes, hyper::Error>>;
pub(crate) type Connect =
pub(crate) type LocalProxyClient = http2::SendRequest<BoxBody<Bytes, hyper::Error>>;
pub(crate) type LocalProxyConnection =
http2::Connection<TokioIo<AsyncRW>, BoxBody<Bytes, hyper::Error>, TokioExecutor>;
#[derive(Clone)]
@@ -189,14 +189,14 @@ impl<C: ClientInnerExt + Clone> GlobalConnPool<C, HttpConnPool<C>> {
}
pub(crate) fn poll_http2_client(
global_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
global_pool: Arc<GlobalConnPool<LocalProxyClient, HttpConnPool<LocalProxyClient>>>,
ctx: &RequestContext,
conn_info: &ConnInfo,
client: Send,
connection: Connect,
client: LocalProxyClient,
connection: LocalProxyConnection,
conn_id: uuid::Uuid,
aux: MetricsAuxInfo,
) -> Client<Send> {
) -> Client<LocalProxyClient> {
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
let session_id = ctx.session_id();
@@ -285,7 +285,7 @@ impl<C: ClientInnerExt + Clone> Client<C> {
}
}
impl ClientInnerExt for Send {
impl ClientInnerExt for LocalProxyClient {
fn is_closed(&self) -> bool {
self.is_closed()
}
@@ -294,4 +294,10 @@ impl ClientInnerExt for Send {
// ideally throw something meaningful
-1
}
fn reset(&mut self) -> Result<(), postgres_client::Error> {
// We use HTTP/2.0 to talk to local proxy. HTTP is stateless,
// so there's nothing to reset.
Ok(())
}
}

View File

@@ -269,11 +269,6 @@ impl ClientInnerCommon<postgres_client::Client> {
local_data.jti += 1;
let token = resign_jwt(&local_data.key, payload, local_data.jti)?;
self.inner
.discard_all()
.await
.map_err(SqlOverHttpError::InternalPostgres)?;
// initiates the auth session
// this is safe from query injections as the jwt format free of any escape characters.
let query = format!("select auth.jwt_session_init('{token}')");

View File

@@ -46,7 +46,7 @@ use super::backend::{HttpConnError, LocalProxyConnError, PoolingBackend};
use super::conn_pool::AuthData;
use super::conn_pool_lib::ConnInfo;
use super::error::{ConnInfoError, Credentials, HttpCodeError, ReadPayloadError};
use super::http_conn_pool::{self, Send};
use super::http_conn_pool::{self, LocalProxyClient};
use super::http_util::{
ALLOW_POOL, CONN_STRING, NEON_REQUEST_ID, RAW_TEXT_OUTPUT, TXN_ISOLATION_LEVEL, TXN_READ_ONLY,
get_conn_info, json_response, uuid_to_header_value,
@@ -145,7 +145,7 @@ impl DbSchemaCache {
endpoint_id: &EndpointCacheKey,
auth_header: &HeaderValue,
connection_string: &str,
client: &mut http_conn_pool::Client<Send>,
client: &mut http_conn_pool::Client<LocalProxyClient>,
ctx: &RequestContext,
config: &'static ProxyConfig,
) -> Result<Arc<(ApiConfig, DbSchemaOwned)>, RestError> {
@@ -190,7 +190,7 @@ impl DbSchemaCache {
&self,
auth_header: &HeaderValue,
connection_string: &str,
client: &mut http_conn_pool::Client<Send>,
client: &mut http_conn_pool::Client<LocalProxyClient>,
ctx: &RequestContext,
config: &'static ProxyConfig,
) -> Result<(ApiConfig, DbSchemaOwned), RestError> {
@@ -430,7 +430,7 @@ struct BatchQueryData<'a> {
}
async fn make_local_proxy_request<S: DeserializeOwned>(
client: &mut http_conn_pool::Client<Send>,
client: &mut http_conn_pool::Client<LocalProxyClient>,
headers: impl IntoIterator<Item = (&HeaderName, HeaderValue)>,
body: QueryData<'_>,
max_len: usize,
@@ -461,7 +461,7 @@ async fn make_local_proxy_request<S: DeserializeOwned>(
}
async fn make_raw_local_proxy_request(
client: &mut http_conn_pool::Client<Send>,
client: &mut http_conn_pool::Client<LocalProxyClient>,
headers: impl IntoIterator<Item = (&HeaderName, HeaderValue)>,
body: String,
) -> Result<Response<Incoming>, RestError> {

View File

@@ -735,9 +735,7 @@ impl QueryData {
match batch_result {
// The query successfully completed.
Ok(status) => {
discard.check_idle(status);
Ok(_) => {
let json_output = String::from_utf8(json_buf).expect("json should be valid utf8");
Ok(json_output)
}
@@ -793,7 +791,7 @@ impl BatchQueryData {
{
Ok(json_output) => {
info!("commit");
let status = transaction
transaction
.commit()
.await
.inspect_err(|_| {
@@ -802,7 +800,6 @@ impl BatchQueryData {
discard.discard();
})
.map_err(SqlOverHttpError::Postgres)?;
discard.check_idle(status);
json_output
}
Err(SqlOverHttpError::Cancelled(_)) => {
@@ -815,17 +812,6 @@ impl BatchQueryData {
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
}
Err(err) => {
info!("rollback");
let status = transaction
.rollback()
.await
.inspect_err(|_| {
// if we cannot rollback - for now don't return connection to pool
// TODO: get a query status from the error
discard.discard();
})
.map_err(SqlOverHttpError::Postgres)?;
discard.check_idle(status);
return Err(err);
}
};
@@ -1012,12 +998,6 @@ impl Client {
}
impl Discard<'_> {
fn check_idle(&mut self, status: ReadyForQueryStatus) {
match self {
Discard::Remote(discard) => discard.check_idle(status),
Discard::Local(discard) => discard.check_idle(status),
}
}
fn discard(&mut self) {
match self {
Discard::Remote(discard) => discard.discard(),

View File

@@ -154,6 +154,15 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
message.write_message(&mut self.write);
}
/// Write the buffer to the socket until we have some more space again.
pub async fn write_if_full(&mut self) -> io::Result<()> {
while self.write.occupied_len() > 2048 {
self.stream.write_buf(&mut self.write).await?;
}
Ok(())
}
/// Flush the output buffer into the underlying stream.
///
/// This is cancel safe.

View File

@@ -161,9 +161,9 @@ pub async fn handle_request(
FileStorage::create_new(&tli_dir_path, new_state.clone(), conf.no_sync).await?;
// now we have a ready timeline in a temp directory
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path, None).await?;
global_timelines
.load_temp_timeline(request.destination_ttid, &tli_dir_path, true)
.load_temp_timeline(request.destination_ttid, &tli_dir_path, None)
.await?;
Ok(())

View File

@@ -193,7 +193,7 @@ pub async fn hcc_pull_timeline(
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
http_hosts: Vec::new(),
ignore_tombstone: None,
mconf: None,
};
for host in timeline.peers {
if host.0 == conf.my_id.0 {

View File

@@ -352,7 +352,7 @@ async fn timeline_exclude_handler(mut request: Request<Body>) -> Result<Response
// instead.
if data.mconf.contains(my_id) {
return Err(ApiError::Forbidden(format!(
"refused to switch into {}, node {} is member of it",
"refused to exclude timeline with {}, node {} is member of it",
data.mconf, my_id
)));
}

View File

@@ -13,8 +13,8 @@ use http_utils::error::ApiError;
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
use remote_storage::GenericRemoteStorage;
use reqwest::Certificate;
use safekeeper_api::Term;
use safekeeper_api::models::{PullTimelineRequest, PullTimelineResponse, TimelineStatus};
use safekeeper_api::{Term, membership};
use safekeeper_client::mgmt_api;
use safekeeper_client::mgmt_api::Client;
use serde::Deserialize;
@@ -453,12 +453,40 @@ pub async fn handle_request(
global_timelines: Arc<GlobalTimelines>,
wait_for_peer_timeline_status: bool,
) -> Result<PullTimelineResponse, ApiError> {
if let Some(mconf) = &request.mconf {
let sk_id = global_timelines.get_sk_id();
if !mconf.contains(sk_id) {
return Err(ApiError::BadRequest(anyhow!(
"refused to pull timeline with {mconf}, node {sk_id} is not member of it",
)));
}
}
let existing_tli = global_timelines.get(TenantTimelineId::new(
request.tenant_id,
request.timeline_id,
));
if existing_tli.is_ok() {
info!("Timeline {} already exists", request.timeline_id);
if let Ok(timeline) = existing_tli {
let cur_generation = timeline
.read_shared_state()
.await
.sk
.state()
.mconf
.generation;
info!(
"Timeline {} already exists with generation {cur_generation}",
request.timeline_id,
);
if let Some(mconf) = request.mconf {
timeline
.membership_switch(mconf)
.await
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
}
return Ok(PullTimelineResponse {
safekeeper_host: None,
});
@@ -495,6 +523,19 @@ pub async fn handle_request(
for (i, response) in responses.into_iter().enumerate() {
match response {
Ok(status) => {
if let Some(mconf) = &request.mconf {
if status.mconf.generation > mconf.generation {
// We probably raced with another timeline membership change with higher generation.
// Ignore this request.
return Err(ApiError::Conflict(format!(
"cannot pull timeline with generation {}: timeline {} already exists with generation {} on {}",
mconf.generation,
request.timeline_id,
status.mconf.generation,
http_hosts[i],
)));
}
}
statuses.push((status, i));
}
Err(e) => {
@@ -593,15 +634,13 @@ pub async fn handle_request(
assert!(status.tenant_id == request.tenant_id);
assert!(status.timeline_id == request.timeline_id);
let check_tombstone = !request.ignore_tombstone.unwrap_or_default();
match pull_timeline(
status,
safekeeper_host,
sk_auth_token,
http_client,
global_timelines,
check_tombstone,
request.mconf,
)
.await
{
@@ -611,6 +650,10 @@ pub async fn handle_request(
Some(TimelineError::AlreadyExists(_)) => Ok(PullTimelineResponse {
safekeeper_host: None,
}),
Some(TimelineError::Deleted(_)) => Err(ApiError::Conflict(format!(
"Timeline {}/{} deleted",
request.tenant_id, request.timeline_id
))),
Some(TimelineError::CreationInProgress(_)) => {
// We don't return success here because creation might still fail.
Err(ApiError::Conflict("Creation in progress".to_owned()))
@@ -627,7 +670,7 @@ async fn pull_timeline(
sk_auth_token: Option<SecretString>,
http_client: reqwest::Client,
global_timelines: Arc<GlobalTimelines>,
check_tombstone: bool,
mconf: Option<membership::Configuration>,
) -> Result<PullTimelineResponse> {
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
info!(
@@ -689,8 +732,11 @@ async fn pull_timeline(
// fsync temp timeline directory to remember its contents.
fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
let generation = mconf.as_ref().map(|c| c.generation);
// Let's create timeline from temp directory and verify that it's correct
let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
let (commit_lsn, flush_lsn) =
validate_temp_timeline(conf, ttid, &tli_dir_path, generation).await?;
info!(
"finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
ttid, commit_lsn, flush_lsn
@@ -698,10 +744,20 @@ async fn pull_timeline(
assert!(status.commit_lsn <= status.flush_lsn);
// Finally, load the timeline.
let _tli = global_timelines
.load_temp_timeline(ttid, &tli_dir_path, check_tombstone)
let timeline = global_timelines
.load_temp_timeline(ttid, &tli_dir_path, generation)
.await?;
if let Some(mconf) = mconf {
// Switch to provided mconf to guarantee that the timeline will not
// be deleted by request with older generation.
// The generation might already be higer than the one in mconf, e.g.
// if another membership_switch request was executed between `load_temp_timeline`
// and `membership_switch`, but that's totaly fine. `membership_switch` will
// ignore switch to older generation.
timeline.membership_switch(mconf).await?;
}
Ok(PullTimelineResponse {
safekeeper_host: Some(host),
})

View File

@@ -1026,6 +1026,13 @@ where
self.state.finish_change(&state).await?;
}
if msg.mconf.generation > self.state.mconf.generation && !msg.mconf.contains(self.node_id) {
bail!(
"refused to switch into {}, node {} is not a member of it",
msg.mconf,
self.node_id,
);
}
// Switch into conf given by proposer conf if it is higher.
self.state.membership_switch(msg.mconf.clone()).await?;

View File

@@ -594,7 +594,7 @@ impl Timeline {
/// Cancel the timeline, requesting background activity to stop. Closing
/// the `self.gate` waits for that.
pub async fn cancel(&self) {
pub fn cancel(&self) {
info!("timeline {} shutting down", self.ttid);
self.cancel.cancel();
}
@@ -914,6 +914,13 @@ impl Timeline {
to: Configuration,
) -> Result<TimelineMembershipSwitchResponse> {
let mut state = self.write_shared_state().await;
// Ensure we don't race with exclude/delete requests by checking the cancellation
// token under the write_shared_state lock.
// Exclude/delete cancel the timeline under the shared state lock,
// so the timeline cannot be deleted in the middle of the membership switch.
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
state.sk.membership_switch(to).await
}

View File

@@ -10,13 +10,13 @@ use std::time::{Duration, Instant};
use anyhow::{Context, Result, bail};
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use safekeeper_api::membership::Configuration;
use safekeeper_api::membership::{Configuration, SafekeeperGeneration};
use safekeeper_api::models::{SafekeeperUtilization, TimelineDeleteResult};
use safekeeper_api::{ServerInfo, membership};
use tokio::fs;
use tracing::*;
use utils::crashsafe::{durable_rename, fsync_async_opt};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
@@ -40,10 +40,17 @@ enum GlobalMapTimeline {
struct GlobalTimelinesState {
timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
// this map is dropped on restart.
tombstones: HashMap<TenantTimelineId, Instant>,
/// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
/// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
/// this map is dropped on restart.
/// The timeline might also be locally deleted (excluded) via safekeeper migration algorithm. In that case,
/// the tombsone contains the corresponding safekeeper generation. The pull_timeline requests with
/// higher generation ignore such tombstones and can recreate the timeline.
timeline_tombstones: HashMap<TenantTimelineId, TimelineTombstone>,
/// A tombstone indicates that the tenant used to exist has been deleted.
/// These are created only by tenant_delete requests. They are always valid regardless of the
/// request generation.
/// This is only soft-enforced, as this map is dropped on restart.
tenant_tombstones: HashMap<TenantId, Instant>,
conf: Arc<SafeKeeperConf>,
@@ -79,7 +86,7 @@ impl GlobalTimelinesState {
Err(TimelineError::CreationInProgress(*ttid))
}
None => {
if self.has_tombstone(ttid) {
if self.has_tombstone(ttid, None) {
Err(TimelineError::Deleted(*ttid))
} else {
Err(TimelineError::NotFound(*ttid))
@@ -88,20 +95,46 @@ impl GlobalTimelinesState {
}
}
fn has_tombstone(&self, ttid: &TenantTimelineId) -> bool {
self.tombstones.contains_key(ttid) || self.tenant_tombstones.contains_key(&ttid.tenant_id)
fn has_timeline_tombstone(
&self,
ttid: &TenantTimelineId,
generation: Option<SafekeeperGeneration>,
) -> bool {
if let Some(generation) = generation {
self.timeline_tombstones
.get(ttid)
.is_some_and(|t| t.is_valid(generation))
} else {
self.timeline_tombstones.contains_key(ttid)
}
}
/// Removes all blocking tombstones for the given timeline ID.
fn has_tenant_tombstone(&self, tenant_id: &TenantId) -> bool {
self.tenant_tombstones.contains_key(tenant_id)
}
/// Check if the state has a tenant or a timeline tombstone.
/// If `generation` is provided, check only for timeline tombsotnes with same or higher generation.
/// If `generation` is `None`, check for any timeline tombstone.
/// Tenant tombstones are checked regardless of the generation.
fn has_tombstone(
&self,
ttid: &TenantTimelineId,
generation: Option<SafekeeperGeneration>,
) -> bool {
self.has_timeline_tombstone(ttid, generation) || self.has_tenant_tombstone(&ttid.tenant_id)
}
/// Removes timeline tombstone for the given timeline ID.
/// Returns `true` if there have been actual changes.
fn remove_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
self.tombstones.remove(ttid).is_some()
|| self.tenant_tombstones.remove(&ttid.tenant_id).is_some()
fn remove_timeline_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
self.timeline_tombstones.remove(ttid).is_some()
}
fn delete(&mut self, ttid: TenantTimelineId) {
fn delete(&mut self, ttid: TenantTimelineId, generation: Option<SafekeeperGeneration>) {
self.timelines.remove(&ttid);
self.tombstones.insert(ttid, Instant::now());
self.timeline_tombstones
.insert(ttid, TimelineTombstone::new(generation));
}
fn add_tenant_tombstone(&mut self, tenant_id: TenantId) {
@@ -120,7 +153,7 @@ impl GlobalTimelines {
Self {
state: Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
tombstones: HashMap::new(),
timeline_tombstones: HashMap::new(),
tenant_tombstones: HashMap::new(),
conf,
broker_active_set: Arc::new(TimelinesSet::default()),
@@ -261,6 +294,8 @@ impl GlobalTimelines {
start_lsn: Lsn,
commit_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let generation = Some(mconf.generation);
let (conf, _, _, _) = {
let state = self.state.lock().unwrap();
if let Ok(timeline) = state.get(&ttid) {
@@ -268,8 +303,8 @@ impl GlobalTimelines {
return Ok(timeline);
}
if state.has_tombstone(&ttid) {
anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
if state.has_tombstone(&ttid, generation) {
anyhow::bail!(TimelineError::Deleted(ttid));
}
state.get_dependencies()
@@ -284,7 +319,9 @@ impl GlobalTimelines {
// immediately initialize first WAL segment as well.
let state = TimelinePersistentState::new(&ttid, mconf, server_info, start_lsn, commit_lsn)?;
control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?;
let timeline = self
.load_temp_timeline(ttid, &tmp_dir_path, generation)
.await?;
Ok(timeline)
}
@@ -303,7 +340,7 @@ impl GlobalTimelines {
&self,
ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf,
check_tombstone: bool,
generation: Option<SafekeeperGeneration>,
) -> Result<Arc<Timeline>> {
// Check for existence and mark that we're creating it.
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
@@ -317,18 +354,18 @@ impl GlobalTimelines {
}
_ => {}
}
if check_tombstone {
if state.has_tombstone(&ttid) {
anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
}
} else {
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
// It's also possible that we enter this when the tenant has been deleted, even if the timeline itself has never existed.
if state.remove_tombstone(&ttid) {
warn!("un-deleted timeline {ttid}");
}
if state.has_tombstone(&ttid, generation) {
// If the timeline is deleted, we refuse to recreate it.
// This is a safeguard against accidentally overwriting a timeline that was deleted
// by concurrent request.
anyhow::bail!(TimelineError::Deleted(ttid));
}
// We might have an outdated tombstone with the older generation.
// Remove it unconditionally.
state.remove_timeline_tombstone(&ttid);
state
.timelines
.insert(ttid, GlobalMapTimeline::CreationInProgress);
@@ -503,11 +540,16 @@ impl GlobalTimelines {
ttid: &TenantTimelineId,
action: DeleteOrExclude,
) -> Result<TimelineDeleteResult, DeleteOrExcludeError> {
let generation = match &action {
DeleteOrExclude::Delete | DeleteOrExclude::DeleteLocal => None,
DeleteOrExclude::Exclude(mconf) => Some(mconf.generation),
};
let tli_res = {
let state = self.state.lock().unwrap();
// Do NOT check tenant tombstones here: those were set earlier
if state.tombstones.contains_key(ttid) {
if state.has_timeline_tombstone(ttid, generation) {
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
info!("Timeline {ttid} was already deleted");
return Ok(TimelineDeleteResult { dir_existed: false });
@@ -528,6 +570,11 @@ impl GlobalTimelines {
// We would like to avoid holding the lock while waiting for the
// gate to finish as this is deadlock prone, so for actual
// deletion will take it second time.
//
// Canceling the timeline will block membership switch requests,
// ensuring that the timeline generation will not increase
// after this point, and we will not remove a timeline with a generation
// higher than the requested one.
if let DeleteOrExclude::Exclude(ref mconf) = action {
let shared_state = timeline.read_shared_state().await;
if shared_state.sk.state().mconf.generation > mconf.generation {
@@ -536,9 +583,9 @@ impl GlobalTimelines {
current: shared_state.sk.state().mconf.clone(),
});
}
timeline.cancel().await;
timeline.cancel();
} else {
timeline.cancel().await;
timeline.cancel();
}
timeline.close().await;
@@ -565,7 +612,7 @@ impl GlobalTimelines {
// Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
// are used to prevent still-running computes from re-creating the same timeline when they send data,
// and to speed up repeated deletion calls by avoiding re-listing objects.
self.state.lock().unwrap().delete(*ttid);
self.state.lock().unwrap().delete(*ttid, generation);
result
}
@@ -627,12 +674,16 @@ impl GlobalTimelines {
// may recreate a deleted timeline.
let now = Instant::now();
state
.tombstones
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
.timeline_tombstones
.retain(|_, v| now.duration_since(v.timestamp) < *tombstone_ttl);
state
.tenant_tombstones
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
}
pub fn get_sk_id(&self) -> NodeId {
self.state.lock().unwrap().conf.my_id
}
}
/// Action for delete_or_exclude.
@@ -673,6 +724,7 @@ pub async fn validate_temp_timeline(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
path: &Utf8PathBuf,
generation: Option<SafekeeperGeneration>,
) -> Result<(Lsn, Lsn)> {
let control_path = path.join("safekeeper.control");
@@ -681,6 +733,15 @@ pub async fn validate_temp_timeline(
bail!("wal_seg_size is not set");
}
if let Some(generation) = generation {
if control_store.mconf.generation > generation {
bail!(
"tmp timeline generation {} is higher than expected {generation}",
control_store.mconf.generation
);
}
}
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
let commit_lsn = control_store.commit_lsn;
@@ -688,3 +749,28 @@ pub async fn validate_temp_timeline(
Ok((commit_lsn, flush_lsn))
}
/// A tombstone for a deleted timeline.
/// The generation is passed with "exclude" request and stored in the tombstone.
/// We ignore the tombstone if the request generation is higher than
/// the tombstone generation.
/// If the tombstone doesn't have a generation, it's considered permanent,
/// e.g. after "delete" request.
struct TimelineTombstone {
timestamp: Instant,
generation: Option<SafekeeperGeneration>,
}
impl TimelineTombstone {
fn new(generation: Option<SafekeeperGeneration>) -> Self {
TimelineTombstone {
timestamp: Instant::now(),
generation,
}
}
/// Check if the timeline is still valid for the given generation.
fn is_valid(&self, generation: SafekeeperGeneration) -> bool {
self.generation.is_none_or(|g| g >= generation)
}
}

View File

@@ -364,7 +364,12 @@ impl SafekeeperReconcilerInner {
http_hosts,
tenant_id: req.tenant_id,
timeline_id,
ignore_tombstone: Some(false),
// TODO(diko): get mconf from "timelines" table and pass it here.
// Now we use pull_timeline reconciliation only for the timeline creation,
// so it's not critical right now.
// It could be fixed together with other reconciliation issues:
// https://github.com/neondatabase/neon/issues/12189
mconf: None,
};
success = self
.reconcile_inner(

View File

@@ -991,6 +991,7 @@ impl Service {
timeline_id: TimelineId,
to_safekeepers: &[Safekeeper],
from_safekeepers: &[Safekeeper],
mconf: membership::Configuration,
) -> Result<(), ApiError> {
let http_hosts = from_safekeepers
.iter()
@@ -1009,14 +1010,11 @@ impl Service {
.collect::<Vec<_>>()
);
// TODO(diko): need to pass mconf/generation with the request
// to properly handle tombstones. Ignore tombstones for now.
// Worst case: we leave a timeline on a safekeeper which is not in the current set.
let req = PullTimelineRequest {
tenant_id,
timeline_id,
http_hosts,
ignore_tombstone: Some(true),
mconf: Some(mconf),
};
const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
@@ -1336,6 +1334,7 @@ impl Service {
timeline_id,
&pull_to_safekeepers,
&cur_safekeepers,
joint_config.clone(),
)
.await?;

View File

@@ -40,6 +40,7 @@ def test_cloud_regress(
"PGUSER": remote_pg.default_options["user"],
"PGPASSWORD": remote_pg.default_options["password"],
"PGDATABASE": remote_pg.default_options["dbname"],
"NEON_MOTD": "", # Disable MOTD for tests
}
regress_cmd = [
str(regress_bin),

View File

@@ -87,9 +87,10 @@ class EndpointHttpClient(requests.Session):
def prewarmed():
json = self.prewarm_lfc_status()
status, err = json["status"], json.get("error")
assert status == "completed", f"{status}, {err=}"
assert status in ["failed", "completed", "skipped"], f"{status}, {err=}"
wait_until(prewarmed, timeout=60)
assert self.prewarm_lfc_status()["status"] != "failed"
def offload_lfc_status(self) -> dict[str, str]:
res = self.get(self.offload_url)
@@ -105,19 +106,19 @@ class EndpointHttpClient(requests.Session):
def offloaded():
json = self.offload_lfc_status()
status, err = json["status"], json.get("error")
assert status == "completed", f"{status}, {err=}"
assert status in ["failed", "completed"], f"{status}, {err=}"
wait_until(offloaded)
assert self.offload_lfc_status()["status"] != "failed"
def promote(self, safekeepers_lsn: dict[str, Any], disconnect: bool = False):
def promote(self, promote_spec: dict[str, Any], disconnect: bool = False):
url = f"http://localhost:{self.external_port}/promote"
if disconnect:
try: # send first request to start promote and disconnect
self.post(url, data=safekeepers_lsn, timeout=0.001)
self.post(url, json=promote_spec, timeout=0.001)
except ReadTimeout:
pass # wait on second request which returns on promotion finish
res = self.post(url, data=safekeepers_lsn)
res.raise_for_status()
res = self.post(url, json=promote_spec)
json: dict[str, str] = res.json()
return json

View File

@@ -212,11 +212,13 @@ class NeonLocalCli(AbstractNeonCli):
pg_version,
]
if conf is not None:
args.extend(
chain.from_iterable(
product(["-c"], (f"{key}:{value}" for key, value in conf.items()))
)
)
for key, value in conf.items():
if isinstance(value, bool):
args.extend(
["-c", f"{key}:{str(value).lower()}"]
) # only accepts true/false not True/False
else:
args.extend(["-c", f"{key}:{value}"])
if set_default:
args.append("--set-default")

View File

@@ -1540,6 +1540,17 @@ class NeonEnv:
raise RuntimeError(f"Pageserver with ID {id} not found")
def get_safekeeper(self, id: int) -> Safekeeper:
"""
Look up a safekeeper by its ID.
"""
for sk in self.safekeepers:
if sk.id == id:
return sk
raise RuntimeError(f"Safekeeper with ID {id} not found")
def get_tenant_pageserver(self, tenant_id: TenantId | TenantShardId):
"""
Get the NeonPageserver where this tenant shard is currently attached, according
@@ -3899,6 +3910,41 @@ class NeonProxy(PgProtocol):
assert response.status_code == expected_code, f"response: {response.json()}"
return response.json()
def http_multiquery(self, *queries, **kwargs):
# TODO maybe use default values if not provided
user = quote(kwargs["user"])
password = quote(kwargs["password"])
expected_code = kwargs.get("expected_code")
timeout = kwargs.get("timeout")
json_queries = []
for query in queries:
if type(query) is str:
json_queries.append({"query": query})
else:
[query, params] = query
json_queries.append({"query": query, "params": params})
queries_str = [j["query"] for j in json_queries]
log.info(f"Executing http queries: {queries_str}")
connstr = f"postgresql://{user}:{password}@{self.domain}:{self.proxy_port}/postgres"
response = requests.post(
f"https://{self.domain}:{self.external_http_port}/sql",
data=json.dumps({"queries": json_queries}),
headers={
"Content-Type": "application/sql",
"Neon-Connection-String": connstr,
"Neon-Pool-Opt-In": "true",
},
verify=str(self.test_output_dir / "proxy.crt"),
timeout=timeout,
)
if expected_code is not None:
assert response.status_code == expected_code, f"response: {response.json()}"
return response.json()
async def http2_query(self, query, args, **kwargs):
# TODO maybe use default values if not provided
user = kwargs["user"]
@@ -4748,9 +4794,10 @@ class Endpoint(PgProtocol, LogUtils):
m = re.search(r"=\s*(\S+)", line)
assert m is not None, f"malformed config line {line}"
size = m.group(1)
assert size_to_bytes(size) >= size_to_bytes("1MB"), (
"LFC size cannot be set less than 1MB"
)
if size_to_bytes(size) > 0:
assert size_to_bytes(size) >= size_to_bytes("1MB"), (
"LFC size cannot be set less than 1MB"
)
lfc_path_escaped = str(lfc_path).replace("'", "''")
config_lines = [
f"neon.file_cache_path = '{lfc_path_escaped}'",
@@ -4905,6 +4952,10 @@ class Endpoint(PgProtocol, LogUtils):
log.debug(json.dumps(dict(data_dict, **kwargs)))
json.dump(dict(data_dict, **kwargs), file, indent=4)
def get_compute_spec(self) -> dict[str, Any]:
out = json.loads((Path(self.endpoint_path()) / "config.json").read_text())["spec"]
return cast("dict[str, Any]", out)
def respec_deep(self, **kwargs: Any) -> None:
"""
Update the endpoint.json file taking into account nested keys.
@@ -5391,15 +5442,24 @@ class Safekeeper(LogUtils):
return timeline_status.commit_lsn
def pull_timeline(
self, srcs: list[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId
self,
srcs: list[Safekeeper],
tenant_id: TenantId,
timeline_id: TimelineId,
mconf: MembershipConfiguration | None = None,
) -> dict[str, Any]:
"""
pull_timeline from srcs to self.
"""
src_https = [f"http://localhost:{sk.port.http}" for sk in srcs]
res = self.http_client().pull_timeline(
{"tenant_id": str(tenant_id), "timeline_id": str(timeline_id), "http_hosts": src_https}
)
body: dict[str, Any] = {
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"http_hosts": src_https,
}
if mconf is not None:
body["mconf"] = mconf.__dict__
res = self.http_client().pull_timeline(body)
src_ids = [sk.id for sk in srcs]
log.info(f"finished pulling timeline from {src_ids} to {self.id}")
return res

View File

@@ -5,7 +5,7 @@ import pytest
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.compare_fixtures import RemoteCompare
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_last_flush_lsn
from fixtures.utils import shared_buffers_for_max_cu
@@ -69,13 +69,18 @@ def test_perf_many_relations(remote_compare: RemoteCompare, num_relations: int):
)
def test_perf_simple_many_relations_reldir_v2(
neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker
@pytest.mark.parametrize(
"reldir,num_relations",
[("v1", 10000), ("v1v2", 10000), ("v2", 10000), ("v2", 100000)],
ids=["v1-small", "v1v2-small", "v2-small", "v2-large"],
)
def test_perf_simple_many_relations_reldir(
neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, reldir: str, num_relations: int
):
"""
Test creating many relations in a single database.
"""
env = neon_env_builder.init_start(initial_tenant_conf={"rel_size_v2_enabled": "true"})
env = neon_env_builder.init_start(initial_tenant_conf={"rel_size_v2_enabled": reldir != "v1"})
ep = env.endpoints.create_start(
"main",
config_lines=[
@@ -85,14 +90,38 @@ def test_perf_simple_many_relations_reldir_v2(
],
)
assert (
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
"rel_size_migration"
]
!= "legacy"
)
ep.safe_psql("CREATE TABLE IF NOT EXISTS initial_table (v1 int)")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
n = 100000
if reldir == "v1":
assert (
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
"rel_size_migration"
]
== "legacy"
)
elif reldir == "v1v2":
assert (
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
"rel_size_migration"
]
== "migrating"
)
elif reldir == "v2":
# only read/write to the v2 keyspace
env.pageserver.http_client().timeline_patch_index_part(
env.initial_tenant, env.initial_timeline, {"rel_size_migration": "migrated"}
)
assert (
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
"rel_size_migration"
]
== "migrated"
)
else:
raise AssertionError(f"Invalid reldir config: {reldir}")
n = num_relations
step = 5000
# Create many relations
log.info(f"Creating {n} relations...")

View File

@@ -58,7 +58,7 @@ PREEMPT_GC_COMPACTION_TENANT_CONF = {
"compaction_upper_limit": 6,
"lsn_lease_length": "0s",
# Enable gc-compaction
"gc_compaction_enabled": "true",
"gc_compaction_enabled": True,
"gc_compaction_initial_threshold_kb": 1024, # At a small threshold
"gc_compaction_ratio_percent": 1,
# No PiTR interval and small GC horizon
@@ -540,7 +540,7 @@ def test_pageserver_gc_compaction_trigger(neon_env_builder: NeonEnvBuilder):
"pitr_interval": "0s",
"gc_horizon": f"{1024 * 16}",
"lsn_lease_length": "0s",
"gc_compaction_enabled": "true",
"gc_compaction_enabled": True,
"gc_compaction_initial_threshold_kb": "16",
"gc_compaction_ratio_percent": "50",
# Do not generate image layers with create_image_layers

View File

@@ -538,6 +538,7 @@ def test_historic_storage_formats(
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.pg_version = dataset.pg_version
env = neon_env_builder.init_configs()
env.start()
assert isinstance(env.pageserver_remote_storage, S3Storage)
@@ -576,6 +577,17 @@ def test_historic_storage_formats(
# All our artifacts should contain at least one timeline
assert len(timelines) > 0
if dataset.name == "2025-04-08-tenant-manifest-v1":
# This dataset was created at a time where we decided to migrate to v2 reldir by simply disabling writes to v1
# and starting writing to v2. This was too risky and we have reworked the migration plan. Therefore, we should
# opt in full relv2 mode for this dataset.
for timeline in timelines:
env.pageserver.http_client().timeline_patch_index_part(
dataset.tenant_id,
timeline["timeline_id"],
{"force_index_update": True, "rel_size_migration": "migrated"},
)
# Import tenant does not create the timeline on safekeepers,
# because it is a debug handler and the timeline may have already been
# created on some set of safekeepers.

View File

@@ -164,6 +164,25 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
check_prewarmed(method, client, desired)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm_empty(neon_simple_env: NeonEnv):
"""
Test there are no errors when trying to offload or prewarm endpoint without cache using compute_ctl.
Endpoint without cache is simulated by turning off LFC manually, but in cloud/ setup this is
also reproduced on fresh endpoints
"""
env = neon_simple_env
ep = env.endpoints.create_start("main", config_lines=["neon.file_cache_size_limit=0"])
client = ep.http_client()
conn = ep.connect()
cur = conn.cursor()
cur.execute("create schema neon; create extension neon with schema neon")
method = PrewarmMethod.COMPUTE_CTL
offload_lfc(method, client, cur)
prewarm_endpoint(method, client, cur, None)
assert client.prewarm_lfc_status()["status"] == "skipped"
# autoprewarm isn't needed as we prewarm manually
WORKLOAD_VALUES = METHOD_VALUES[:-1]
WORKLOAD_IDS = METHOD_IDS[:-1]

View File

@@ -16,7 +16,7 @@ def test_ondemand_download_pg_xact(neon_env_builder: NeonEnvBuilder, shard_count
neon_env_builder.num_pageservers = shard_count
tenant_conf = {
"lazy_slru_download": "true",
"lazy_slru_download": True,
# set PITR interval to be small, so we can do GC
"pitr_interval": "0 s",
}
@@ -82,7 +82,7 @@ def test_ondemand_download_replica(neon_env_builder: NeonEnvBuilder, shard_count
neon_env_builder.num_pageservers = shard_count
tenant_conf = {
"lazy_slru_download": "true",
"lazy_slru_download": True,
}
env = neon_env_builder.init_start(
initial_tenant_conf=tenant_conf, initial_tenant_shard_count=shard_count
@@ -141,7 +141,7 @@ def test_ondemand_download_after_wal_switch(neon_env_builder: NeonEnvBuilder):
"""
tenant_conf = {
"lazy_slru_download": "true",
"lazy_slru_download": True,
}
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)

View File

@@ -395,23 +395,6 @@ def test_max_wal_rate(neon_simple_env: NeonEnv):
tuples = endpoint.safe_psql("SELECT backpressure_throttling_time();")
assert tuples[0][0] == 0, "Backpressure throttling detected"
# 0 MB/s max_wal_rate. WAL proposer can still push some WALs but will be super slow.
endpoint.safe_psql_many(
[
"ALTER SYSTEM SET databricks.max_wal_mb_per_second = 0;",
"SELECT pg_reload_conf();",
]
)
# Write ~10 KB data should hit backpressure.
with endpoint.cursor(dbname=DBNAME) as cur:
cur.execute("SET databricks.max_wal_mb_per_second = 0;")
for _ in range(0, 10):
cur.execute("INSERT INTO usertable SELECT random(), repeat('a', 1000);")
tuples = endpoint.safe_psql("SELECT backpressure_throttling_time();")
assert tuples[0][0] > 0, "No backpressure throttling detected"
# 1 MB/s max_wal_rate.
endpoint.safe_psql_many(
[
@@ -457,21 +440,6 @@ def test_tx_abort_with_many_relations(
],
)
if reldir_type == "v1":
assert (
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
"rel_size_migration"
]
== "legacy"
)
else:
assert (
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
"rel_size_migration"
]
!= "legacy"
)
# How many relations: this number is tuned to be long enough to take tens of seconds
# if the rollback code path is buggy, tripping the test's timeout.
n = 5000
@@ -556,3 +524,19 @@ def test_tx_abort_with_many_relations(
except:
exec.shutdown(wait=False, cancel_futures=True)
raise
# Do the check after everything is done, because the reldirv2 transition won't happen until create table.
if reldir_type == "v1":
assert (
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
"rel_size_migration"
]
== "legacy"
)
else:
assert (
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
"rel_size_migration"
]
!= "legacy"
)

View File

@@ -17,9 +17,6 @@ if TYPE_CHECKING:
from typing import Any
GET_CONNECTION_PID_QUERY = "SELECT pid FROM pg_stat_activity WHERE state = 'active'"
@pytest.mark.asyncio
async def test_http_pool_begin_1(static_proxy: NeonProxy):
static_proxy.safe_psql("create user http_auth with password 'http' superuser")
@@ -479,7 +476,7 @@ def test_sql_over_http_pool(static_proxy: NeonProxy):
def get_pid(status: int, pw: str, user="http_auth") -> Any:
return static_proxy.http_query(
GET_CONNECTION_PID_QUERY,
"SELECT pg_backend_pid() as pid",
[],
user=user,
password=pw,
@@ -513,6 +510,35 @@ def test_sql_over_http_pool(static_proxy: NeonProxy):
assert "password authentication failed for user" in res["message"]
def test_sql_over_http_pool_settings(static_proxy: NeonProxy):
static_proxy.safe_psql("create user http_auth with password 'http' superuser")
def multiquery(*queries) -> Any:
results = static_proxy.http_multiquery(
*queries,
user="http_auth",
password="http",
expected_code=200,
)
return [result["rows"] for result in results["results"]]
[[intervalstyle]] = static_proxy.safe_psql("SHOW IntervalStyle")
assert intervalstyle == "postgres", "'postgres' is the default IntervalStyle in postgres"
result = multiquery("select '0 seconds'::interval as interval")
assert result[0][0]["interval"] == "00:00:00", "interval is expected in postgres format"
result = multiquery(
"SET IntervalStyle = 'iso_8601'",
"select '0 seconds'::interval as interval",
)
assert result[1][0]["interval"] == "PT0S", "interval is expected in ISO-8601 format"
result = multiquery("select '0 seconds'::interval as interval")
assert result[0][0]["interval"] == "00:00:00", "interval is expected in postgres format"
def test_sql_over_http_urlencoding(static_proxy: NeonProxy):
static_proxy.safe_psql("create user \"http+auth$$\" with password '%+$^&*@!' superuser")
@@ -544,23 +570,37 @@ def test_http_pool_begin(static_proxy: NeonProxy):
query(200, "SELECT 1;") # Query that should succeed regardless of the transaction
def test_sql_over_http_pool_idle(static_proxy: NeonProxy):
def test_sql_over_http_pool_tx_reuse(static_proxy: NeonProxy):
static_proxy.safe_psql("create user http_auth2 with password 'http' superuser")
def query(status: int, query: str) -> Any:
def query(status: int, query: str, *args) -> Any:
return static_proxy.http_query(
query,
[],
args,
user="http_auth2",
password="http",
expected_code=status,
)
pid1 = query(200, GET_CONNECTION_PID_QUERY)["rows"][0]["pid"]
def query_pid_txid() -> Any:
result = query(
200,
"SELECT pg_backend_pid() as pid, pg_current_xact_id() as txid",
)
return result["rows"][0]
res0 = query_pid_txid()
time.sleep(0.02)
query(200, "BEGIN")
pid2 = query(200, GET_CONNECTION_PID_QUERY)["rows"][0]["pid"]
assert pid1 != pid2
res1 = query_pid_txid()
res2 = query_pid_txid()
assert res0["pid"] == res1["pid"], "connection should be reused"
assert res0["pid"] == res2["pid"], "connection should be reused"
assert res1["txid"] != res2["txid"], "txid should be different"
@pytest.mark.timeout(60)

View File

@@ -7,6 +7,8 @@ if TYPE_CHECKING:
NeonEnvBuilder,
)
from fixtures.neon_fixtures import wait_for_last_flush_lsn
def test_pageserver_reldir_v2(
neon_env_builder: NeonEnvBuilder,
@@ -65,6 +67,8 @@ def test_pageserver_reldir_v2(
endpoint.safe_psql("CREATE TABLE foo4 (id INTEGER PRIMARY KEY, val text)")
# Delete a relation in v1
endpoint.safe_psql("DROP TABLE foo1")
# wait pageserver to apply the LSN
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
# Check if both relations are still accessible
endpoint.safe_psql("SELECT * FROM foo2")
@@ -76,12 +80,16 @@ def test_pageserver_reldir_v2(
# This will acquire a basebackup, which lists all relations.
endpoint.start()
# Check if both relations are still accessible
# Check if both relations are still accessible after restart
endpoint.safe_psql("DROP TABLE IF EXISTS foo1")
endpoint.safe_psql("SELECT * FROM foo2")
endpoint.safe_psql("SELECT * FROM foo3")
endpoint.safe_psql("SELECT * FROM foo4")
endpoint.safe_psql("DROP TABLE foo3")
# wait pageserver to apply the LSN
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
# Restart the endpoint again
endpoint.stop()
endpoint.start()
@@ -99,6 +107,9 @@ def test_pageserver_reldir_v2(
},
)
endpoint.stop()
endpoint.start()
# Check if the relation is still accessible
endpoint.safe_psql("SELECT * FROM foo2")
endpoint.safe_psql("SELECT * FROM foo4")
@@ -111,3 +122,10 @@ def test_pageserver_reldir_v2(
]
== "migrating"
)
assert (
env.pageserver.http_client().timeline_detail(env.initial_tenant, env.initial_timeline)[
"rel_size_migrated_at"
]
is not None
)

View File

@@ -90,6 +90,7 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod):
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100,)
primary_spec = primary.get_compute_spec()
primary_endpoint_id = primary.endpoint_id
stop_and_check_lsn(primary, expected_primary_lsn)
@@ -99,10 +100,9 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod):
if method == PromoteMethod.COMPUTE_CTL:
client = secondary.http_client()
client.prewarm_lfc(primary_endpoint_id)
# control plane knows safekeepers, simulate it by querying primary
assert (lsn := primary.terminate_flush_lsn)
safekeepers_lsn = {"safekeepers": safekeepers, "wal_flush_lsn": lsn}
assert client.promote(safekeepers_lsn)["status"] == "completed"
promote_spec = {"spec": primary_spec, "wal_flush_lsn": str(lsn)}
assert client.promote(promote_spec)["status"] == "completed"
else:
promo_cur.execute(f"alter system set neon.safekeepers='{safekeepers}'")
promo_cur.execute("select pg_reload_conf()")
@@ -131,21 +131,35 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod):
lsn_triple = get_lsn_triple(new_primary_cur)
log.info(f"Secondary: LSN after workload is {lsn_triple}")
expected_promoted_lsn = Lsn(lsn_triple[2])
expected_lsn = Lsn(lsn_triple[2])
with secondary.connect() as conn, conn.cursor() as new_primary_cur:
new_primary_cur.execute("select payload from t")
assert new_primary_cur.fetchall() == [(it,) for it in range(1, 201)]
if method == PromoteMethod.COMPUTE_CTL:
# compute_ctl's /promote switches replica type to Primary so it syncs
# safekeepers on finish
stop_and_check_lsn(secondary, expected_promoted_lsn)
# compute_ctl's /promote switches replica type to Primary so it syncs safekeepers on finish
stop_and_check_lsn(secondary, expected_lsn)
else:
# on testing postgres, we don't update replica type, secondaries don't
# sync so lsn should be None
# on testing postgres, we don't update replica type, secondaries don't sync so lsn should be None
stop_and_check_lsn(secondary, None)
if method == PromoteMethod.COMPUTE_CTL:
secondary.stop()
# In production, compute ultimately receives new compute spec from cplane.
secondary.respec(mode="Primary")
secondary.start()
with secondary.connect() as conn, conn.cursor() as new_primary_cur:
new_primary_cur.execute(
"INSERT INTO t (payload) SELECT generate_series(101, 200) RETURNING payload"
)
assert new_primary_cur.fetchall() == [(it,) for it in range(101, 201)]
lsn_triple = get_lsn_triple(new_primary_cur)
log.info(f"Secondary: LSN after restart and workload is {lsn_triple}")
expected_lsn = Lsn(lsn_triple[2])
stop_and_check_lsn(secondary, expected_lsn)
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary2")
with primary.connect() as new_primary, new_primary.cursor() as new_primary_cur:
@@ -154,10 +168,11 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod):
log.info(f"New primary: Boot LSN is {lsn_triple}")
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (200,)
compute_ctl_count = 100 * (method == PromoteMethod.COMPUTE_CTL)
assert new_primary_cur.fetchone() == (200 + compute_ctl_count,)
new_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(201, 300)")
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (300,)
assert new_primary_cur.fetchone() == (300 + compute_ctl_count,)
stop_and_check_lsn(primary, expected_primary_lsn)
@@ -175,18 +190,91 @@ def test_replica_promote_handler_disconnects(neon_simple_env: NeonEnv):
cur.execute("create schema neon;create extension neon with schema neon")
cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)")
cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
cur.execute("show neon.safekeepers")
safekeepers = cur.fetchall()[0][0]
primary.http_client().offload_lfc()
primary_spec = primary.get_compute_spec()
primary_endpoint_id = primary.endpoint_id
primary.stop(mode="immediate-terminate")
assert (lsn := primary.terminate_flush_lsn)
client = secondary.http_client()
client.prewarm_lfc(primary_endpoint_id)
safekeepers_lsn = {"safekeepers": safekeepers, "wal_flush_lsn": lsn}
assert client.promote(safekeepers_lsn, disconnect=True)["status"] == "completed"
promote_spec = {"spec": primary_spec, "wal_flush_lsn": str(lsn)}
assert client.promote(promote_spec, disconnect=True)["status"] == "completed"
with secondary.connect() as conn, conn.cursor() as cur:
cur.execute("select count(*) from t")
assert cur.fetchone() == (100,)
cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200) RETURNING payload")
cur.execute("select count(*) from t")
assert cur.fetchone() == (200,)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_replica_promote_fails(neon_simple_env: NeonEnv):
"""
Test that if a /promote route fails, we can safely start primary back
"""
env: NeonEnv = neon_simple_env
primary: Endpoint = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
secondary.stop()
secondary.start(env={"FAILPOINTS": "compute-promotion=return(0)"})
with primary.connect() as conn, conn.cursor() as cur:
cur.execute("create schema neon;create extension neon with schema neon")
cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)")
cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
primary.http_client().offload_lfc()
primary_spec = primary.get_compute_spec()
primary_endpoint_id = primary.endpoint_id
primary.stop(mode="immediate-terminate")
assert (lsn := primary.terminate_flush_lsn)
client = secondary.http_client()
client.prewarm_lfc(primary_endpoint_id)
promote_spec = {"spec": primary_spec, "wal_flush_lsn": str(lsn)}
assert client.promote(promote_spec)["status"] == "failed"
secondary.stop()
primary.start()
with primary.connect() as conn, conn.cursor() as cur:
cur.execute("select count(*) from t")
assert cur.fetchone() == (100,)
cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200) RETURNING payload")
cur.execute("select count(*) from t")
assert cur.fetchone() == (200,)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_replica_promote_prewarm_fails(neon_simple_env: NeonEnv):
"""
Test that if /lfc/prewarm route fails, we are able to promote
"""
env: NeonEnv = neon_simple_env
primary: Endpoint = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
secondary.stop()
secondary.start(env={"FAILPOINTS": "compute-prewarm=return(0)"})
with primary.connect() as conn, conn.cursor() as cur:
cur.execute("create schema neon;create extension neon with schema neon")
cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)")
cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
primary.http_client().offload_lfc()
primary_spec = primary.get_compute_spec()
primary_endpoint_id = primary.endpoint_id
primary.stop(mode="immediate-terminate")
assert (lsn := primary.terminate_flush_lsn)
client = secondary.http_client()
with pytest.raises(AssertionError):
client.prewarm_lfc(primary_endpoint_id)
assert client.prewarm_lfc_status()["status"] == "failed"
promote_spec = {"spec": primary_spec, "wal_flush_lsn": str(lsn)}
assert client.promote(promote_spec)["status"] == "completed"
with secondary.connect() as conn, conn.cursor() as cur:
cur.execute("select count(*) from t")

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import re
from typing import TYPE_CHECKING
import pytest
@@ -12,7 +13,7 @@ if TYPE_CHECKING:
# TODO(diko): pageserver spams with various errors during safekeeper migration.
# Fix the code so it handles the migration better.
ALLOWED_PAGESERVER_ERRORS = [
PAGESERVER_ALLOWED_ERRORS = [
".*Timeline .* was cancelled and cannot be used anymore.*",
".*Timeline .* has been deleted.*",
".*Timeline .* was not found in global map.*",
@@ -35,7 +36,7 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
"timeline_safekeeper_count": 1,
}
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(ALLOWED_PAGESERVER_ERRORS)
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
@@ -136,7 +137,7 @@ def test_safekeeper_migration_common_set_failpoints(neon_env_builder: NeonEnvBui
"timeline_safekeeper_count": 3,
}
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(ALLOWED_PAGESERVER_ERRORS)
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert len(mconf["sk_set"]) == 3
@@ -196,3 +197,92 @@ def test_safekeeper_migration_common_set_failpoints(neon_env_builder: NeonEnvBui
assert (
f"timeline {env.initial_tenant}/{env.initial_timeline} deleted" in exc.value.response.text
)
def test_sk_generation_aware_tombstones(neon_env_builder: NeonEnvBuilder):
"""
Test that safekeeper respects generations:
1. Check that migration back and forth between two safekeepers works.
2. Check that sk refuses to execute requests with stale generation.
"""
neon_env_builder.num_safekeepers = 3
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
"timeline_safekeeper_count": 1,
}
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["new_sk_set"] is None
assert len(mconf["sk_set"]) == 1
cur_sk = mconf["sk_set"][0]
second_sk, third_sk = [sk.id for sk in env.safekeepers if sk.id != cur_sk]
cur_gen = 1
# Pull the timeline manually to third_sk, so the timeline exists there with stale generation.
# This is needed for the test later.
env.get_safekeeper(third_sk).pull_timeline(
[env.get_safekeeper(cur_sk)], env.initial_tenant, env.initial_timeline
)
def expect_deleted(sk_id: int):
with pytest.raises(requests.exceptions.HTTPError, match="Not Found") as exc:
env.get_safekeeper(sk_id).http_client().timeline_status(
env.initial_tenant, env.initial_timeline
)
assert exc.value.response.status_code == 404
assert re.match(r".*timeline .* deleted.*", exc.value.response.text)
def get_mconf(sk_id: int):
status = (
env.get_safekeeper(sk_id)
.http_client()
.timeline_status(env.initial_tenant, env.initial_timeline)
)
assert status.mconf is not None
return status.mconf
def migrate():
nonlocal cur_sk, second_sk, cur_gen
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, [second_sk]
)
cur_sk, second_sk = second_sk, cur_sk
cur_gen += 2
# Migrate the timeline back and forth between cur_sk and second_sk.
for _i in range(3):
migrate()
# Timeline should exist on cur_sk.
assert get_mconf(cur_sk).generation == cur_gen
# Timeline should be deleted on second_sk.
expect_deleted(second_sk)
# Remember current mconf.
mconf = get_mconf(cur_sk)
# Migrate the timeline one more time.
# It increases the generation by 2.
migrate()
# Check that sk refuses to execute the exclude request with the old mconf.
with pytest.raises(requests.exceptions.HTTPError, match="Conflict") as exc:
env.get_safekeeper(cur_sk).http_client().timeline_exclude(
env.initial_tenant, env.initial_timeline, mconf
)
assert re.match(r".*refused to switch into excluding mconf.*", exc.value.response.text)
# We shouldn't have deleted the timeline.
assert get_mconf(cur_sk).generation == cur_gen
# Check that sk refuses to execute the pull_timeline request with the old mconf.
# Note: we try to pull from third_sk, which has a timeline with stale generation.
# Thus, we bypass some preliminary generation checks and actually test tombstones.
with pytest.raises(requests.exceptions.HTTPError, match="Conflict") as exc:
env.get_safekeeper(second_sk).pull_timeline(
[env.get_safekeeper(third_sk)], env.initial_tenant, env.initial_timeline, mconf
)
assert re.match(r".*Timeline .* deleted.*", exc.value.response.text)
# The timeline should remain deleted.
expect_deleted(second_sk)

View File

@@ -1508,20 +1508,55 @@ def test_sharding_split_failures(
env.storage_controller.consistency_check()
@pytest.mark.skip(reason="The backpressure change has not been merged yet.")
# HADRON
def test_create_tenant_after_split(neon_env_builder: NeonEnvBuilder):
"""
Tests creating a tenant and a timeline should fail after a tenant split.
"""
env = neon_env_builder.init_start(initial_tenant_shard_count=4)
env.storage_controller.allowed_errors.extend(
[
".*already exists with a different shard count.*",
]
)
ep = env.endpoints.create_start("main", tenant_id=env.initial_tenant)
ep.safe_psql("CREATE TABLE usertable ( YCSB_KEY INT, FIELD0 TEXT);")
ep.safe_psql("INSERT INTO usertable VALUES (1, 'test1');")
ep.safe_psql("INSERT INTO usertable VALUES (2, 'test2');")
ep.safe_psql("INSERT INTO usertable VALUES (3, 'test3');")
# Split the tenant
env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=8)
with pytest.raises(RuntimeError):
env.create_tenant(env.initial_tenant, env.initial_timeline, shard_count=4)
# run more queries
ep.safe_psql("SELECT * FROM usertable;")
ep.safe_psql("UPDATE usertable set FIELD0 = 'test4';")
ep.stop_and_destroy()
# HADRON
def test_back_pressure_during_split(neon_env_builder: NeonEnvBuilder):
"""
Test backpressure can ignore new shards during tenant split so that if we abort the split,
PG can continue without being blocked.
Test backpressure works correctly during a shard split, especially after a split is aborted,
PG will not be stuck forever.
"""
DBNAME = "regression"
init_shard_count = 4
init_shard_count = 1
neon_env_builder.num_pageservers = init_shard_count
stripe_size = 32
env = neon_env_builder.init_start(
initial_tenant_shard_count=init_shard_count, initial_tenant_shard_stripe_size=stripe_size
initial_tenant_shard_count=init_shard_count,
initial_tenant_shard_stripe_size=stripe_size,
initial_tenant_conf={
"checkpoint_distance": 1024 * 1024 * 1024,
},
)
env.storage_controller.allowed_errors.extend(
@@ -1537,19 +1572,31 @@ def test_back_pressure_during_split(neon_env_builder: NeonEnvBuilder):
"main",
config_lines=[
"max_replication_write_lag = 1MB",
"databricks.max_wal_mb_per_second = 1",
"neon.max_cluster_size = 10GB",
"databricks.max_wal_mb_per_second=100",
],
)
endpoint.respec(skip_pg_catalog_updates=False) # Needed for databricks_system to get created.
endpoint.respec(skip_pg_catalog_updates=False)
endpoint.start()
endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
endpoint.safe_psql("CREATE TABLE usertable ( YCSB_KEY INT, FIELD0 TEXT);")
# generate 10MB of data
endpoint.safe_psql(
"CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 10000) s;"
)
write_done = Event()
def write_data(write_done):
def get_write_lag():
res = endpoint.safe_psql(
"""
SELECT
pg_wal_lsn_diff(pg_current_wal_flush_lsn(), received_lsn) as received_lsn_lag
FROM neon.backpressure_lsns();
""",
log_query=False,
)
return res[0][0]
def write_data(write_done: Event):
while not write_done.is_set():
endpoint.safe_psql(
"INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False
@@ -1560,35 +1607,39 @@ def test_back_pressure_during_split(neon_env_builder: NeonEnvBuilder):
writer_thread.start()
env.storage_controller.configure_failpoints(("shard-split-pre-complete", "return(1)"))
# sleep 10 seconds before re-activating the old shard when aborting the split.
# this is to add some backpressures to PG
env.pageservers[0].http_client().configure_failpoints(
("attach-before-activate-sleep", "return(10000)"),
)
# split the tenant
with pytest.raises(StorageControllerApiException):
env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=16)
env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=4)
def check_tenant_status():
status = (
env.pageservers[0].http_client().tenant_status(TenantShardId(env.initial_tenant, 0, 1))
)
assert status["state"]["slug"] == "Active"
wait_until(check_tenant_status)
write_done.set()
writer_thread.join()
log.info(f"current write lag: {get_write_lag()}")
# writing more data to page servers after split is aborted
for _i in range(5000):
endpoint.safe_psql(
"INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False
)
with endpoint.cursor() as cur:
for _i in range(1000):
cur.execute("INSERT INTO usertable SELECT random(), repeat('a', 1000);")
# wait until write lag becomes 0
def check_write_lag_is_zero():
res = endpoint.safe_psql(
"""
SELECT
pg_wal_lsn_diff(pg_current_wal_flush_lsn(), received_lsn) as received_lsn_lag
FROM neon.backpressure_lsns();
""",
dbname="databricks_system",
log_query=False,
)
log.info(f"received_lsn_lag = {res[0][0]}")
assert res[0][0] == 0
res = get_write_lag()
assert res == 0
wait_until(check_write_lag_is_zero)
endpoint.stop_and_destroy()
# BEGIN_HADRON
@@ -1674,7 +1725,6 @@ def test_shard_resolve_during_split_abort(neon_env_builder: NeonEnvBuilder):
# HADRON
@pytest.mark.skip(reason="The backpressure change has not been merged yet.")
def test_back_pressure_per_shard(neon_env_builder: NeonEnvBuilder):
"""
Tests back pressure knobs are enforced on the per shard basis instead of at the tenant level.
@@ -1703,20 +1753,16 @@ def test_back_pressure_per_shard(neon_env_builder: NeonEnvBuilder):
"neon.max_cluster_size = 10GB",
],
)
endpoint.respec(skip_pg_catalog_updates=False) # Needed for databricks_system to get created.
endpoint.respec(skip_pg_catalog_updates=False)
endpoint.start()
# generate 20MB of data
# generate 10MB of data
endpoint.safe_psql(
"CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 20000) s;"
"CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 10000) s;"
)
res = endpoint.safe_psql(
"SELECT neon.backpressure_throttling_time() as throttling_time", dbname="databricks_system"
)[0]
res = endpoint.safe_psql("SELECT neon.backpressure_throttling_time() as throttling_time")[0]
assert res[0] == 0, f"throttling_time should be 0, but got {res[0]}"
endpoint.stop()
# HADRON
def test_shard_split_page_server_timeout(neon_env_builder: NeonEnvBuilder):
@@ -1880,14 +1926,14 @@ def test_sharding_backpressure(neon_env_builder: NeonEnvBuilder):
shards_info()
for _write_iter in range(30):
# approximately 1MB of data
workload.write_rows(8000, upload=False)
# approximately 10MB of data
workload.write_rows(80000, upload=False)
update_write_lsn()
infos = shards_info()
min_lsn = min(Lsn(info["last_record_lsn"]) for info in infos)
max_lsn = max(Lsn(info["last_record_lsn"]) for info in infos)
diff = max_lsn - min_lsn
assert diff < 2 * 1024 * 1024, f"LSN diff={diff}, expected diff < 2MB due to backpressure"
assert diff < 8 * 1024 * 1024, f"LSN diff={diff}, expected diff < 8MB due to backpressure"
def test_sharding_unlogged_relation(neon_env_builder: NeonEnvBuilder):