Compare commits

..

15 Commits

Author SHA1 Message Date
Yuchen Liang
db378d1a5f Merge branch 'main' into yuchen/lsn-leases-poc 2024-06-17 14:01:39 -04:00
Yuchen Liang
0e897e8bf2 expose duration as a param for `make_lsn_lease`; catch invalid (gc-ed) lsn lease request
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-17 14:01:08 -04:00
Yuchen Liang
b0b4ea609e fix comparison to take max lsn with valid lease; fix tests
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-17 13:43:04 -04:00
Alexander Bayandin
b6e1c09c73 CI(check-build-tools-image): change build-tools image persistent tag (#8059)
## Problem

We don't rebuild `build-tools` image for changes in a workflow that
builds this image itself
(`.github/workflows/build-build-tools-image.yml`) or in a workflow that
determines which tag to use
(`.github/workflows/check-build-tools-image.yml`)

## Summary of changes
- Use a hash of `Dockerfile.build-tools` and workflow files as a
persistent tag instead of using a commit sha.
2024-06-17 12:47:20 +01:00
Yuchen Liang
be824220bb Merge branch 'main' into yuchen/lsn-leases-poc 2024-06-14 15:10:23 -04:00
Yuchen Liang
723ea86f40 make lease more robust
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-14 15:03:22 -04:00
Yuchen Liang
da55eebc83 use BTreeMap::retain
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-13 12:59:05 -04:00
Yuchen Liang
daffd5b998 update default lsn lease length
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-13 12:51:51 -04:00
Yuchen Liang
9a7d6a7526 Merge branch 'main' into yuchen/lsn-leases-poc 2024-06-13 09:03:21 -04:00
Yuchen Liang
9b25a4e5d2 Merge branch 'main' into yuchen/lsn-leases-poc 2024-06-13 08:25:03 -04:00
Yuchen Liang
be8f200815 setup demo unit test
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-12 12:27:06 -04:00
Yuchen Liang
a5c8f94165 release mutex guard early in test
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-12 08:56:40 -04:00
Yuchen Liang
f0a0515d4f fix lsn comparison logic
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-12 08:56:40 -04:00
Yuchen Liang
a7ca1d60b4 fix clippy
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-12 08:56:40 -04:00
Yuchen Liang
e834839255 DNM/Poc: use lsn leases to temporarily block gc
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-12 08:56:40 -04:00
16 changed files with 266 additions and 160 deletions

View File

@@ -30,7 +30,6 @@ jobs:
check-image:
uses: ./.github/workflows/check-build-tools-image.yml
# This job uses older version of GitHub Actions because it's run on gen2 runners, which don't support node 20 (for newer versions)
build-image:
needs: [ check-image ]
if: needs.check-image.outputs.found == 'false'

View File

@@ -25,26 +25,17 @@ jobs:
found: ${{ steps.check-image.outputs.found }}
steps:
- uses: actions/checkout@v4
- name: Get build-tools image tag for the current commit
id: get-build-tools-tag
env:
# Usually, for COMMIT_SHA, we use `github.event.pull_request.head.sha || github.sha`, but here, even for PRs,
# we want to use `github.sha` i.e. point to a phantom merge commit to determine the image tag correctly.
COMMIT_SHA: ${{ github.sha }}
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
IMAGE_TAG: |
${{ hashFiles('Dockerfile.build-tools',
'.github/workflows/check-build-tools-image.yml',
'.github/workflows/build-build-tools-image.yml') }}
run: |
LAST_BUILD_TOOLS_SHA=$(
gh api \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
--method GET \
--field path=Dockerfile.build-tools \
--field sha=${COMMIT_SHA} \
--field per_page=1 \
--jq ".[0].sha" \
"/repos/${GITHUB_REPOSITORY}/commits"
)
echo "image-tag=${LAST_BUILD_TOOLS_SHA}" | tee -a $GITHUB_OUTPUT
echo "image-tag=${IMAGE_TAG}" | tee -a $GITHUB_OUTPUT
- name: Check if such tag found in the registry
id: check-image

8
Cargo.lock generated
View File

@@ -4002,7 +4002,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=skip-auth-1rtt#42784ef44fe62b6edca9813ca47bfc1c52c60a73"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4015,7 +4015,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=skip-auth-1rtt#42784ef44fe62b6edca9813ca47bfc1c52c60a73"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -4034,7 +4034,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=skip-auth-1rtt#42784ef44fe62b6edca9813ca47bfc1c52c60a73"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"bytes",
"fallible-iterator",
@@ -6204,7 +6204,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=skip-auth-1rtt#42784ef44fe62b6edca9813ca47bfc1c52c60a73"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"async-trait",
"byteorder",

View File

@@ -198,10 +198,10 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="skip-auth-1rtt" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="skip-auth-1rtt" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="skip-auth-1rtt" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="skip-auth-1rtt" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
## Other git libraries
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
@@ -240,7 +240,7 @@ tonic-build = "0.9"
[patch.crates-io]
# Needed to get `tokio-postgres-rustls` to depend on our fork.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="skip-auth-1rtt" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
# bug fixes for UUID
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master" }

View File

@@ -177,6 +177,15 @@ serde_with::serde_conv!(
|value: String| -> Result<_, humantime::TimestampError> { humantime::parse_rfc3339(&value) }
);
impl LsnLease {
    /// Lease duration used when the caller has no specific length in mind: ten minutes.
    pub const DEFAULT_LENGTH: Duration = Duration::from_secs(10 * 60);

    /// Reports whether the lease has run out as of `now`.
    ///
    /// The comparison is strict: a lease whose `valid_until` equals `now` is still valid.
    pub fn is_expired(&self, now: &SystemTime) -> bool {
        self.valid_until < *now
    }
}
/// The only [`TenantState`] variants we could be `TenantState::Activating` from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ActivatingFrom {

View File

@@ -144,7 +144,20 @@ impl PgConnectionConfig {
// implement and this function is hardly a bottleneck. The function is only called around
// establishing a new connection.
#[allow(unstable_name_collisions)]
config.options(&encode_options(&self.options));
config.options(
&self
.options
.iter()
.map(|s| {
if s.contains(['\\', ' ']) {
Cow::Owned(s.replace('\\', "\\\\").replace(' ', "\\ "))
} else {
Cow::Borrowed(s.as_str())
}
})
.intersperse(Cow::Borrowed(" ")) // TODO: use impl from std once it's stabilized
.collect::<String>(),
);
}
config
}
@@ -165,21 +178,6 @@ impl PgConnectionConfig {
}
}
/// Encodes a list of options into one space-separated string.
///
/// Backslashes and spaces inside an individual option are escaped with a leading
/// backslash, so a separating space can always be told apart from a space that
/// belongs to an option. An empty slice yields an empty string.
fn encode_options(options: &[String]) -> String {
    options
        .iter()
        .map(|s| {
            if s.contains(['\\', ' ']) {
                // Backslashes are doubled first; doing it after the space pass would
                // also double the backslashes that pass inserts.
                Cow::Owned(s.replace('\\', "\\\\").replace(' ', "\\ "))
            } else {
                // Nothing to escape: borrow rather than allocate.
                Cow::Borrowed(s.as_str())
            }
        })
        // Joining through std avoids itertools' `intersperse`, whose name collides
        // with a not-yet-stable std method and needed a lint suppression.
        .collect::<Vec<Cow<'_, str>>>()
        .join(" ")
}
impl fmt::Display for PgConnectionConfig {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// The password is intentionally hidden and not part of this display string.
@@ -208,7 +206,7 @@ impl fmt::Debug for PgConnectionConfig {
#[cfg(test)]
mod tests_pg_connection_config {
use crate::{encode_options, PgConnectionConfig};
use crate::PgConnectionConfig;
use once_cell::sync::Lazy;
use url::Host;
@@ -257,12 +255,18 @@ mod tests_pg_connection_config {
#[test]
fn test_with_options() {
let options = encode_options(&[
"hello".to_owned(),
"world".to_owned(),
"with space".to_owned(),
"and \\ backslashes".to_owned(),
let cfg = PgConnectionConfig::new_host_port(STUB_HOST.clone(), 123).extend_options([
"hello",
"world",
"with space",
"and \\ backslashes",
]);
assert_eq!(options, "hello world with\\ space and\\ \\\\\\ backslashes");
assert_eq!(cfg.host(), &*STUB_HOST);
assert_eq!(cfg.port(), 123);
assert_eq!(cfg.raw_address(), "stub.host.example:123");
assert_eq!(
cfg.to_tokio_postgres_config().get_options(),
Some("hello world with\\ space and\\ \\\\\\ backslashes")
);
}
}

View File

@@ -21,6 +21,7 @@ use pageserver_api::models::IngestAuxFilesRequest;
use pageserver_api::models::ListAuxFilesRequest;
use pageserver_api::models::LocationConfig;
use pageserver_api::models::LocationConfigListResponse;
use pageserver_api::models::LsnLease;
use pageserver_api::models::ShardParameters;
use pageserver_api::models::TenantDetails;
use pageserver_api::models::TenantLocationConfigResponse;
@@ -1730,7 +1731,7 @@ async fn lsn_lease_handler(
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let result = timeline
.make_lsn_lease(lsn, &ctx)
.make_lsn_lease(lsn, LsnLease::DEFAULT_LENGTH, &ctx)
.map_err(|e| ApiError::InternalServerError(e.context("lsn lease http handler")))?;
json_response(StatusCode::OK, result)

View File

@@ -9,6 +9,7 @@ use futures::stream::FuturesUnordered;
use futures::Stream;
use futures::StreamExt;
use pageserver_api::key::Key;
use pageserver_api::models::LsnLease;
use pageserver_api::models::TenantState;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -935,7 +936,7 @@ impl PageServerHandler {
let timeline = self
.get_active_tenant_timeline(tenant_shard_id.tenant_id, timeline_id, shard_selector)
.await?;
let lease = timeline.make_lsn_lease(lsn, ctx)?;
let lease = timeline.make_lsn_lease(lsn, LsnLease::DEFAULT_LENGTH, ctx)?;
let valid_until = lease
.valid_until
.duration_since(SystemTime::UNIX_EPOCH)

View File

@@ -240,6 +240,7 @@ pub struct GcResult {
pub layers_needed_by_cutoff: u64,
pub layers_needed_by_pitr: u64,
pub layers_needed_by_branches: u64,
pub layers_needed_by_leases: u64,
pub layers_not_updated: u64,
pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
@@ -269,6 +270,7 @@ impl AddAssign for GcResult {
self.layers_needed_by_pitr += other.layers_needed_by_pitr;
self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
self.layers_needed_by_branches += other.layers_needed_by_branches;
self.layers_needed_by_leases += other.layers_needed_by_leases;
self.layers_not_updated += other.layers_not_updated;
self.layers_removed += other.layers_removed;

View File

@@ -31,6 +31,7 @@ use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use remote_storage::TimeoutOrCancel;
use std::fmt;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use tokio::io::BufReader;
use tokio::sync::watch;
@@ -65,9 +66,9 @@ use self::timeline::uninit::TimelineCreateGuard;
use self::timeline::uninit::TimelineExclusionError;
use self::timeline::uninit::UninitializedTimeline;
use self::timeline::EvictionTaskTenantState;
use self::timeline::GcCutoffs;
use self::timeline::TimelineResources;
use self::timeline::WaitLsnError;
use self::timeline::{GcCutoffs, GcInfo};
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
@@ -3010,12 +3011,13 @@ impl Tenant {
{
let mut target = timeline.gc_info.write().unwrap();
let now = SystemTime::now();
target.leases.retain(|_, lease| !lease.is_expired(&now));
match gc_cutoffs.remove(&timeline.timeline_id) {
Some(cutoffs) => {
*target = GcInfo {
retain_lsns: branchpoints,
cutoffs,
};
target.retain_lsns = branchpoints;
target.cutoffs = cutoffs;
}
None => {
// reasons for this being unavailable:
@@ -4048,6 +4050,7 @@ mod tests {
use bytes::{Bytes, BytesMut};
use hex_literal::hex;
use itertools::Itertools;
use models::LsnLease;
use pageserver_api::key::{AUX_FILES_KEY, AUX_KEY_PREFIX, NON_INHERITED_RANGE};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
@@ -4261,6 +4264,66 @@ mod tests {
tline.freeze_and_flush().await.map_err(|e| e.into())
}
/// Checks that garbage collection keeps the layers protected by LSN leases.
///
/// Leases are taken at 0/30, 0/50 and 0/70; the GC result is then expected to report
/// every layer starting at or below the highest leased LSN as needed by leases.
#[tokio::test]
async fn test_lsn_leases() -> anyhow::Result<()> {
    let (tenant, ctx) = TenantHarness::create("test_lsn_leases")?.load().await;
    let key = Key::from_hex("010000000033333333444444445500000000").unwrap();

    let end_lsn = Lsn(0x100);
    // One entry for each of 0/20, 0/30, ..., 0/90.
    let image_layers = (0x20..=0x90)
        .step_by(0x10)
        .map(|n| {
            (
                Lsn(n),
                vec![(key, test_img(&format!("data key at {:x}", n)))],
            )
        })
        .collect();

    let timeline = tenant
        .create_test_timeline_with_layers(
            TIMELINE_ID,
            Lsn(0x10),
            DEFAULT_PG_VERSION,
            &ctx,
            Vec::new(),
            image_layers,
            end_lsn,
        )
        .await?;

    // A lease that cannot be granted must fail the test right here instead of being
    // discarded and surfacing later as a confusing layer-count mismatch.
    let leased_lsns = [0x30, 0x50, 0x70];
    for &n in &leased_lsns {
        timeline.make_lsn_lease(Lsn(n), LsnLease::DEFAULT_LENGTH, &ctx)?;
    }

    // Force set disk consistent lsn so we can get the cutoff at `end_lsn`.
    timeline.force_set_disk_consistent_lsn(end_lsn);

    let res = tenant
        .gc_iteration(
            Some(TIMELINE_ID),
            0,
            Duration::ZERO,
            &CancellationToken::new(),
            &ctx,
        )
        .await?;

    // Keeping everything that starts at or below 0/70 (the highest leased LSN) b/c leases:
    // 0/10: initdb layer
    // (0/20..=0/70).step_by(0x10): image layers added when creating the timeline.
    assert_eq!(res.layers_needed_by_leases, 7);
    // Keeping 0/90 b/c it is the latest layer.
    assert_eq!(res.layers_not_updated, 1);
    // Removed 0/80.
    assert_eq!(res.layers_removed, 1);

    Ok(())
}
#[tokio::test]
async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
let (tenant, ctx) =

View File

@@ -14,6 +14,7 @@ use crate::tenant::config::defaults::DEFAULT_COMPACTION_PERIOD;
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::CompactionError;
use crate::tenant::{Tenant, TenantState};
use pageserver_api::models::LsnLease;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -346,6 +347,10 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// cutoff specified as time.
let ctx =
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
// Delay GC by default lease period at pageserver restart.
tokio::time::sleep(LsnLease::DEFAULT_LENGTH).await;
let mut first = true;
loop {
tokio::select! {

View File

@@ -454,6 +454,12 @@ pub(crate) struct GcInfo {
/// The cutoff coordinates, which are combined by selecting the minimum.
pub(crate) cutoffs: GcCutoffs,
// TODO(yuchen): If we decide to incorporate `retain_lsns` into the same structure,
// this would look something like `BTreeMap<Lsn, Option<LsnLease>>`, where
// `None` suggests an infinite lease (will be used by current retain_lsns).
/// Leases granted to particular LSNs.
pub(crate) leases: BTreeMap<Lsn, LsnLease>,
}
impl GcInfo {
@@ -1558,14 +1564,34 @@ impl Timeline {
/// Obtains a temporary lease blocking garbage collection for the given LSN
pub(crate) fn make_lsn_lease(
&self,
_lsn: Lsn,
lsn: Lsn,
length: Duration,
_ctx: &RequestContext,
) -> anyhow::Result<LsnLease> {
const LEASE_LENGTH: Duration = Duration::from_secs(5 * 60);
let lease = LsnLease {
valid_until: SystemTime::now() + LEASE_LENGTH,
let latest_gc_cutoff_lsn = self.get_latest_gc_cutoff_lsn();
if lsn < *latest_gc_cutoff_lsn {
bail!("tried to request a page version that was garbage collected. requested at {} gc cutoff {}", lsn, *latest_gc_cutoff_lsn);
}
let mut lease = LsnLease {
valid_until: SystemTime::now() + length,
};
// TODO: dummy implementation
{
let mut gc_info = self.gc_info.write().unwrap();
gc_info
.leases
.entry(lsn)
.and_modify(|existing| {
// Insert a lease only if it extends longer than the existing one.
if lease.valid_until > existing.valid_until {
*existing = lease.clone();
} else {
lease = existing.clone();
}
})
.or_insert(lease.clone());
}
Ok(lease)
}
@@ -4907,13 +4933,23 @@ impl Timeline {
return Err(GcError::TimelineCancelled);
}
let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
let (horizon_cutoff, pitr_cutoff, retain_lsns, max_lsn_with_valid_lease) = {
let gc_info = self.gc_info.read().unwrap();
let horizon_cutoff = min(gc_info.cutoffs.horizon, self.get_disk_consistent_lsn());
let pitr_cutoff = gc_info.cutoffs.pitr;
let retain_lsns = gc_info.retain_lsns.clone();
(horizon_cutoff, pitr_cutoff, retain_lsns)
// Gets the maximum LSN that holds a valid lease.
// Caveat: This value could be stale since we rely on refresh_gc_info to invalidate leases,
// so there could be leases invalidated between the refresh and here.
let max_lsn_with_valid_lease = gc_info.leases.last_key_value().map(|(lsn, _)| *lsn);
(
horizon_cutoff,
pitr_cutoff,
retain_lsns,
max_lsn_with_valid_lease,
)
};
let mut new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
@@ -4944,7 +4980,13 @@ impl Timeline {
.set(Lsn::INVALID.0 as i64);
let res = self
.gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
.gc_timeline(
horizon_cutoff,
pitr_cutoff,
retain_lsns,
new_gc_cutoff,
max_lsn_with_valid_lease,
)
.instrument(
info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
)
@@ -4962,6 +5004,7 @@ impl Timeline {
pitr_cutoff: Lsn,
retain_lsns: Vec<Lsn>,
new_gc_cutoff: Lsn,
max_lsn_with_valid_lease: Option<Lsn>,
) -> Result<GcResult, GcError> {
// FIXME: if there is an ongoing detach_from_ancestor, we should just skip gc
@@ -5009,7 +5052,8 @@ impl Timeline {
// 1. it is older than cutoff LSN;
// 2. it is older than PITR interval;
// 3. it doesn't need to be retained for 'retain_lsns';
// 4. newer on-disk image layers cover the layer's whole key range
// 4. it does not need to be kept for LSNs holding valid leases (logic is very similar to retain_lsns);
// 5. newer on-disk image layers cover the layer's whole key range
//
// TODO holding a write lock is too aggressive and avoidable
let mut guard = self.layers.write().await;
@@ -5060,7 +5104,20 @@ impl Timeline {
}
}
// 4. Is there a later on-disk layer for this relation?
// 4. Is there a valid lease that requires us to keep this layer?
if let Some(lsn) = &max_lsn_with_valid_lease {
if &l.get_lsn_range().start <= lsn {
debug!(
"keeping {} because there is a valid lease preventing GC at {}",
l.layer_name(),
lsn,
);
result.layers_needed_by_leases += 1;
continue 'outer;
}
}
// 5. Is there a later on-disk layer for this relation?
//
// The end-LSN is exclusive, while disk_consistent_lsn is
// inclusive. For example, if disk_consistent_lsn is 100, it is
@@ -5082,13 +5139,13 @@ impl Timeline {
if !layers
.image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
{
debug!("keeping {} because it is the latest layer", l.layer_name());
info!("keeping {} because it is the latest layer", l.layer_name());
result.layers_not_updated += 1;
continue 'outer;
}
// We didn't find any reason to keep this file, so remove it.
debug!(
info!(
"garbage collecting {} is_dropped: xx is_incremental: {}",
l.layer_name(),
l.is_incremental(),
@@ -5438,6 +5495,11 @@ impl Timeline {
self.last_record_lsn.advance(new_lsn);
}
/// Test-only helper that overwrites the timeline's `disk_consistent_lsn` with `new_value`.
///
/// The value is stored as given; this function performs no other work.
#[cfg(test)]
pub(super) fn force_set_disk_consistent_lsn(&self, new_value: Lsn) {
self.disk_consistent_lsn.store(new_value);
}
/// Force create an image layer and place it into the layer map.
///
/// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`]

View File

@@ -103,8 +103,12 @@ impl ConnCfg {
/// Reuse password or auth keys from the other config.
pub fn reuse_password(&mut self, other: Self) {
if let Some(password) = other.get_auth() {
self.auth(password);
if let Some(password) = other.get_password() {
self.password(password);
}
if let Some(keys) = other.get_auth_keys() {
self.auth_keys(keys);
}
}
@@ -120,64 +124,48 @@ impl ConnCfg {
/// Apply startup message params to the connection config.
pub fn set_startup_params(&mut self, params: &StartupMessageParams) {
let mut client_encoding = false;
for (k, v) in params.iter() {
match k {
"user" => {
// Only set `user` if it's not present in the config.
// Link auth flow takes username from the console's response.
if self.get_user().is_none() {
self.user(v);
}
// Only set `user` if it's not present in the config.
// Link auth flow takes username from the console's response.
if let (None, Some(user)) = (self.get_user(), params.get("user")) {
self.user(user);
}
// Only set `dbname` if it's not present in the config.
// Link auth flow takes dbname from the console's response.
if let (None, Some(dbname)) = (self.get_dbname(), params.get("database")) {
self.dbname(dbname);
}
// Don't add `options` if they were only used for specifying a project.
// Connection pools don't support `options`, because they affect backend startup.
if let Some(options) = filtered_options(params) {
self.options(&options);
}
if let Some(app_name) = params.get("application_name") {
self.application_name(app_name);
}
// TODO: This is especially ugly...
if let Some(replication) = params.get("replication") {
use tokio_postgres::config::ReplicationMode;
match replication {
"true" | "on" | "yes" | "1" => {
self.replication_mode(ReplicationMode::Physical);
}
"database" => {
// Only set `dbname` if it's not present in the config.
// Link auth flow takes dbname from the console's response.
if self.get_dbname().is_none() {
self.dbname(v);
}
}
"options" => {
// Don't add `options` if they were only used for specifying a project.
// Connection pools don't support `options`, because they affect backend startup.
if let Some(options) = filtered_options(v) {
self.options(&options);
}
}
// the special ones in tokio-postgres that we don't want being set by the user
"dbname" => {}
"password" => {}
"sslmode" => {}
"host" => {}
"port" => {}
"connect_timeout" => {}
"keepalives" => {}
"keepalives_idle" => {}
"keepalives_interval" => {}
"keepalives_retries" => {}
"target_session_attrs" => {}
"channel_binding" => {}
"max_backend_message_size" => {}
"client_encoding" => {
client_encoding = true;
// only error should be from bad null bytes,
// but we've already checked for those.
_ = self.param("client_encoding", v);
}
_ => {
// only error should be from bad null bytes,
// but we've already checked for those.
_ = self.param(k, v);
self.replication_mode(ReplicationMode::Logical);
}
_other => {}
}
}
if !client_encoding {
// for compatibility since we removed it from tokio-postgres
self.param("client_encoding", "UTF8").unwrap();
}
// TODO: extend the list of the forwarded startup parameters.
// Currently, tokio-postgres doesn't allow us to pass
// arbitrary parameters, but the ones above are a good start.
//
// This and the reverse params problem can be better addressed
// in a bespoke connection machinery (a new library for that sake).
}
}
@@ -350,9 +338,10 @@ impl ConnCfg {
}
/// Retrieve `options` from a startup message, dropping all proxy-specific flags.
fn filtered_options(options: &str) -> Option<String> {
fn filtered_options(params: &StartupMessageParams) -> Option<String> {
#[allow(unstable_name_collisions)]
let options: String = StartupMessageParams::parse_options_raw(options)
let options: String = params
.options_raw()?
.filter(|opt| parse_endpoint_param(opt).is_none() && neon_option(opt).is_none())
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();
@@ -424,23 +413,27 @@ mod tests {
#[test]
fn test_filtered_options() {
// Empty options is unlikely to be useful anyway.
assert_eq!(filtered_options(""), None);
let params = StartupMessageParams::new([("options", "")]);
assert_eq!(filtered_options(&params), None);
// It's likely that clients will only use options to specify endpoint/project.
let params = "project=foo";
assert_eq!(filtered_options(params), None);
let params = StartupMessageParams::new([("options", "project=foo")]);
assert_eq!(filtered_options(&params), None);
// Same, because unescaped whitespaces are no-op.
let params = " project=foo ";
assert_eq!(filtered_options(params), None);
let params = StartupMessageParams::new([("options", " project=foo ")]);
assert_eq!(filtered_options(&params).as_deref(), None);
let params = r"\ project=foo \ ";
assert_eq!(filtered_options(params).as_deref(), Some(r"\ \ "));
let params = StartupMessageParams::new([("options", r"\ project=foo \ ")]);
assert_eq!(filtered_options(&params).as_deref(), Some(r"\ \ "));
let params = "project = foo";
assert_eq!(filtered_options(params).as_deref(), Some("project = foo"));
let params = StartupMessageParams::new([("options", "project = foo")]);
assert_eq!(filtered_options(&params).as_deref(), Some("project = foo"));
let params = "project = foo neon_endpoint_type:read_write neon_lsn:0/2";
assert_eq!(filtered_options(params).as_deref(), Some("project = foo"));
let params = StartupMessageParams::new([(
"options",
"project = foo neon_endpoint_type:read_write neon_lsn:0/2",
)]);
assert_eq!(filtered_options(&params).as_deref(), Some("project = foo"));
}
}

View File

@@ -231,10 +231,6 @@ impl ConnectMechanism for TokioMechanism {
.dbname(&self.conn_info.dbname)
.connect_timeout(timeout);
config
.param("client_encoding", "UTF8")
.expect("client encoding UTF8 is always valid");
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
let res = config.connect(tokio_postgres::NoTls).await;
drop(pause);

View File

@@ -202,7 +202,6 @@ fn get_conn_info(
options = Some(NeonOptions::parse_options_raw(&value));
}
}
ctx.set_db_options(params.freeze());
let user_info = ComputeUserInfo {
endpoint,

View File

@@ -53,25 +53,6 @@ def test_proxy_select_1(static_proxy: NeonProxy):
assert out[0][0] == 42
def test_proxy_server_params(static_proxy: NeonProxy):
    """
    Check that `options` supplied by the client take effect on the postgres side:
    each `intervalstyle` setting must change how a zero interval is rendered as JSON.
    """
    # (options string passed to the connection, expected rendering of '0 seconds')
    cases = [
        ("-c intervalstyle=iso_8601", "PT0S"),
        ("-c intervalstyle=sql_standard", "0"),
        ("-c intervalstyle=postgres", "00:00:00"),
    ]
    for options, expected in cases:
        rows = static_proxy.safe_psql("select to_json('0 seconds'::interval)", options=options)
        assert rows[0][0] == expected
def test_password_hack(static_proxy: NeonProxy):
"""
Check the PasswordHack auth flow: an alternative to SCRAM auth for