Compare commits

..

14 Commits

Author SHA1 Message Date
Conrad Ludgate
13d41b51a2 simplify error handling for query encoding 2025-05-21 20:07:43 +01:00
Conrad Ludgate
f3c9d0adf4 proxy(logging): significant changes to json logging internals for performance. (#11974)
#11962 

Please review each commit separately.

Each commit is rather small in goal. The overall goal of this PR is to
keep the behaviour identical, but shave away small inefficiencies here
and there.
2025-05-20 17:57:59 +00:00
Konstantin Knizhnik
2e3dc9a8c2 Add rel_size_replica_cache (#11889)
## Problem

See 
Discussion:
https://neondb.slack.com/archives/C033RQ5SPDH/p1746645666075799
Issue: https://github.com/neondatabase/cloud/issues/28609

Relation size cache is not correctly updated at PS in case of replicas.

## Summary of changes

1. Have two caches for relation size in timeline:
`rel_size_primary_cache` and `rel_size_replica_cache`.
2. `rel_size_primary_cache` is actually what we have now. The only
difference is that it is not updated in `get_rel_size`, only by WAL
ingestion
3. `rel_size_replica_cache` has limited size (LruCache) and it's key is
`(Lsn,RelTag)` . It is updated in `get_rel_size`. Only strict LSN
matches are accepted as cache hit.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-05-20 15:38:27 +00:00
Konstantin Merenkov
568779fa8a proxy/scram: avoid memory copy to improve performance (#11980)
Touches #11941

## Problem
Performance of our PBKDF2 was worse than reference.

## Summary of changes
Avoided memory copy when HMACing in a tight loop.
2025-05-20 15:23:54 +00:00
Alexey Kondratov
e94acbc816 fix(compute_ctl): Dollar escaping and tests (#11969)
## Problem

In the escaping path we were checking that `${tag}$` or `${outer_tag}$`
are present in the string, but that's not enough, as original string
surrounded by `$` can also form a 'tag', like `$x$xx$x$`, which is fine
on it's own, but cannot be used in the string escaped with `$xx$`.

## Summary of changes

Remove `$` from the checks, just check if `{tag}` or `{outer_tag}` are
present. Add more test cases and change the catalog test to stress the
`drop_subscriptions_before_start: true` path as well.

Fixes https://github.com/neondatabase/cloud/issues/29198
2025-05-20 09:03:36 +00:00
Erik Grinaker
f4150614d0 pageserver: don't pass config to PageHandler (#11973)
## Problem

The gRPC page service API will require decoupling the `PageHandler` from
the libpq protocol implementation. As preparation for this, avoid
passing in the entire server config to `PageHandler`, and instead
explicitly pass in the relevant fields.

Touches https://github.com/neondatabase/neon/issues/11728.

## Summary of changes

* Change `PageHandler` to take a `GetVectoredConcurrentIo` instead of
the entire config.
* Change `IoConcurrency::spawn_from_conf` to take a
`GetVectoredConcurrentIo`.
2025-05-19 15:47:40 +00:00
Erik Grinaker
38dbc5f67f pageserver/page_api: add binary Protobuf descriptor (#11968)
## Problem

A binary Protobuf schema descriptor can be used to expose an API
reflection service, which in turn allows convenient usage of e.g.
`grpcurl` against the gRPC server.

Touches #11728.

## Summary of changes

* Generate a binary schema descriptor as
`pageserver_page_api::proto::FILE_DESCRIPTOR_SET`.
* Opportunistically rename the Protobuf package from `page_service` to
`page_api`.
2025-05-19 11:17:45 +00:00
Folke Behrens
3685ad606d endpoint_storage: Fix metrics test by excluding assertion on macos (#11952) 2025-05-19 10:56:03 +00:00
Ivan Efremov
76a7d37f7e proxy: Drop cancellation ops if they don't fit into the queue (#11950)
Add a redis ops batch size argument for proxy and remove timeouts by
using try_send()
2025-05-19 10:10:55 +00:00
Erik Grinaker
cdb6479c8a pageserver: add gRPC page service schema (#11815)
## Problem

For the [communicator
project](https://github.com/neondatabase/company_projects/issues/352),
we want to move to gRPC for the page service protocol.

Touches #11728.

## Summary of changes

This patch adds an experimental gRPC Protobuf schema for the page
service. It is equivalent to the current page service, but with several
improvements, e.g.:

* Connection multiplexing.
* Reduced head-of-line blocking.
* Client-side batching.
* Explicit tenant shard routing.
* GetPage request classification (normal vs. prefetch).
* Explicit rate limiting ("slow down" response status).

The API is exposed as a new `pageserver/page_api` package. This is
separate from the `pageserver_api` package to reduce the dependency
footprint for the communicator. The longer-term plan is to also split
out e.g. the WAL ingestion service to a separate gRPC package, e.g.
`pageserver/wal_api`.

Subsequent PRs will: add Rust domain types for the Protobuf types,
expose a gRPC server, and implement the page service.

Preliminary prototype benchmarks of this gRPC API is within 10% of
baseline libpq performance. We'll do further benchmarking and
optimization as the implementation lands in `main` and is deployed to
staging.
2025-05-19 09:03:06 +00:00
Konstantin Knizhnik
81c557d87e Unlogged build get smgr (#11954)
## Problem

See https://github.com/neondatabase/neon/issues/11910
and https://neondb.slack.com/archives/C04DGM6SMTM/p1747314649059129

## Summary of changes

Do not change persistence in `start_unlogged_build`

Postgres PRs:
https://github.com/neondatabase/postgres/pull/642
https://github.com/neondatabase/postgres/pull/641
https://github.com/neondatabase/postgres/pull/640
https://github.com/neondatabase/postgres/pull/639

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-05-18 05:02:47 +00:00
Trung Dinh
e963129678 pagesteam_handle_batched_message -> pagestream_handle_batched_message (#11916)
## Problem
Found a typo in code.

## Summary of changes

Co-authored-by: Trung Dinh <tdinh@roblox.com>
Co-authored-by: Erik Grinaker <erik@neon.tech>
2025-05-17 22:30:29 +00:00
dependabot[bot]
4f0a9fc569 chore(deps): bump flask-cors from 5.0.0 to 6.0.0 in the pip group across 1 directory (#11960)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-05-17 22:06:32 +00:00
Emmanuel Ferdman
81c6a5a796 Migrate to correct logger interface (#11956)
## Problem
Currently the `logger` library throws annoying deprecation warnings:
```python
DeprecationWarning: The 'warn' method is deprecated, use 'warning' instead
```

## Summary of changes
This small PR resolves the annoying deprecation warnings by migrating to
`.warning` as suggested.

Signed-off-by: Emmanuel Ferdman <emmanuelferdman@gmail.com>
2025-05-17 21:12:01 +00:00
65 changed files with 1562 additions and 1216 deletions

12
Cargo.lock generated
View File

@@ -4286,6 +4286,7 @@ dependencies = [
"enumset",
"fail",
"futures",
"hashlink",
"hex",
"hex-literal",
"http-utils",
@@ -4434,6 +4435,16 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_page_api"
version = "0.1.0"
dependencies = [
"prost 0.13.3",
"tonic",
"tonic-build",
"workspace_hack",
]
[[package]]
name = "papaya"
version = "0.2.1"
@@ -5205,7 +5216,6 @@ dependencies = [
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"rustls-pemfile 2.1.1",
"ryu",
"scopeguard",
"serde",
"serde_json",

View File

@@ -9,6 +9,7 @@ members = [
"pageserver/ctl",
"pageserver/client",
"pageserver/pagebench",
"pageserver/page_api",
"proxy",
"safekeeper",
"safekeeper/client",
@@ -252,6 +253,7 @@ pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }

View File

@@ -7,7 +7,7 @@ index 255e616..1c6edb7 100644
RelationGetRelationName(index));
+#ifdef NEON_SMGR
+ smgr_start_unlogged_build(index->rd_smgr);
+ smgr_start_unlogged_build(RelationGetSmgr(index));
+#endif
+
initRumState(&buildstate.rumstate, index);
@@ -18,7 +18,7 @@ index 255e616..1c6edb7 100644
rumUpdateStats(index, &buildstate.buildStats, buildstate.rumstate.isBuild);
+#ifdef NEON_SMGR
+ smgr_finish_unlogged_build_phase_1(index->rd_smgr);
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
+#endif
+
/*
@@ -29,7 +29,7 @@ index 255e616..1c6edb7 100644
}
+#ifdef NEON_SMGR
+ smgr_end_unlogged_build(index->rd_smgr);
+ smgr_end_unlogged_build(RelationGetSmgr(index));
+#endif
+
/*

View File

@@ -213,8 +213,10 @@ impl Escaping for PgIdent {
// Find the first suitable tag that is not present in the string.
// Postgres' max role/DB name length is 63 bytes, so even in the
// worst case it won't take long.
while self.contains(&format!("${tag}$")) || self.contains(&format!("${outer_tag}$")) {
// worst case it won't take long. Outer tag is always `tag + "x"`,
// so if `tag` is not present in the string, `outer_tag` is not
// present in the string either.
while self.contains(&tag.to_string()) {
tag += "x";
outer_tag = tag.clone() + "x";
}

View File

@@ -71,6 +71,14 @@ test.escaping = 'here''s a backslash \\ and a quote '' and a double-quote " hoor
("name$$$", ("$x$name$$$$x$", "xx")),
("name$$$$", ("$x$name$$$$$x$", "xx")),
("name$x$", ("$xx$name$x$$xx$", "xxx")),
("x", ("$xx$x$xx$", "xxx")),
("xx", ("$xxx$xx$xxx$", "xxxx")),
("$x", ("$xx$$x$xx$", "xxx")),
("x$", ("$xx$x$$xx$", "xxx")),
("$x$", ("$xx$$x$$xx$", "xxx")),
("xx$", ("$xxx$xx$$xxx$", "xxxx")),
("$xx", ("$xxx$$xx$xxx$", "xxxx")),
("$xx$", ("$xxx$$xx$$xxx$", "xxxx")),
];
for (input, expected) in test_cases {

View File

@@ -546,6 +546,11 @@ impl PageServerNode {
.map(serde_json::from_str)
.transpose()
.context("Falied to parse 'sampling_ratio'")?,
relsize_snapshot_cache_capacity: settings
.remove("relsize snapshot cache capacity")
.map(|x| x.parse::<usize>())
.transpose()
.context("Falied to parse 'relsize_snapshot_cache_capacity' as integer")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")

View File

@@ -462,6 +462,8 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
if var(REAL_S3_ENV).is_ok() {
assert!(body.contains("remote_storage_s3_deleted_objects_total"));
}
#[cfg(target_os = "linux")]
assert!(body.contains("process_threads"));
}

View File

@@ -235,7 +235,7 @@ pub enum PageServiceProtocolPipelinedBatchingStrategy {
ScatteredLsn,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
pub enum GetVectoredConcurrentIo {
/// The read path is fully sequential: layers are visited
@@ -491,6 +491,8 @@ pub struct TenantConfigToml {
/// Tenant level performance sampling ratio override. Controls the ratio of get page requests
/// that will get perf sampling for the tenant.
pub sampling_ratio: Option<Ratio>,
/// Capacity of relsize snapshot cache (used by replicas).
pub relsize_snapshot_cache_capacity: usize,
}
pub mod defaults {
@@ -730,6 +732,7 @@ pub mod tenant_conf_defaults {
pub const DEFAULT_GC_COMPACTION_VERIFICATION: bool = true;
pub const DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB: u64 = 5 * 1024 * 1024; // 5GB
pub const DEFAULT_GC_COMPACTION_RATIO_PERCENT: u64 = 100;
pub const DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY: usize = 1000;
}
impl Default for TenantConfigToml {
@@ -787,6 +790,7 @@ impl Default for TenantConfigToml {
gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB,
gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,
sampling_ratio: None,
relsize_snapshot_cache_capacity: DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY,
}
}
}

View File

@@ -630,6 +630,8 @@ pub struct TenantConfigPatch {
pub gc_compaction_ratio_percent: FieldPatch<u64>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub sampling_ratio: FieldPatch<Option<Ratio>>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub relsize_snapshot_cache_capacity: FieldPatch<usize>,
}
/// Like [`crate::config::TenantConfigToml`], but preserves the information
@@ -759,6 +761,9 @@ pub struct TenantConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub sampling_ratio: Option<Option<Ratio>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub relsize_snapshot_cache_capacity: Option<usize>,
}
impl TenantConfig {
@@ -804,6 +809,7 @@ impl TenantConfig {
mut gc_compaction_initial_threshold_kb,
mut gc_compaction_ratio_percent,
mut sampling_ratio,
mut relsize_snapshot_cache_capacity,
} = self;
patch.checkpoint_distance.apply(&mut checkpoint_distance);
@@ -905,6 +911,9 @@ impl TenantConfig {
.gc_compaction_ratio_percent
.apply(&mut gc_compaction_ratio_percent);
patch.sampling_ratio.apply(&mut sampling_ratio);
patch
.relsize_snapshot_cache_capacity
.apply(&mut relsize_snapshot_cache_capacity);
Ok(Self {
checkpoint_distance,
@@ -944,6 +953,7 @@ impl TenantConfig {
gc_compaction_initial_threshold_kb,
gc_compaction_ratio_percent,
sampling_ratio,
relsize_snapshot_cache_capacity,
})
}
@@ -1052,6 +1062,9 @@ impl TenantConfig {
.gc_compaction_ratio_percent
.unwrap_or(global_conf.gc_compaction_ratio_percent),
sampling_ratio: self.sampling_ratio.unwrap_or(global_conf.sampling_ratio),
relsize_snapshot_cache_capacity: self
.relsize_snapshot_cache_capacity
.unwrap_or(global_conf.relsize_snapshot_cache_capacity),
}
}
}

View File

@@ -9,12 +9,14 @@ use sha2::digest::FixedOutput;
use sha2::{Digest, Sha256};
use tokio::task::yield_now;
use crate::CSafeStr;
const NONCE_LENGTH: usize = 24;
/// The identifier of the SCRAM-SHA-256 SASL authentication mechanism.
pub const SCRAM_SHA_256: &str = "SCRAM-SHA-256";
pub const SCRAM_SHA_256: &CSafeStr = CSafeStr::from_cstr(c"SCRAM-SHA-256");
/// The identifier of the SCRAM-SHA-256-PLUS SASL authentication mechanism.
pub const SCRAM_SHA_256_PLUS: &str = "SCRAM-SHA-256-PLUS";
pub const SCRAM_SHA_256_PLUS: &CSafeStr = CSafeStr::from_cstr(c"SCRAM-SHA-256-PLUS");
// since postgres passwords are not required to exclude saslprep-prohibited
// characters or even be valid UTF8, we run saslprep if possible and otherwise

View File

@@ -11,7 +11,7 @@
//! set to `UTF8`. It will most likely not behave properly if that is not the case.
#![warn(missing_docs, clippy::all)]
use std::io;
use std::{ffi::CStr, io};
use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut, BytesMut};
@@ -36,20 +36,67 @@ pub enum IsNull {
No,
}
fn write_nullable<F, E>(serializer: F, buf: &mut BytesMut) -> Result<(), E>
/// A [`std::ffi::CStr`] but without the null byte.
#[repr(transparent)]
#[derive(PartialEq, Eq, Hash, Debug)]
pub struct CSafeStr([u8]);
impl CSafeStr {
/// Create a new `CSafeStr`, erroring if the bytes contains a null.
pub fn new(bytes: &[u8]) -> Result<&Self, io::Error> {
let nul_pos = memchr::memchr(0, bytes);
match nul_pos {
Some(nul_pos) => Err(io::Error::other(format!(
"unexpected null byte at position {nul_pos}"
))),
None => {
// Safety: CSafeStr is transparent over [u8].
Ok(unsafe { std::mem::transmute(bytes) })
}
}
}
/// Create a new `CSafeStr` up until the next null.
pub fn take<'a>(bytes: &mut &'a [u8]) -> &'a Self {
let nul_pos = memchr::memchr(0, bytes).unwrap_or(bytes.len());
let bytes = bytes
.split_off(..nul_pos)
.expect("nul_pos should be in-bounds");
// Safety: CSafeStr is transparent over [u8].
unsafe { std::mem::transmute(bytes) }
}
/// Get the bytes of this CSafeStr.
pub const fn as_bytes(&self) -> &[u8] {
&self.0
}
/// Create a new `CSafeStr`
pub const fn from_cstr(s: &CStr) -> &CSafeStr {
// Safety: CSafeStr is transparent over [u8].
unsafe { std::mem::transmute(s.to_bytes()) }
}
}
impl<'a> From<&'a CStr> for &'a CSafeStr {
fn from(s: &'a CStr) -> &'a CSafeStr {
CSafeStr::from_cstr(s)
}
}
fn write_nullable<F>(serializer: F, buf: &mut BytesMut)
where
F: FnOnce(&mut BytesMut) -> Result<IsNull, E>,
E: From<io::Error>,
F: FnOnce(&mut BytesMut) -> IsNull,
{
let base = buf.len();
buf.put_i32(0);
let size = match serializer(buf)? {
IsNull::No => i32::from_usize(buf.len() - base - 4)?,
let size = match serializer(buf) {
// this is an unreasonable enough case that I think a panic is acceptable.
IsNull::No => i32::from_usize(buf.len() - base - 4)
.expect("buffer size should not be larger than i32::MAX"),
IsNull::Yes => -1,
};
BigEndian::write_i32(&mut buf[base..], size);
Ok(())
}
trait FromUsize: Sized {

View File

@@ -9,7 +9,7 @@ use bytes::{Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use memchr::memchr;
use crate::Oid;
use crate::{CSafeStr, Oid};
// top-level message tags
const PARSE_COMPLETE_TAG: u8 = b'1';
@@ -332,25 +332,24 @@ impl AuthenticationSaslBody {
pub struct SaslMechanisms<'a>(&'a [u8]);
impl<'a> FallibleIterator for SaslMechanisms<'a> {
type Item = &'a str;
type Item = &'a CSafeStr;
type Error = io::Error;
#[inline]
fn next(&mut self) -> io::Result<Option<&'a str>> {
let value_end = find_null(self.0, 0)?;
if value_end == 0 {
if self.0.len() != 1 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid message length: expected to be at end of iterator for sasl",
));
}
Ok(None)
} else {
let value = get_str(&self.0[..value_end])?;
self.0 = &self.0[value_end + 1..];
Ok(Some(value))
fn next(&mut self) -> io::Result<Option<&'a CSafeStr>> {
if self.0 == b"0" {
return Ok(None);
}
let value = CSafeStr::take(&mut self.0);
if value.as_bytes().len() == 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid message length: expected to be at end of iterator for sasl",
));
}
Ok(Some(value))
}
}

View File

@@ -1,114 +1,73 @@
//! Frontend message serialization.
#![allow(missing_docs)]
use std::error::Error;
use std::{io, marker};
use std::io;
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, BytesMut};
use crate::{FromUsize, IsNull, Oid, write_nullable};
use crate::{CSafeStr, FromUsize, IsNull, Oid, write_nullable};
#[inline]
fn write_body<F, E>(buf: &mut BytesMut, f: F) -> Result<(), E>
fn write_body<F>(buf: &mut BytesMut, f: F)
where
F: FnOnce(&mut BytesMut) -> Result<(), E>,
E: From<io::Error>,
F: FnOnce(&mut BytesMut),
{
let base = buf.len();
buf.extend_from_slice(&[0; 4]);
f(buf)?;
f(buf);
let size = i32::from_usize(buf.len() - base)?;
let size =
i32::from_usize(buf.len() - base).expect("buffer size should not be larger than i32::MAX");
BigEndian::write_i32(&mut buf[base..], size);
Ok(())
}
pub enum BindError {
Conversion(Box<dyn Error + marker::Sync + Send>),
Serialization(io::Error),
}
impl From<Box<dyn Error + marker::Sync + Send>> for BindError {
#[inline]
fn from(e: Box<dyn Error + marker::Sync + Send>) -> BindError {
BindError::Conversion(e)
}
}
impl From<io::Error> for BindError {
#[inline]
fn from(e: io::Error) -> BindError {
BindError::Serialization(e)
}
}
#[inline]
pub fn bind<I, J, F, T, K>(
portal: &str,
statement: &str,
portal: &CSafeStr,
statement: &CSafeStr,
formats: I,
values: J,
mut serializer: F,
result_formats: K,
buf: &mut BytesMut,
) -> Result<(), BindError>
where
) where
I: IntoIterator<Item = i16>,
J: IntoIterator<Item = T>,
F: FnMut(T, &mut BytesMut) -> Result<IsNull, Box<dyn Error + marker::Sync + Send>>,
F: FnMut(T, &mut BytesMut) -> IsNull,
K: IntoIterator<Item = i16>,
{
buf.put_u8(b'B');
write_body(buf, |buf| {
write_cstr(portal.as_bytes(), buf)?;
write_cstr(statement.as_bytes(), buf)?;
write_counted(
formats,
|f, buf| {
buf.put_i16(f);
Ok::<_, io::Error>(())
},
buf,
)?;
write_cstr(portal, buf);
write_cstr(statement, buf);
write_counted(formats, |f, buf| buf.put_i16(f), buf);
write_counted(
values,
|v, buf| write_nullable(|buf| serializer(v, buf), buf),
buf,
)?;
write_counted(
result_formats,
|f, buf| {
buf.put_i16(f);
Ok::<_, io::Error>(())
},
buf,
)?;
Ok(())
);
write_counted(result_formats, |f, buf| buf.put_i16(f), buf);
})
}
#[inline]
fn write_counted<I, T, F, E>(items: I, mut serializer: F, buf: &mut BytesMut) -> Result<(), E>
fn write_counted<I, T, F>(items: I, mut serializer: F, buf: &mut BytesMut)
where
I: IntoIterator<Item = T>,
F: FnMut(T, &mut BytesMut) -> Result<(), E>,
E: From<io::Error>,
F: FnMut(T, &mut BytesMut),
{
let base = buf.len();
buf.extend_from_slice(&[0; 2]);
let mut count = 0;
for item in items {
serializer(item, buf)?;
serializer(item, buf);
count += 1;
}
let count = i16::from_usize(count)?;
let count = i16::from_usize(count).expect("list should not exceed 32767 items");
BigEndian::write_i16(&mut buf[base..], count);
Ok(())
}
#[inline]
@@ -117,17 +76,15 @@ pub fn cancel_request(process_id: i32, secret_key: i32, buf: &mut BytesMut) {
buf.put_i32(80_877_102);
buf.put_i32(process_id);
buf.put_i32(secret_key);
Ok::<_, io::Error>(())
})
.unwrap();
}
#[inline]
pub fn close(variant: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
pub fn close(variant: u8, name: &CSafeStr, buf: &mut BytesMut) {
buf.put_u8(b'C');
write_body(buf, |buf| {
buf.put_u8(variant);
write_cstr(name.as_bytes(), buf)
write_cstr(name, buf)
})
}
@@ -162,85 +119,75 @@ where
#[inline]
pub fn copy_done(buf: &mut BytesMut) {
buf.put_u8(b'c');
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
write_body(buf, |_| {});
}
#[inline]
pub fn copy_fail(message: &str, buf: &mut BytesMut) -> io::Result<()> {
pub fn copy_fail(message: &CSafeStr, buf: &mut BytesMut) {
buf.put_u8(b'f');
write_body(buf, |buf| write_cstr(message.as_bytes(), buf))
write_body(buf, |buf| write_cstr(message, buf))
}
#[inline]
pub fn describe(variant: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
pub fn describe(variant: u8, name: &CSafeStr, buf: &mut BytesMut) {
buf.put_u8(b'D');
write_body(buf, |buf| {
buf.put_u8(variant);
write_cstr(name.as_bytes(), buf)
write_cstr(name, buf)
})
}
#[inline]
pub fn execute(portal: &str, max_rows: i32, buf: &mut BytesMut) -> io::Result<()> {
pub fn execute(portal: &CSafeStr, max_rows: i32, buf: &mut BytesMut) {
buf.put_u8(b'E');
write_body(buf, |buf| {
write_cstr(portal.as_bytes(), buf)?;
write_cstr(portal, buf);
buf.put_i32(max_rows);
Ok(())
})
}
#[inline]
pub fn parse<I>(name: &str, query: &str, param_types: I, buf: &mut BytesMut) -> io::Result<()>
pub fn parse<I>(name: &CSafeStr, query: &CSafeStr, param_types: I, buf: &mut BytesMut)
where
I: IntoIterator<Item = Oid>,
{
buf.put_u8(b'P');
write_body(buf, |buf| {
write_cstr(name.as_bytes(), buf)?;
write_cstr(query.as_bytes(), buf)?;
write_counted(
param_types,
|t, buf| {
buf.put_u32(t);
Ok::<_, io::Error>(())
},
buf,
)?;
Ok(())
write_cstr(name, buf);
write_cstr(query, buf);
write_counted(param_types, |t, buf| buf.put_u32(t), buf);
})
}
#[inline]
pub fn password_message(password: &[u8], buf: &mut BytesMut) -> io::Result<()> {
pub fn password_message(password: &CSafeStr, buf: &mut BytesMut) {
buf.put_u8(b'p');
write_body(buf, |buf| write_cstr(password, buf))
}
#[inline]
pub fn query(query: &str, buf: &mut BytesMut) -> io::Result<()> {
pub fn query(query: &CSafeStr, buf: &mut BytesMut) {
buf.put_u8(b'Q');
write_body(buf, |buf| write_cstr(query.as_bytes(), buf))
write_body(buf, |buf| write_cstr(query, buf))
}
#[inline]
pub fn sasl_initial_response(mechanism: &str, data: &[u8], buf: &mut BytesMut) -> io::Result<()> {
pub fn sasl_initial_response(mechanism: &CSafeStr, data: &[u8], buf: &mut BytesMut) {
buf.put_u8(b'p');
write_body(buf, |buf| {
write_cstr(mechanism.as_bytes(), buf)?;
let len = i32::from_usize(data.len())?;
write_cstr(mechanism, buf);
let len =
i32::from_usize(data.len()).expect("sasl data should not be larger than i32::MAX");
buf.put_i32(len);
buf.put_slice(data);
Ok(())
})
}
#[inline]
pub fn sasl_response(data: &[u8], buf: &mut BytesMut) -> io::Result<()> {
pub fn sasl_response(data: &[u8], buf: &mut BytesMut) {
buf.put_u8(b'p');
write_body(buf, |buf| {
buf.put_slice(data);
Ok(())
})
}
@@ -248,19 +195,16 @@ pub fn sasl_response(data: &[u8], buf: &mut BytesMut) -> io::Result<()> {
pub fn ssl_request(buf: &mut BytesMut) {
write_body(buf, |buf| {
buf.put_i32(80_877_103);
Ok::<_, io::Error>(())
})
.unwrap();
});
}
#[inline]
pub fn startup_message(parameters: &StartupMessageParams, buf: &mut BytesMut) -> io::Result<()> {
pub fn startup_message(parameters: &StartupMessageParams, buf: &mut BytesMut) {
write_body(buf, |buf| {
// postgres protocol version 3.0(196608) in bigger-endian
buf.put_i32(0x00_03_00_00);
buf.put_slice(&parameters.params);
buf.put_u8(0);
Ok(())
})
}
@@ -271,10 +215,7 @@ pub struct StartupMessageParams {
impl StartupMessageParams {
/// Set parameter's value by its name.
pub fn insert(&mut self, name: &str, value: &str) {
if name.contains('\0') || value.contains('\0') {
panic!("startup parameter name or value contained a null")
}
pub fn insert(&mut self, name: &CSafeStr, value: &CSafeStr) {
self.params.put_slice(name.as_bytes());
self.params.put_u8(0);
self.params.put_slice(value.as_bytes());
@@ -285,24 +226,17 @@ impl StartupMessageParams {
#[inline]
pub fn sync(buf: &mut BytesMut) {
buf.put_u8(b'S');
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
write_body(buf, |_| {});
}
#[inline]
pub fn terminate(buf: &mut BytesMut) {
buf.put_u8(b'X');
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
write_body(buf, |_| {});
}
#[inline]
fn write_cstr(s: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> {
if s.contains(&0) {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"string contains embedded null",
));
}
buf.put_slice(s);
fn write_cstr(s: &CSafeStr, buf: &mut BytesMut) {
buf.put_slice(s.as_bytes());
buf.put_u8(0);
Ok(())
}

View File

@@ -13,7 +13,7 @@ use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
#[doc(inline)]
pub use postgres_protocol2::Oid;
use postgres_protocol2::types;
use postgres_protocol2::{IsNull, types};
use crate::type_gen::{Inner, Other};
@@ -32,36 +32,15 @@ macro_rules! accepts {
/// All `ToSql` implementations should use this macro.
macro_rules! to_sql_checked {
() => {
fn to_sql_checked(
&self,
ty: &$crate::Type,
out: &mut $crate::private::BytesMut,
) -> ::std::result::Result<
$crate::IsNull,
Box<dyn ::std::error::Error + ::std::marker::Sync + ::std::marker::Send>,
> {
$crate::__to_sql_checked(self, ty, out)
fn check(&self, ty: &Type) -> ::std::result::Result<(), $crate::WrongType> {
if !<Self as $crate::ToSql>::accepts(ty) {
return Err($crate::WrongType::new::<Self>(ty.clone()));
}
Ok(())
}
};
}
// WARNING: this function is not considered part of this crate's public API.
// It is subject to change at any time.
#[doc(hidden)]
pub fn __to_sql_checked<T>(
v: &T,
ty: &Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn Error + Sync + Send>>
where
T: ToSql,
{
if !T::accepts(ty) {
return Err(Box::new(WrongType::new::<T>(ty.clone())));
}
v.to_sql(ty, out)
}
// mod pg_lsn;
#[doc(hidden)]
pub mod private;
@@ -369,14 +348,6 @@ macro_rules! simple_from {
simple_from!(i8, char_from_sql, CHAR);
simple_from!(u32, oid_from_sql, OID);
/// An enum representing the nullability of a Postgres value.
pub enum IsNull {
/// The value is NULL.
Yes,
/// The value is not NULL.
No,
}
/// A trait for types that can be converted into Postgres values.
pub trait ToSql: fmt::Debug {
/// Converts the value of `self` into the binary format of the specified
@@ -388,9 +359,7 @@ pub trait ToSql: fmt::Debug {
/// The return value indicates if this value should be represented as
/// `NULL`. If this is the case, implementations **must not** write
/// anything to `out`.
fn to_sql(&self, ty: &Type, out: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>>
where
Self: Sized;
fn to_sql(&self, ty: &Type, out: &mut BytesMut) -> IsNull;
/// Determines if a value of this type can be converted to the specified
/// Postgres `Type`.
@@ -402,11 +371,7 @@ pub trait ToSql: fmt::Debug {
///
/// *All* implementations of this method should be generated by the
/// `to_sql_checked!()` macro.
fn to_sql_checked(
&self,
ty: &Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn Error + Sync + Send>>;
fn check(&self, ty: &Type) -> Result<(), WrongType>;
/// Specify the encode format
fn encode_format(&self, _ty: &Type) -> Format {
@@ -426,14 +391,14 @@ pub enum Format {
}
impl ToSql for &str {
fn to_sql(&self, ty: &Type, w: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>> {
fn to_sql(&self, ty: &Type, w: &mut BytesMut) -> IsNull {
match *ty {
ref ty if ty.name() == "ltree" => types::ltree_to_sql(self, w),
ref ty if ty.name() == "lquery" => types::lquery_to_sql(self, w),
ref ty if ty.name() == "ltxtquery" => types::ltxtquery_to_sql(self, w),
_ => types::text_to_sql(self, w),
}
Ok(IsNull::No)
IsNull::No
}
fn accepts(ty: &Type) -> bool {
@@ -457,12 +422,9 @@ impl ToSql for &str {
macro_rules! simple_to {
($t:ty, $f:ident, $($expected:ident),+) => {
impl ToSql for $t {
fn to_sql(&self,
_: &Type,
w: &mut BytesMut)
-> Result<IsNull, Box<dyn Error + Sync + Send>> {
fn to_sql(&self, _: &Type, w: &mut BytesMut) -> IsNull {
types::$f(*self, w);
Ok(IsNull::No)
IsNull::No
}
accepts!($($expected),+);

View File

@@ -219,7 +219,7 @@ impl Client {
}
let buf = self.client.inner().with_buf(|buf| {
frontend::query("ROLLBACK", buf).unwrap();
frontend::query(c"ROLLBACK".into(), buf);
buf.split().freeze()
});
let _ = self

View File

@@ -4,6 +4,7 @@ use std::net::IpAddr;
use std::time::Duration;
use std::{fmt, str};
use postgres_protocol2::CSafeStr;
pub use postgres_protocol2::authentication::sasl::ScramKeys;
use postgres_protocol2::message::frontend::StartupMessageParams;
use serde::{Deserialize, Serialize};
@@ -162,7 +163,10 @@ impl Config {
self.username = true;
}
self.server_params.insert(name, value);
self.server_params.insert(
CSafeStr::new(name.as_bytes()).expect("param name should not contain a null"),
CSafeStr::new(value.as_bytes()).expect("param name should not contain a null"),
);
self
}

View File

@@ -6,6 +6,7 @@ use std::task::{Context, Poll};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{Sink, SinkExt, Stream, TryStreamExt, ready};
use postgres_protocol2::CSafeStr;
use postgres_protocol2::authentication::sasl;
use postgres_protocol2::authentication::sasl::ScramSha256;
use postgres_protocol2::message::backend::{AuthenticationSaslBody, Message, NoticeResponseBody};
@@ -122,7 +123,7 @@ where
T: AsyncRead + AsyncWrite + Unpin,
{
let mut buf = BytesMut::new();
frontend::startup_message(&config.server_params, &mut buf).map_err(Error::encode)?;
frontend::startup_message(&config.server_params, &mut buf);
stream
.send(FrontendMessage::Raw(buf.freeze()))
@@ -193,7 +194,7 @@ where
T: AsyncRead + AsyncWrite + Unpin,
{
let mut buf = BytesMut::new();
frontend::password_message(password, &mut buf).map_err(Error::encode)?;
frontend::password_message(CSafeStr::new(password).map_err(Error::encode)?, &mut buf);
stream
.send(FrontendMessage::Raw(buf.freeze()))
@@ -214,10 +215,10 @@ where
let mut has_scram_plus = false;
let mut mechanisms = body.mechanisms();
while let Some(mechanism) = mechanisms.next().map_err(Error::parse)? {
match mechanism {
sasl::SCRAM_SHA_256 => has_scram = true,
sasl::SCRAM_SHA_256_PLUS => has_scram_plus = true,
_ => {}
if mechanism == sasl::SCRAM_SHA_256 {
has_scram = true;
} else if mechanism == sasl::SCRAM_SHA_256_PLUS {
has_scram_plus = true;
}
}
@@ -256,7 +257,7 @@ where
};
let mut buf = BytesMut::new();
frontend::sasl_initial_response(mechanism, scram.message(), &mut buf).map_err(Error::encode)?;
frontend::sasl_initial_response(mechanism, scram.message(), &mut buf);
stream
.send(FrontendMessage::Raw(buf.freeze()))
.await
@@ -275,7 +276,7 @@ where
.map_err(|e| Error::authentication(e.into()))?;
let mut buf = BytesMut::new();
frontend::sasl_response(scram.message(), &mut buf).map_err(Error::encode)?;
frontend::sasl_response(scram.message(), &mut buf);
stream
.send(FrontendMessage::Raw(buf.freeze()))
.await

View File

@@ -123,6 +123,6 @@ pub enum SimpleQueryMessage {
fn slice_iter<'a>(
s: &'a [&'a (dyn ToSql + Sync)],
) -> impl ExactSizeIterator<Item = &'a (dyn ToSql + Sync)> + 'a {
) -> impl ExactSizeIterator<Item = &'a (dyn ToSql + Sync)> + Clone + 'a {
s.iter().map(|s| *s as _)
}

View File

@@ -1,3 +1,4 @@
use std::ffi::CStr;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
@@ -5,9 +6,9 @@ use std::sync::Arc;
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{TryStreamExt, pin_mut};
use postgres_protocol2::CSafeStr;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tracing::debug;
use crate::client::{CachedTypeInfo, InnerClient};
use crate::codec::FrontendMessage;
@@ -15,7 +16,7 @@ use crate::connection::RequestMessages;
use crate::types::{Kind, Oid, Type};
use crate::{Column, Error, Statement, query, slice_iter};
pub(crate) const TYPEINFO_QUERY: &str = "\
pub(crate) const TYPEINFO_QUERY: &CStr = c"\
SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, t.typbasetype, n.nspname, t.typrelid
FROM pg_catalog.pg_type t
LEFT OUTER JOIN pg_catalog.pg_range r ON r.rngtypid = t.oid
@@ -25,11 +26,11 @@ WHERE t.oid = $1
async fn prepare_typecheck(
client: &Arc<InnerClient>,
name: &'static str,
query: &str,
name: &'static CStr,
query: &CSafeStr,
types: &[Type],
) -> Result<Statement, Error> {
let buf = encode(client, name, query, types)?;
let buf = encode(client, name.into(), query, types)?;
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
match responses.next().await? {
@@ -68,16 +69,21 @@ async fn prepare_typecheck(
Ok(Statement::new(client, name, parameters, columns))
}
fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Result<Bytes, Error> {
if types.is_empty() {
debug!("preparing query {}: {}", name, query);
} else {
debug!("preparing query {} with types {:?}: {}", name, types, query);
}
fn encode(
client: &InnerClient,
name: &CSafeStr,
query: &CSafeStr,
types: &[Type],
) -> Result<Bytes, Error> {
// if types.is_empty() {
// debug!("preparing query {}: {}", name, query);
// } else {
// debug!("preparing query {} with types {:?}: {}", name, types, query);
// }
client.with_buf(|buf| {
frontend::parse(name, query, types.iter().map(Type::oid), buf).map_err(Error::encode)?;
frontend::describe(b'S', name, buf).map_err(Error::encode)?;
frontend::parse(name, query, types.iter().map(Type::oid), buf);
frontend::describe(b'S', name, buf);
frontend::sync(buf);
Ok(buf.split().freeze())
})
@@ -154,8 +160,8 @@ async fn typeinfo_statement(
return Ok(stmt.clone());
}
let typeinfo = "neon_proxy_typeinfo";
let stmt = prepare_typecheck(client, typeinfo, TYPEINFO_QUERY, &[]).await?;
let typeinfo = c"neon_proxy_typeinfo";
let stmt = prepare_typecheck(client, typeinfo, TYPEINFO_QUERY.into(), &[]).await?;
typecache.typeinfo = Some(stmt.clone());
Ok(stmt)

View File

@@ -1,4 +1,4 @@
use std::fmt;
use std::iter;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
@@ -8,25 +8,16 @@ use bytes::{BufMut, Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use pin_project_lite::pin_project;
use postgres_protocol2::CSafeStr;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use postgres_types2::{Format, ToSql, Type};
use tracing::debug;
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::IsNull;
use crate::{Column, Error, ReadyForQueryStatus, Row, Statement};
struct BorrowToSqlParamsDebug<'a>(&'a [&'a (dyn ToSql + Sync)]);
impl fmt::Debug for BorrowToSqlParamsDebug<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_list().entries(self.0.iter()).finish()
}
}
pub async fn query<'a, I>(
client: &InnerClient,
statement: Statement,
@@ -34,19 +25,9 @@ pub async fn query<'a, I>(
) -> Result<RowStream, Error>
where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
I::IntoIter: ExactSizeIterator + Clone,
{
let buf = if tracing::enabled!(tracing::Level::DEBUG) {
let params = params.into_iter().collect::<Vec<_>>();
debug!(
"executing statement {} with parameters: {:?}",
statement.name(),
BorrowToSqlParamsDebug(params.as_slice()),
);
encode(client, &statement, params)?
} else {
encode(client, &statement, params)?
};
let buf = encode(client, &statement, params)?;
let responses = start(client, buf).await?;
Ok(RowStream {
statement,
@@ -68,45 +49,45 @@ where
I: IntoIterator<Item = Option<S>>,
I::IntoIter: ExactSizeIterator,
{
let query = CSafeStr::new(query.as_bytes()).map_err(Error::encode)?;
let params = params.into_iter();
let portal = c"".into(); // unnamed portal
let statement = c"".into(); // unnamed prepared statement
let buf = client.with_buf(|buf| {
frontend::parse(
"", // unnamed prepared statement
query, // query to parse
std::iter::empty(), // give no type info
statement,
query, // query to parse
iter::empty(), // give no type info
buf,
)
.map_err(Error::encode)?;
frontend::describe(b'S', "", buf).map_err(Error::encode)?;
// Bind, pass params as text, retrieve as binary
match frontend::bind(
"", // empty string selects the unnamed portal
"", // unnamed prepared statement
std::iter::empty(), // all parameters use the default format (text)
);
frontend::describe(b'S', statement, buf);
// Bind, pass params as text, retrieve as test
frontend::bind(
portal,
statement,
iter::empty(), // all parameters use the default format (text)
params,
|param, buf| match param {
Some(param) => {
buf.put_slice(param.as_ref().as_bytes());
Ok(postgres_protocol2::IsNull::No)
postgres_protocol2::IsNull::No
}
None => Ok(postgres_protocol2::IsNull::Yes),
None => postgres_protocol2::IsNull::Yes,
},
Some(0), // all text
buf,
) {
Ok(()) => Ok(()),
Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, 0)),
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
}?;
);
// Execute
frontend::execute("", 0, buf).map_err(Error::encode)?;
frontend::execute(portal, 0, buf);
// Sync
frontend::sync(buf);
Ok(buf.split().freeze())
})?;
buf.split().freeze()
});
// now read the responses
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
@@ -173,11 +154,13 @@ async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
pub fn encode<'a, I>(client: &InnerClient, statement: &Statement, params: I) -> Result<Bytes, Error>
where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
I::IntoIter: ExactSizeIterator + Clone,
{
let portal = c"".into(); // unnamed portal
client.with_buf(|buf| {
encode_bind(statement, params, "", buf)?;
frontend::execute("", 0, buf).map_err(Error::encode)?;
encode_bind(statement, params, portal, buf)?;
frontend::execute(portal, 0, buf);
frontend::sync(buf);
Ok(buf.split().freeze())
})
@@ -186,15 +169,15 @@ where
pub fn encode_bind<'a, I>(
statement: &Statement,
params: I,
portal: &str,
portal: &CSafeStr,
buf: &mut BytesMut,
) -> Result<(), Error>
where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
I::IntoIter: ExactSizeIterator + Clone,
{
let param_types = statement.params();
let params = params.into_iter();
let params = iter::zip(params.into_iter(), param_types);
assert!(
param_types.len() == params.len(),
@@ -203,35 +186,24 @@ where
params.len()
);
let (param_formats, params): (Vec<_>, Vec<_>) = params
.zip(param_types.iter())
.map(|(p, ty)| (p.encode_format(ty) as i16, p))
.unzip();
// check encodings
for (i, (p, ty)) in params.clone().enumerate() {
p.check(ty).map_err(|e| Error::to_sql(Box::new(e), i))?
}
let params = params.into_iter();
let param_formats = params.clone().map(|(p, ty)| p.encode_format(ty) as i16);
let mut error_idx = 0;
let r = frontend::bind(
frontend::bind(
portal,
statement.name(),
param_formats,
params.zip(param_types).enumerate(),
|(idx, (param, ty)), buf| match param.to_sql_checked(ty, buf) {
Ok(IsNull::No) => Ok(postgres_protocol2::IsNull::No),
Ok(IsNull::Yes) => Ok(postgres_protocol2::IsNull::Yes),
Err(e) => {
error_idx = idx;
Err(e)
}
},
params,
|(param, ty), buf| param.to_sql(ty, buf),
Some(1),
buf,
);
match r {
Ok(()) => Ok(()),
Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, error_idx)),
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
}
Ok(())
}
pin_project! {

View File

@@ -7,6 +7,7 @@ use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use pin_project_lite::pin_project;
use postgres_protocol2::CSafeStr;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tracing::debug;
@@ -69,8 +70,9 @@ pub async fn batch_execute(
}
pub(crate) fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
let query = CSafeStr::new(query.as_bytes()).map_err(Error::encode)?;
client.with_buf(|buf| {
frontend::query(query, buf).map_err(Error::encode)?;
frontend::query(query, buf);
Ok(buf.split().freeze())
})
}

View File

@@ -1,9 +1,10 @@
use std::ffi::CStr;
use std::fmt;
use std::sync::{Arc, Weak};
use postgres_protocol2::Oid;
use postgres_protocol2::message::backend::Field;
use postgres_protocol2::message::frontend;
use postgres_protocol2::{CSafeStr, Oid};
use crate::client::InnerClient;
use crate::codec::FrontendMessage;
@@ -12,7 +13,7 @@ use crate::types::Type;
struct StatementInner {
client: Weak<InnerClient>,
name: &'static str,
name: &'static CStr,
params: Vec<Type>,
columns: Vec<Column>,
}
@@ -21,7 +22,7 @@ impl Drop for StatementInner {
fn drop(&mut self) {
if let Some(client) = self.client.upgrade() {
let buf = client.with_buf(|buf| {
frontend::close(b'S', self.name, buf).unwrap();
frontend::close(b'S', self.name.into(), buf);
frontend::sync(buf);
buf.split().freeze()
});
@@ -39,7 +40,7 @@ pub struct Statement(Arc<StatementInner>);
impl Statement {
pub(crate) fn new(
inner: &Arc<InnerClient>,
name: &'static str,
name: &'static CStr,
params: Vec<Type>,
columns: Vec<Column>,
) -> Statement {
@@ -54,14 +55,14 @@ impl Statement {
pub(crate) fn new_anonymous(params: Vec<Type>, columns: Vec<Column>) -> Statement {
Statement(Arc::new(StatementInner {
client: Weak::new(),
name: "<anonymous>",
name: c"<anonymous>",
params,
columns,
}))
}
pub(crate) fn name(&self) -> &str {
self.0.name
pub(crate) fn name(&self) -> &CSafeStr {
self.0.name.into()
}
/// Returns the expected types of the statement's parameters.

View File

@@ -21,7 +21,7 @@ impl Drop for Transaction<'_> {
}
let buf = self.client.inner().with_buf(|buf| {
frontend::query("ROLLBACK", buf).unwrap();
frontend::query(c"ROLLBACK".into(), buf);
buf.split().freeze()
});
let _ = self

View File

@@ -30,6 +30,7 @@ crc32c.workspace = true
either.workspace = true
fail.workspace = true
futures.workspace = true
hashlink.workspace = true
hex.workspace = true
humantime.workspace = true
humantime-serde.workspace = true

View File

@@ -0,0 +1,13 @@
[package]
name = "pageserver_page_api"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
prost.workspace = true
tonic.workspace = true
workspace_hack.workspace = true
[build-dependencies]
tonic-build.workspace = true

View File

@@ -0,0 +1,13 @@
use std::env;
use std::path::PathBuf;
/// Generates Rust code from .proto Protobuf schemas, along with a binary file
/// descriptor set for Protobuf schema reflection.
fn main() -> Result<(), Box<dyn std::error::Error>> {
let out_dir = PathBuf::from(env::var("OUT_DIR")?);
tonic_build::configure()
.bytes(["."])
.file_descriptor_set_path(out_dir.join("page_api_descriptor.bin"))
.compile_protos(&["proto/page_service.proto"], &["proto"])
.map_err(|err| err.into())
}

View File

@@ -0,0 +1,233 @@
// Page service, presented by pageservers for computes.
//
// This is the compute read path. It primarily serves page versions at given
// LSNs, but also base backups, SLRU segments, and relation metadata.
//
// EXPERIMENTAL: this is still under development and subject to change.
//
// Request metadata headers:
// - authorization: JWT token ("Bearer <token>"), if auth is enabled
// - neon-tenant-id: tenant ID ("7c4a1f9e3bd6470c8f3e21a65bd2e980")
// - neon-shard-id: shard ID, as <number><count> in hex ("0b10" = shard 11 of 16, 0-based)
// - neon-timeline-id: timeline ID ("f08c4e9a2d5f76b1e3a7c2d8910f4b3e")
//
// The service can be accessed via e.g. grpcurl:
//
// ```
// grpcurl \
// -plaintext \
// -H "neon-tenant-id: 7c4a1f9e3bd6470c8f3e21a65bd2e980" \
// -H "neon-shard-id: 0b10" \
// -H "neon-timeline-id: f08c4e9a2d5f76b1e3a7c2d8910f4b3e" \
// -H "authorization: Bearer $JWT" \
// -d '{"read_lsn": {"request_lsn": 1234567890}, "rel": {"spc_oid": 1663, "db_oid": 1234, "rel_number": 5678, "fork_number": 0}}'
// localhost:51051 page_api.PageService/CheckRelExists
// ```
//
// TODO: consider adding neon-compute-mode ("primary", "static", "replica").
// However, this will require reconnecting when changing modes.
//
// TODO: write implementation guidance on
// - Health checks
// - Tracing, OpenTelemetry
// - Compression
syntax = "proto3";
package page_api;
service PageService {
// Returns whether a relation exists.
rpc CheckRelExists(CheckRelExistsRequest) returns (CheckRelExistsResponse);
// Fetches a base backup.
rpc GetBaseBackup (GetBaseBackupRequest) returns (stream GetBaseBackupResponseChunk);
// Returns the total size of a database, as # of bytes.
rpc GetDbSize (GetDbSizeRequest) returns (GetDbSizeResponse);
// Fetches pages.
//
// This is implemented as a bidirectional streaming RPC for performance. Unary
// requests incur costs for e.g. HTTP/2 stream setup, header parsing,
// authentication, and so on -- with streaming, we only pay these costs during
// the initial stream setup. This ~doubles throughput in benchmarks. Other
// RPCs use regular unary requests, since they are not as frequent and
// performance-critical, and this simplifies implementation.
//
// NB: a status response (e.g. errors) will terminate the stream. The stream
// may be shared by e.g. multiple Postgres backends, so we should avoid this.
// Most errors are therefore sent as GetPageResponse.status instead.
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
// Returns the size of a relation, as # of blocks.
rpc GetRelSize (GetRelSizeRequest) returns (GetRelSizeResponse);
// Fetches an SLRU segment.
rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse);
}
// The LSN a request should read at.
message ReadLsn {
// The request's read LSN. Required.
uint64 request_lsn = 1;
// If given, the caller guarantees that the page has not been modified since
// this LSN. Must be smaller than or equal to request_lsn. This allows the
// Pageserver to serve an old page without waiting for the request LSN to
// arrive. Valid for all request types.
//
// It is undefined behaviour to make a request such that the page was, in
// fact, modified between request_lsn and not_modified_since_lsn. The
// Pageserver might detect it and return an error, or it might return the old
// page version or the new page version. Setting not_modified_since_lsn equal
// to request_lsn is always safe, but can lead to unnecessary waiting.
uint64 not_modified_since_lsn = 2;
}
// A relation identifier.
message RelTag {
uint32 spc_oid = 1;
uint32 db_oid = 2;
uint32 rel_number = 3;
uint32 fork_number = 4;
}
// Checks whether a relation exists, at the given LSN. Only valid on shard 0,
// other shards will error.
message CheckRelExistsRequest {
ReadLsn read_lsn = 1;
RelTag rel = 2;
}
message CheckRelExistsResponse {
bool exists = 1;
}
// Requests a base backup at a given LSN.
message GetBaseBackupRequest {
// The LSN to fetch a base backup at.
ReadLsn read_lsn = 1;
// If true, logical replication slots will not be created.
bool replica = 2;
}
// Base backup response chunk, returned as an ordered stream.
message GetBaseBackupResponseChunk {
// A basebackup data chunk. The size is undefined, but bounded by the 4 MB
// gRPC message size limit.
bytes chunk = 1;
}
// Requests the size of a database, as # of bytes. Only valid on shard 0, other
// shards will error.
message GetDbSizeRequest {
ReadLsn read_lsn = 1;
uint32 db_oid = 2;
}
message GetDbSizeResponse {
uint64 num_bytes = 1;
}
// Requests one or more pages.
message GetPageRequest {
// A request ID. Will be included in the response. Should be unique for
// in-flight requests on the stream.
uint64 request_id = 1;
// The request class.
GetPageClass request_class = 2;
// The LSN to read at.
ReadLsn read_lsn = 3;
// The relation to read from.
RelTag rel = 4;
// Page numbers to read. Must belong to the remote shard.
//
// Multiple pages will be executed as a single batch by the Pageserver,
// amortizing layer access costs and parallelizing them. This may increase the
// latency of any individual request, but improves the overall latency and
// throughput of the batch as a whole.
//
// TODO: this causes an allocation in the common single-block case. The sender
// can use a SmallVec to stack-allocate it, but Prost will always deserialize
// into a heap-allocated Vec. Consider optimizing this.
//
// TODO: we might be able to avoid a sort or something if we mandate that these
// are always in order. But we can't currenly rely on this on the server, because
// of compatibility with the libpq protocol handler.
repeated uint32 block_number = 5;
}
// A GetPageRequest class. Primarily intended for observability, but may also be
// used for prioritization in the future.
enum GetPageClass {
// Unknown class. For forwards compatibility: used when the client sends a
// class that the server doesn't know about.
GET_PAGE_CLASS_UNKNOWN = 0;
// A normal request. This is the default.
GET_PAGE_CLASS_NORMAL = 1;
// A prefetch request. NB: can only be classified on pg < 18.
GET_PAGE_CLASS_PREFETCH = 2;
// A background request (e.g. vacuum).
GET_PAGE_CLASS_BACKGROUND = 3;
}
// A GetPage response.
//
// A batch response will contain all of the requested pages. We could eagerly
// emit individual pages as soon as they are ready, but on a readv() Postgres
// holds buffer pool locks on all pages in the batch and we'll only return once
// the entire batch is ready, so no one can make use of the individual pages.
message GetPageResponse {
// The original request's ID.
uint64 request_id = 1;
// The response status code.
GetPageStatus status = 2;
// A string describing the status, if any.
string reason = 3;
// The 8KB page images, in the same order as the request. Empty if status != OK.
repeated bytes page_image = 4;
}
// A GetPageResponse status code. Since we use a bidirectional stream, we don't
// want to send errors as gRPC statuses, since this would terminate the stream.
enum GetPageStatus {
// Unknown status. For forwards compatibility: used when the server sends a
// status code that the client doesn't know about.
GET_PAGE_STATUS_UNKNOWN = 0;
// The request was successful.
GET_PAGE_STATUS_OK = 1;
// The page did not exist. The tenant/timeline/shard has already been
// validated during stream setup.
GET_PAGE_STATUS_NOT_FOUND = 2;
// The request was invalid.
GET_PAGE_STATUS_INVALID = 3;
// The tenant is rate limited. Slow down and retry later.
GET_PAGE_STATUS_SLOW_DOWN = 4;
// TODO: consider adding a GET_PAGE_STATUS_LAYER_DOWNLOAD in the case of a
// layer download. This could free up the server task to process other
// requests while the layer download is in progress.
}
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on
// shard 0, other shards will error.
message GetRelSizeRequest {
ReadLsn read_lsn = 1;
RelTag rel = 2;
}
message GetRelSizeResponse {
uint32 num_blocks = 1;
}
// Requests an SLRU segment. Only valid on shard 0, other shards will error.
message GetSlruSegmentRequest {
ReadLsn read_lsn = 1;
uint32 kind = 2;
uint32 segno = 3;
}
// Returns an SLRU segment.
//
// These are up 32 pages (256 KB), so we can send them as a single response.
message GetSlruSegmentResponse {
bytes segment = 1;
}

View File

@@ -0,0 +1,19 @@
//! This crate provides the Pageserver's page API. It contains:
//!
//! * proto/page_service.proto: the Protobuf schema for the page API.
//! * proto: auto-generated Protobuf types for gRPC.
//!
//! This crate is used by both the client and the server. Try to keep it slim.
// Code generated by protobuf.
pub mod proto {
tonic::include_proto!("page_api");
/// File descriptor set for Protobuf schema reflection. This allows using
/// e.g. grpcurl with the API.
pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("page_api_descriptor");
pub use page_service_client::PageServiceClient;
pub use page_service_server::{PageService, PageServiceServer};
}

View File

@@ -144,7 +144,7 @@ where
replica,
ctx,
io_concurrency: IoConcurrency::spawn_from_conf(
timeline.conf,
timeline.conf.get_vectored_concurrent_io,
timeline
.gate
.enter()
@@ -343,7 +343,7 @@ where
// Gather non-relational files from object storage pages.
let slru_partitions = self
.timeline
.get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
.get_slru_keyspace(Version::at(self.lsn), self.ctx)
.await?
.partition(
self.timeline.get_shard_identity(),
@@ -378,7 +378,7 @@ where
// Otherwise only include init forks of unlogged relations.
let rels = self
.timeline
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx)
.await?;
for &rel in rels.iter() {
// Send init fork as main fork to provide well formed empty
@@ -517,7 +517,7 @@ where
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> Result<(), BasebackupError> {
let nblocks = self
.timeline
.get_rel_size(src, Version::Lsn(self.lsn), self.ctx)
.get_rel_size(src, Version::at(self.lsn), self.ctx)
.await?;
// If the relation is empty, create an empty file
@@ -577,7 +577,7 @@ where
let relmap_img = if has_relmap_file {
let img = self
.timeline
.get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.get_relmap_file(spcnode, dbnode, Version::at(self.lsn), self.ctx)
.await?;
if img.len()
@@ -631,7 +631,7 @@ where
if !has_relmap_file
&& self
.timeline
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx)
.await?
.is_empty()
{

View File

@@ -3199,7 +3199,7 @@ async fn list_aux_files(
.await?;
let io_concurrency = IoConcurrency::spawn_from_conf(
state.conf,
state.conf.get_vectored_concurrent_io,
timeline.gate.enter().map_err(|_| ApiError::Cancelled)?,
);

View File

@@ -843,23 +843,50 @@ pub(crate) static COMPRESSION_IMAGE_OUTPUT_BYTES: Lazy<IntCounter> = Lazy::new(|
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
pub(crate) static RELSIZE_LATEST_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_relsize_cache_entries",
"Number of entries in the relation size cache",
"pageserver_relsize_latest_cache_entries",
"Number of entries in the latest relation size cache",
)
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!("pageserver_relsize_cache_hits", "Relation size cache hits",)
.expect("failed to define a metric")
pub(crate) static RELSIZE_LATEST_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_latest_cache_hits",
"Latest relation size cache hits",
)
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
pub(crate) static RELSIZE_LATEST_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_cache_misses",
"Relation size cache misses",
"pageserver_relsize_latest_cache_misses",
"Relation size latest cache misses",
)
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_SNAPSHOT_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_relsize_snapshot_cache_entries",
"Number of entries in the pitr relation size cache",
)
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_SNAPSHOT_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_snapshot_cache_hits",
"Pitr relation size cache hits",
)
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_SNAPSHOT_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_snapshot_cache_misses",
"Relation size snapshot cache misses",
)
.expect("failed to define a metric")
});

View File

@@ -18,7 +18,7 @@ use itertools::Itertools;
use jsonwebtoken::TokenData;
use once_cell::sync::OnceCell;
use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
};
use pageserver_api::key::rel_block_to_key;
@@ -62,7 +62,7 @@ use crate::metrics::{
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
SmgrOpTimer, TimelineMetrics,
};
use crate::pgdatadir_mapping::Version;
use crate::pgdatadir_mapping::{LsnRange, Version};
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
@@ -331,10 +331,10 @@ async fn page_service_conn_main(
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(
conf,
tenant_manager,
auth,
pipelining_config,
conf.get_vectored_concurrent_io,
perf_span_fields,
connection_ctx,
cancel.clone(),
@@ -371,7 +371,6 @@ async fn page_service_conn_main(
}
struct PageServerHandler {
conf: &'static PageServerConf,
auth: Option<Arc<SwappableJwtAuth>>,
claims: Option<Claims>,
@@ -389,6 +388,7 @@ struct PageServerHandler {
timeline_handles: Option<TimelineHandles>,
pipelining_config: PageServicePipeliningConfig,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
gate_guard: GateGuard,
}
@@ -642,7 +642,7 @@ impl std::fmt::Display for BatchedPageStreamError {
struct BatchedGetPageRequest {
req: PagestreamGetPageRequest,
timer: SmgrOpTimer,
effective_request_lsn: Lsn,
lsn_range: LsnRange,
ctx: RequestContext,
}
@@ -764,12 +764,12 @@ impl BatchedFeMessage {
match batching_strategy {
PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
if let Some(last_in_batch) = accum_pages.last() {
if last_in_batch.effective_request_lsn
!= this_pages[0].effective_request_lsn
if last_in_batch.lsn_range.effective_lsn
!= this_pages[0].lsn_range.effective_lsn
{
trace!(
accum_lsn = %last_in_batch.effective_request_lsn,
this_lsn = %this_pages[0].effective_request_lsn,
accum_lsn = %last_in_batch.lsn_range.effective_lsn,
this_lsn = %this_pages[0].lsn_range.effective_lsn,
"stopping batching because LSN changed"
);
@@ -784,15 +784,15 @@ impl BatchedFeMessage {
let same_page_different_lsn = accum_pages.iter().any(|batched| {
batched.req.rel == this_pages[0].req.rel
&& batched.req.blkno == this_pages[0].req.blkno
&& batched.effective_request_lsn
!= this_pages[0].effective_request_lsn
&& batched.lsn_range.effective_lsn
!= this_pages[0].lsn_range.effective_lsn
});
if same_page_different_lsn {
trace!(
rel=%this_pages[0].req.rel,
blkno=%this_pages[0].req.blkno,
lsn=%this_pages[0].effective_request_lsn,
lsn=%this_pages[0].lsn_range.effective_lsn,
"stopping batching because same page was requested at different LSNs"
);
@@ -844,17 +844,16 @@ impl BatchedFeMessage {
impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
pub fn new(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
pipelining_config: PageServicePipeliningConfig,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
perf_span_fields: ConnectionPerfSpanFields,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
) -> Self {
PageServerHandler {
conf,
auth,
claims: None,
connection_ctx,
@@ -862,6 +861,7 @@ impl PageServerHandler {
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
cancel,
pipelining_config,
get_vectored_concurrent_io,
gate_guard,
}
}
@@ -1158,7 +1158,7 @@ impl PageServerHandler {
.await?;
// We're holding the Handle
let effective_request_lsn = match Self::effective_request_lsn(
let effective_lsn = match Self::effective_request_lsn(
&shard,
shard.get_last_record_lsn(),
req.hdr.request_lsn,
@@ -1177,7 +1177,10 @@ impl PageServerHandler {
pages: smallvec::smallvec![BatchedGetPageRequest {
req,
timer,
effective_request_lsn,
lsn_range: LsnRange {
effective_lsn,
request_lsn: req.hdr.request_lsn
},
ctx,
}],
// The executor grabs the batch when it becomes idle.
@@ -1278,7 +1281,7 @@ impl PageServerHandler {
}
#[instrument(level = tracing::Level::DEBUG, skip_all)]
async fn pagesteam_handle_batched_message<IO>(
async fn pagestream_handle_batched_message<IO>(
&mut self,
pgb_writer: &mut PostgresBackend<IO>,
batch: BatchedFeMessage,
@@ -1623,7 +1626,7 @@ impl PageServerHandler {
}
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.get_vectored_concurrent_io,
match self.gate_guard.try_clone() {
Ok(guard) => guard,
Err(_) => {
@@ -1733,7 +1736,7 @@ impl PageServerHandler {
};
let result = self
.pagesteam_handle_batched_message(
.pagestream_handle_batched_message(
pgb_writer,
msg,
io_concurrency.clone(),
@@ -1909,7 +1912,7 @@ impl PageServerHandler {
return Err(e);
}
};
self.pagesteam_handle_batched_message(
self.pagestream_handle_batched_message(
pgb_writer,
batch,
io_concurrency.clone(),
@@ -2127,7 +2130,14 @@ impl PageServerHandler {
.await?;
let exists = timeline
.get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
.get_rel_exists(
req.rel,
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: req.hdr.request_lsn,
}),
ctx,
)
.await?;
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
@@ -2154,7 +2164,14 @@ impl PageServerHandler {
.await?;
let n_blocks = timeline
.get_rel_size(req.rel, Version::Lsn(lsn), ctx)
.get_rel_size(
req.rel,
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: req.hdr.request_lsn,
}),
ctx,
)
.await?;
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
@@ -2181,7 +2198,15 @@ impl PageServerHandler {
.await?;
let total_blocks = timeline
.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
.get_db_size(
DEFAULTTABLESPACE_OID,
req.dbnode,
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: req.hdr.request_lsn,
}),
ctx,
)
.await?;
let db_size = total_blocks as i64 * BLCKSZ as i64;
@@ -2214,7 +2239,7 @@ impl PageServerHandler {
// Ignore error (trace buffer may be full or tracer may have disconnected).
_ = page_trace.try_send(PageTraceEvent {
key,
effective_lsn: batch.effective_request_lsn,
effective_lsn: batch.lsn_range.effective_lsn,
time,
});
}
@@ -2229,7 +2254,7 @@ impl PageServerHandler {
perf_instrument = true;
}
req.effective_request_lsn
req.lsn_range.effective_lsn
})
.max()
.expect("batch is never empty");
@@ -2283,7 +2308,7 @@ impl PageServerHandler {
(
&p.req.rel,
&p.req.blkno,
p.effective_request_lsn,
p.lsn_range,
p.ctx.attached_child(),
)
}),

View File

@@ -43,7 +43,9 @@ use crate::aux_file;
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::{
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS,
RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS,
RELSIZE_SNAPSHOT_CACHE_MISSES,
};
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
@@ -90,6 +92,28 @@ pub enum LsnForTimestamp {
NoData(Lsn),
}
/// Each request to page server contains LSN range: `not_modified_since..request_lsn`.
/// See comments libs/pageserver_api/src/models.rs.
/// Based on this range and `last_record_lsn` PS calculates `effective_lsn`.
/// But to distinguish requests from primary and replicas we need also to pass `request_lsn`.
#[derive(Debug, Clone, Copy, Default)]
pub struct LsnRange {
pub effective_lsn: Lsn,
pub request_lsn: Lsn,
}
impl LsnRange {
pub fn at(lsn: Lsn) -> LsnRange {
LsnRange {
effective_lsn: lsn,
request_lsn: lsn,
}
}
pub fn is_latest(&self) -> bool {
self.request_lsn == Lsn::MAX
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum CalculateLogicalSizeError {
#[error("cancelled")]
@@ -202,13 +226,13 @@ impl Timeline {
io_concurrency: IoConcurrency,
) -> Result<Bytes, PageReconstructError> {
match version {
Version::Lsn(effective_lsn) => {
Version::LsnRange(lsns) => {
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
let res = self
.get_rel_page_at_lsn_batched(
pages.iter().map(|(tag, blknum)| {
(tag, blknum, effective_lsn, ctx.attached_child())
}),
pages
.iter()
.map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())),
io_concurrency.clone(),
ctx,
)
@@ -246,7 +270,7 @@ impl Timeline {
/// The ordering of the returned vec corresponds to the ordering of `pages`.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, Lsn, RequestContext)>,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
) -> Vec<Result<Bytes, PageReconstructError>> {
@@ -265,7 +289,7 @@ impl Timeline {
let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
HashMap::with_capacity(pages.len());
for (response_slot_idx, (tag, blknum, lsn, ctx)) in pages.enumerate() {
for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() {
if tag.relnode == 0 {
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
@@ -274,7 +298,7 @@ impl Timeline {
slots_filled += 1;
continue;
}
let lsn = lsns.effective_lsn;
let nblocks = {
let ctx = RequestContextBuilder::from(&ctx)
.perf_span(|crnt_perf_span| {
@@ -289,7 +313,7 @@ impl Timeline {
.attached_child();
match self
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
.get_rel_size(*tag, Version::LsnRange(lsns), &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await
{
@@ -470,7 +494,7 @@ impl Timeline {
));
}
if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
if let Some(nblocks) = self.get_cached_rel_size(&tag, version) {
return Ok(nblocks);
}
@@ -488,7 +512,7 @@ impl Timeline {
let mut buf = version.get(self, key, ctx).await?;
let nblocks = buf.get_u32_le();
self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
self.update_cached_rel_size(tag, version, nblocks);
Ok(nblocks)
}
@@ -510,7 +534,7 @@ impl Timeline {
}
// first try to lookup relation in cache
if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) {
return Ok(true);
}
// then check if the database was already initialized.
@@ -586,7 +610,7 @@ impl Timeline {
// scan directory listing (new), merge with the old results
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
@@ -632,7 +656,7 @@ impl Timeline {
) -> Result<Bytes, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let n_blocks = self
.get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
.get_slru_segment_size(kind, segno, Version::at(lsn), ctx)
.await?;
let keyspace = KeySpace::single(
@@ -645,7 +669,7 @@ impl Timeline {
);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
@@ -867,11 +891,11 @@ impl Timeline {
mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
) -> Result<T, PageReconstructError> {
for segno in self
.list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
.list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx)
.await?
{
let nblocks = self
.get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
.get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx)
.await?;
let keyspace = KeySpace::single(
@@ -885,7 +909,7 @@ impl Timeline {
);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
@@ -1137,7 +1161,7 @@ impl Timeline {
let mut total_size: u64 = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for rel in self
.list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
.list_rels(*spcnode, *dbnode, Version::at(lsn), ctx)
.await?
{
if self.cancel.is_cancelled() {
@@ -1212,7 +1236,7 @@ impl Timeline {
result.add_key(rel_dir_to_key(spcnode, dbnode));
let mut rels: Vec<RelTag> = self
.list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
.list_rels(spcnode, dbnode, Version::at(lsn), ctx)
.await?
.into_iter()
.collect();
@@ -1329,59 +1353,75 @@ impl Timeline {
Ok((dense_keyspace, sparse_keyspace))
}
/// Get cached size of relation if it not updated after specified LSN
pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
let rel_size_cache = self.rel_size_cache.read().unwrap();
if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
if lsn >= *cached_lsn {
RELSIZE_CACHE_HITS.inc();
return Some(*nblocks);
/// Get cached size of relation. There are two caches: one for primary updates, it captures the latest state of
/// of the timeline and snapshot cache, which key includes LSN and so can be used by replicas to get relation size
/// at the particular LSN (snapshot).
pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option<BlockNumber> {
let lsn = version.get_lsn();
{
let rel_size_cache = self.rel_size_latest_cache.read().unwrap();
if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
if lsn >= *cached_lsn {
RELSIZE_LATEST_CACHE_HITS.inc();
return Some(*nblocks);
}
RELSIZE_CACHE_MISSES_OLD.inc();
}
RELSIZE_CACHE_MISSES_OLD.inc();
}
RELSIZE_CACHE_MISSES.inc();
{
let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) {
RELSIZE_SNAPSHOT_CACHE_HITS.inc();
return Some(*nblock);
}
}
if version.is_latest() {
RELSIZE_LATEST_CACHE_MISSES.inc();
} else {
RELSIZE_SNAPSHOT_CACHE_MISSES.inc();
}
None
}
/// Update cached relation size if there is no more recent update
pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if lsn < rel_size_cache.complete_as_of {
// Do not cache old values. It's safe to cache the size on read, as long as
// the read was at an LSN since we started the WAL ingestion. Reasoning: we
// never evict values from the cache, so if the relation size changed after
// 'lsn', the new value is already in the cache.
return;
}
match rel_size_cache.map.entry(tag) {
hash_map::Entry::Occupied(mut entry) => {
let cached_lsn = entry.get_mut();
if lsn >= cached_lsn.0 {
*cached_lsn = (lsn, nblocks);
pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) {
let lsn = version.get_lsn();
if version.is_latest() {
let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
match rel_size_cache.entry(tag) {
hash_map::Entry::Occupied(mut entry) => {
let cached_lsn = entry.get_mut();
if lsn >= cached_lsn.0 {
*cached_lsn = (lsn, nblocks);
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert((lsn, nblocks));
RELSIZE_LATEST_CACHE_ENTRIES.inc();
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert((lsn, nblocks));
RELSIZE_CACHE_ENTRIES.inc();
} else {
let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
if rel_size_cache.capacity() != 0 {
rel_size_cache.insert((lsn, tag), nblocks);
RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64);
}
}
}
/// Store cached relation size
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
RELSIZE_CACHE_ENTRIES.inc();
let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() {
RELSIZE_LATEST_CACHE_ENTRIES.inc();
}
}
/// Remove cached relation size
pub fn remove_cached_rel_size(&self, tag: &RelTag) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if rel_size_cache.map.remove(tag).is_some() {
RELSIZE_CACHE_ENTRIES.dec();
let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
if rel_size_cache.remove(tag).is_some() {
RELSIZE_LATEST_CACHE_ENTRIES.dec();
}
}
}
@@ -1585,7 +1625,10 @@ impl DatadirModification<'_> {
// check the cache too. This is because eagerly checking the cache results in
// less work overall and 10% better performance. It's more work on cache miss
// but cache miss is rare.
if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
if let Some(nblocks) = self
.tline
.get_cached_rel_size(&rel, Version::Modified(self))
{
Ok(nblocks)
} else if !self
.tline
@@ -2667,7 +2710,7 @@ pub struct DatadirModificationStats {
/// timeline to not miss the latest updates.
#[derive(Clone, Copy)]
pub enum Version<'a> {
Lsn(Lsn),
LsnRange(LsnRange),
Modified(&'a DatadirModification<'a>),
}
@@ -2679,7 +2722,7 @@ impl Version<'_> {
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
match self {
Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await,
Version::Modified(modification) => modification.get(key, ctx).await,
}
}
@@ -2701,12 +2744,26 @@ impl Version<'_> {
}
}
fn get_lsn(&self) -> Lsn {
pub fn is_latest(&self) -> bool {
match self {
Version::Lsn(lsn) => *lsn,
Version::LsnRange(lsns) => lsns.is_latest(),
Version::Modified(_) => true,
}
}
pub fn get_lsn(&self) -> Lsn {
match self {
Version::LsnRange(lsns) => lsns.effective_lsn,
Version::Modified(modification) => modification.lsn,
}
}
pub fn at(lsn: Lsn) -> Self {
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: lsn,
})
}
}
//--- Metadata structs stored in key-value pairs in the repository.

View File

@@ -8596,8 +8596,10 @@ mod tests {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let io_concurrency =
IoConcurrency::spawn_from_conf(tline.conf, tline.gate.enter().unwrap());
let io_concurrency = IoConcurrency::spawn_from_conf(
tline.conf.get_vectored_concurrent_io,
tline.gate.enter().unwrap(),
);
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key..key.next()), lsn);
let mut res = tline

View File

@@ -31,6 +31,7 @@ pub use inmemory_layer::InMemoryLayer;
pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
use pageserver_api::config::GetVectoredConcurrentIo;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::record::NeonWalRecord;
@@ -43,7 +44,6 @@ use self::inmemory_layer::InMemoryLayerFileId;
use super::PageReconstructError;
use super::layer_map::InMemoryLayerDesc;
use super::timeline::{GetVectoredError, ReadPath};
use crate::config::PageServerConf;
use crate::context::{
AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
};
@@ -318,11 +318,10 @@ impl IoConcurrency {
}
pub(crate) fn spawn_from_conf(
conf: &'static PageServerConf,
conf: GetVectoredConcurrentIo,
gate_guard: GateGuard,
) -> IoConcurrency {
use pageserver_api::config::GetVectoredConcurrentIo;
let selected = match conf.get_vectored_concurrent_io {
let selected = match conf {
GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
};

View File

@@ -14,6 +14,7 @@ pub mod span;
pub mod uninit;
mod walreceiver;
use hashlink::LruCache;
use std::array;
use std::cmp::{max, min};
use std::collections::btree_map::Entry;
@@ -197,16 +198,6 @@ pub struct TimelineResources {
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}
/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
/// ingestion considerably, because WAL ingestion needs to check on most records if the record
/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end
/// of the timeline (disk_consistent_lsn). It's used on reads of relation sizes to check if the
/// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`].
pub(crate) struct RelSizeCache {
pub(crate) complete_as_of: Lsn,
pub(crate) map: HashMap<RelTag, (Lsn, BlockNumber)>,
}
pub struct Timeline {
pub(crate) conf: &'static PageServerConf,
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
@@ -365,7 +356,8 @@ pub struct Timeline {
pub walreceiver: Mutex<Option<WalReceiver>>,
/// Relation size cache
pub(crate) rel_size_cache: RwLock<RelSizeCache>,
pub(crate) rel_size_latest_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
pub(crate) rel_size_snapshot_cache: Mutex<LruCache<(Lsn, RelTag), BlockNumber>>,
download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
@@ -2820,6 +2812,13 @@ impl Timeline {
self.remote_client.update_config(&new_conf.location);
let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
if let Some(new_capacity) = new_conf.tenant_conf.relsize_snapshot_cache_capacity {
if new_capacity != rel_size_cache.capacity() {
rel_size_cache.set_capacity(new_capacity);
}
}
self.metrics
.evictions_with_low_residence_duration
.write()
@@ -2878,6 +2877,14 @@ impl Timeline {
ancestor_gc_info.insert_child(timeline_id, metadata.ancestor_lsn(), is_offloaded);
}
let relsize_snapshot_cache_capacity = {
let loaded_tenant_conf = tenant_conf.load();
loaded_tenant_conf
.tenant_conf
.relsize_snapshot_cache_capacity
.unwrap_or(conf.default_tenant_conf.relsize_snapshot_cache_capacity)
};
Arc::new_cyclic(|myself| {
let metrics = Arc::new(TimelineMetrics::new(
&tenant_shard_id,
@@ -2969,10 +2976,8 @@ impl Timeline {
last_image_layer_creation_check_instant: Mutex::new(None),
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(RelSizeCache {
complete_as_of: disk_consistent_lsn,
map: HashMap::new(),
}),
rel_size_latest_cache: RwLock::new(HashMap::new()),
rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)),
download_all_remote_layers_task_info: RwLock::new(None),
@@ -3530,7 +3535,7 @@ impl Timeline {
};
let io_concurrency = IoConcurrency::spawn_from_conf(
self_ref.conf,
self_ref.conf.get_vectored_concurrent_io,
self_ref
.gate
.enter()
@@ -5559,7 +5564,7 @@ impl Timeline {
});
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| CreateImageLayersError::Cancelled)?,

View File

@@ -188,7 +188,7 @@ pub(crate) async fn generate_tombstone_image_layer(
"removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
);
let io_concurrency = IoConcurrency::spawn_from_conf(
detached.conf,
detached.conf.get_vectored_concurrent_io,
detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
);
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);

View File

@@ -1684,31 +1684,31 @@ mod tests {
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await?,
false
);
assert!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await
.is_err()
);
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
1
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
.await?,
3
);
@@ -1719,7 +1719,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x20)),
Version::at(Lsn(0x20)),
&ctx,
io_concurrency.clone()
)
@@ -1733,7 +1733,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x30)),
Version::at(Lsn(0x30)),
&ctx,
io_concurrency.clone()
)
@@ -1747,7 +1747,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x40)),
Version::at(Lsn(0x40)),
&ctx,
io_concurrency.clone()
)
@@ -1760,7 +1760,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x40)),
Version::at(Lsn(0x40)),
&ctx,
io_concurrency.clone()
)
@@ -1774,7 +1774,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1787,7 +1787,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1800,7 +1800,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
2,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1820,7 +1820,7 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
.await?,
2
);
@@ -1829,7 +1829,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x60)),
Version::at(Lsn(0x60)),
&ctx,
io_concurrency.clone()
)
@@ -1842,7 +1842,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x60)),
Version::at(Lsn(0x60)),
&ctx,
io_concurrency.clone()
)
@@ -1854,7 +1854,7 @@ mod tests {
// should still see the truncated block with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
.await?,
3
);
@@ -1863,7 +1863,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
2,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1880,7 +1880,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x68)), &ctx)
.await?,
0
);
@@ -1893,7 +1893,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x70)), &ctx)
.await?,
2
);
@@ -1902,7 +1902,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x70)),
Version::at(Lsn(0x70)),
&ctx,
io_concurrency.clone()
)
@@ -1915,7 +1915,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x70)),
Version::at(Lsn(0x70)),
&ctx,
io_concurrency.clone()
)
@@ -1932,7 +1932,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
.await?,
1501
);
@@ -1942,7 +1942,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blk,
Version::Lsn(Lsn(0x80)),
Version::at(Lsn(0x80)),
&ctx,
io_concurrency.clone()
)
@@ -1956,7 +1956,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1500,
Version::Lsn(Lsn(0x80)),
Version::at(Lsn(0x80)),
&ctx,
io_concurrency.clone()
)
@@ -1990,13 +1990,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
1
);
@@ -2011,7 +2011,7 @@ mod tests {
// Check that rel is not visible anymore
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x30)), &ctx)
.await?,
false
);
@@ -2029,13 +2029,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
.await?,
1
);
@@ -2077,26 +2077,26 @@ mod tests {
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await?,
false
);
assert!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await
.is_err()
);
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
relsize
);
@@ -2110,7 +2110,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(lsn),
Version::at(lsn),
&ctx,
io_concurrency.clone()
)
@@ -2131,7 +2131,7 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
.await?,
1
);
@@ -2144,7 +2144,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x60)),
Version::at(Lsn(0x60)),
&ctx,
io_concurrency.clone()
)
@@ -2157,7 +2157,7 @@ mod tests {
// should still see all blocks with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
.await?,
relsize
);
@@ -2169,7 +2169,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -2193,13 +2193,13 @@ mod tests {
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
.await?,
relsize
);
@@ -2212,7 +2212,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x80)),
Version::at(Lsn(0x80)),
&ctx,
io_concurrency.clone()
)
@@ -2250,7 +2250,7 @@ mod tests {
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE + 1
);
@@ -2264,7 +2264,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE
);
@@ -2279,7 +2279,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE - 1
);
@@ -2297,7 +2297,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
size as BlockNumber
);

View File

@@ -86,7 +86,7 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
#define InvalidRelFileNumber InvalidOid
#define SMgrRelGetRelInfo(reln) \
#define SMgrRelGetRelInfo(reln) \
(reln->smgr_rnode.node)
#define DropRelationAllLocalBuffers DropRelFileNodeAllLocalBuffers
@@ -148,6 +148,12 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
#define DropRelationAllLocalBuffers DropRelationAllLocalBuffers
#endif
#define NRelFileInfoInvalidate(rinfo) do { \
NInfoGetSpcOid(rinfo) = InvalidOid; \
NInfoGetDbOid(rinfo) = InvalidOid; \
NInfoGetRelNumber(rinfo) = InvalidRelFileNumber; \
} while (0)
#if PG_MAJORVERSION_NUM < 17
#define ProcNumber BackendId
#define INVALID_PROC_NUMBER InvalidBackendId

View File

@@ -108,7 +108,7 @@ typedef enum
UNLOGGED_BUILD_NOT_PERMANENT
} UnloggedBuildPhase;
static SMgrRelation unlogged_build_rel = NULL;
static NRelFileInfo unlogged_build_rel_info;
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
static bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
@@ -912,8 +912,14 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{
case 0:
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdextend(reln, forkNum, blkno, buffer, skipFsync);
return;
}
break;
case RELPERSISTENCE_TEMP:
@@ -1000,8 +1006,14 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
{
case 0:
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdzeroextend(reln, forkNum, blocknum, nblocks, skipFsync);
return;
}
break;
case RELPERSISTENCE_TEMP:
@@ -1376,8 +1388,14 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
{
case 0:
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdread(reln, forkNum, blkno, buffer);
return;
}
break;
case RELPERSISTENCE_TEMP:
@@ -1463,8 +1481,14 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
{
case 0:
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdreadv(reln, forknum, blocknum, buffers, nblocks);
return;
}
break;
case RELPERSISTENCE_TEMP:
@@ -1597,6 +1621,15 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
#if PG_MAJORVERSION_NUM >= 17
mdwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
#else
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
#endif
return;
}
break;
case RELPERSISTENCE_TEMP:
@@ -1666,6 +1699,11 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
return;
}
break;
case RELPERSISTENCE_TEMP:
@@ -1706,6 +1744,10 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
return mdnblocks(reln, forknum);
}
break;
case RELPERSISTENCE_TEMP:
@@ -1775,6 +1817,11 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdtruncate(reln, forknum, old_blocks, nblocks);
return;
}
break;
case RELPERSISTENCE_TEMP:
@@ -1913,7 +1960,6 @@ neon_start_unlogged_build(SMgrRelation reln)
*/
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS)
neon_log(ERROR, "unlogged relation build is already in progress");
Assert(unlogged_build_rel == NULL);
ereport(SmgrTrace,
(errmsg(NEON_TAG "starting unlogged build of relation %u/%u/%u",
@@ -1930,7 +1976,7 @@ neon_start_unlogged_build(SMgrRelation reln)
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
unlogged_build_rel = reln;
unlogged_build_rel_info = InfoFromSMgrRel(reln);
unlogged_build_phase = UNLOGGED_BUILD_NOT_PERMANENT;
#ifdef DEBUG_COMPARE_LOCAL
if (!IsParallelWorker())
@@ -1951,12 +1997,9 @@ neon_start_unlogged_build(SMgrRelation reln)
neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
#endif
unlogged_build_rel = reln;
unlogged_build_rel_info = InfoFromSMgrRel(reln);
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;
/* Make the relation look like it's unlogged */
reln->smgr_relpersistence = RELPERSISTENCE_UNLOGGED;
/*
* Create the local file. In a parallel build, the leader is expected to
* call this first and do it.
@@ -1983,17 +2026,16 @@ neon_start_unlogged_build(SMgrRelation reln)
static void
neon_finish_unlogged_build_phase_1(SMgrRelation reln)
{
Assert(unlogged_build_rel == reln);
Assert(RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)));
ereport(SmgrTrace,
(errmsg(NEON_TAG "finishing phase 1 of unlogged build of relation %u/%u/%u",
RelFileInfoFmt(InfoFromSMgrRel(reln)))));
RelFileInfoFmt((unlogged_build_rel_info)))));
if (unlogged_build_phase == UNLOGGED_BUILD_NOT_PERMANENT)
return;
Assert(unlogged_build_phase == UNLOGGED_BUILD_PHASE_1);
Assert(reln->smgr_relpersistence == RELPERSISTENCE_UNLOGGED);
/*
* In a parallel build, (only) the leader process performs the 2nd
@@ -2001,7 +2043,7 @@ neon_finish_unlogged_build_phase_1(SMgrRelation reln)
*/
if (IsParallelWorker())
{
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
}
else
@@ -2022,11 +2064,11 @@ neon_end_unlogged_build(SMgrRelation reln)
{
NRelFileInfoBackend rinfob = InfoBFromSMgrRel(reln);
Assert(unlogged_build_rel == reln);
Assert(RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)));
ereport(SmgrTrace,
(errmsg(NEON_TAG "ending unlogged build of relation %u/%u/%u",
RelFileInfoFmt(InfoFromNInfoB(rinfob)))));
RelFileInfoFmt(unlogged_build_rel_info))));
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_PERMANENT)
{
@@ -2034,7 +2076,6 @@ neon_end_unlogged_build(SMgrRelation reln)
BlockNumber nblocks;
Assert(unlogged_build_phase == UNLOGGED_BUILD_PHASE_2);
Assert(reln->smgr_relpersistence == RELPERSISTENCE_UNLOGGED);
/*
* Update the last-written LSN cache.
@@ -2055,9 +2096,6 @@ neon_end_unlogged_build(SMgrRelation reln)
InfoFromNInfoB(rinfob),
MAIN_FORKNUM);
/* Make the relation look permanent again */
reln->smgr_relpersistence = RELPERSISTENCE_PERMANENT;
/* Remove local copy */
for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++)
{
@@ -2078,7 +2116,7 @@ neon_end_unlogged_build(SMgrRelation reln)
mdunlink(rinfob, INIT_FORKNUM, true);
#endif
}
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
}
@@ -2151,7 +2189,7 @@ AtEOXact_neon(XactEvent event, void *arg)
* Forget about any build we might have had in progress. The local
* file will be unlinked by smgrDoPendingDeletes()
*/
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
break;
@@ -2163,7 +2201,7 @@ AtEOXact_neon(XactEvent event, void *arg)
case XACT_EVENT_PRE_PREPARE:
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS)
{
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),

15
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
[[package]]
name = "aiohappyeyeballs"
@@ -1145,18 +1145,19 @@ dotenv = ["python-dotenv"]
[[package]]
name = "flask-cors"
version = "5.0.0"
description = "A Flask extension adding a decorator for CORS support"
version = "6.0.0"
description = "A Flask extension simplifying CORS support"
optional = false
python-versions = "*"
python-versions = "<4.0,>=3.9"
groups = ["main"]
files = [
{file = "Flask_Cors-5.0.0-py2.py3-none-any.whl", hash = "sha256:b9e307d082a9261c100d8fb0ba909eec6a228ed1b60a8315fd85f783d61910bc"},
{file = "flask_cors-5.0.0.tar.gz", hash = "sha256:5aadb4b950c4e93745034594d9f3ea6591f734bb3662e16e255ffbf5e89c88ef"},
{file = "flask_cors-6.0.0-py3-none-any.whl", hash = "sha256:6332073356452343a8ccddbfec7befdc3fdd040141fe776ec9b94c262f058657"},
{file = "flask_cors-6.0.0.tar.gz", hash = "sha256:4592c1570246bf7beee96b74bc0adbbfcb1b0318f6ba05c412e8909eceec3393"},
]
[package.dependencies]
Flask = ">=0.9"
flask = ">=0.9"
Werkzeug = ">=0.7"
[[package]]
name = "frozenlist"

View File

@@ -73,7 +73,6 @@ rustc-hash.workspace = true
rustls.workspace = true
rustls-native-certs.workspace = true
rustls-pemfile.workspace = true
ryu = "1"
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -536,7 +536,8 @@ mod tests {
use control_plane::AuthSecret;
use fallible_iterator::FallibleIterator;
use once_cell::sync::Lazy;
use postgres_protocol::authentication::sasl::{ChannelBinding, ScramSha256};
use postgres_protocol::CSafeStr;
use postgres_protocol::authentication::sasl::{ChannelBinding, SCRAM_SHA_256, ScramSha256};
use postgres_protocol::message::backend::Message as PgMessage;
use postgres_protocol::message::frontend;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
@@ -714,15 +715,15 @@ mod tests {
// server should offer scram
match read_message(&mut client, &mut read).await {
PgMessage::AuthenticationSasl(a) => {
let options: Vec<&str> = a.mechanisms().collect().unwrap();
assert_eq!(options, ["SCRAM-SHA-256"]);
let options: Vec<&CSafeStr> = a.mechanisms().collect().unwrap();
assert_eq!(options, [SCRAM_SHA_256]);
}
_ => panic!("wrong message"),
}
// client sends client-first-message
let mut write = BytesMut::new();
frontend::sasl_initial_response("SCRAM-SHA-256", scram.message(), &mut write).unwrap();
frontend::sasl_initial_response(SCRAM_SHA_256, scram.message(), &mut write);
client.write_all(&write).await.unwrap();
// server response with server-first-message
@@ -735,7 +736,7 @@ mod tests {
// client response with client-final-message
write.clear();
frontend::sasl_response(scram.message(), &mut write).unwrap();
frontend::sasl_response(scram.message(), &mut write);
client.write_all(&write).await.unwrap();
// server response with server-final-message
@@ -800,7 +801,7 @@ mod tests {
// client responds with password
write.clear();
frontend::password_message(b"my-secret-password", &mut write).unwrap();
frontend::password_message(c"my-secret-password".into(), &mut write);
client.write_all(&write).await.unwrap();
});
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new_with_shards(
@@ -853,8 +854,10 @@ mod tests {
// client responds with password
let mut write = BytesMut::new();
frontend::password_message(b"endpoint=my-endpoint;my-secret-password", &mut write)
.unwrap();
frontend::password_message(
c"endpoint=my-endpoint;my-secret-password".into(),
&mut write,
);
client.write_all(&write).await.unwrap();
});

View File

@@ -3,7 +3,6 @@
use std::io;
use std::sync::Arc;
use postgres_protocol::authentication::sasl::{SCRAM_SHA_256, SCRAM_SHA_256_PLUS};
use pq_proto::{BeAuthenticationSaslMessage, BeMessage, BeMessage as Be};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::info;
@@ -174,8 +173,10 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
}
match sasl.method {
SCRAM_SHA_256 => ctx.set_auth_method(crate::context::AuthMethod::ScramSha256),
SCRAM_SHA_256_PLUS => ctx.set_auth_method(crate::context::AuthMethod::ScramSha256Plus),
scram::SCRAM_SHA_256 => ctx.set_auth_method(crate::context::AuthMethod::ScramSha256),
scram::SCRAM_SHA_256_PLUS => {
ctx.set_auth_method(crate::context::AuthMethod::ScramSha256Plus)
}
_ => {}
}

View File

@@ -161,8 +161,11 @@ struct ProxyCliArgs {
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_REDIS_SET)]
redis_rps_limit: Vec<RateBucketInfo>,
/// Cancellation channel size (max queue size for redis kv client)
#[clap(long, default_value = "1024")]
#[clap(long, default_value_t = 1024)]
cancellation_ch_size: usize,
/// Cancellation ops batch size for redis
#[clap(long, default_value_t = 8)]
cancellation_batch_size: usize,
/// cache for `allowed_ips` (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
allowed_ips_cache: String,
@@ -542,7 +545,12 @@ pub async fn run() -> anyhow::Result<()> {
if let Some(mut redis_kv_client) = redis_kv_client {
maintenance_tasks.spawn(async move {
redis_kv_client.try_connect().await?;
handle_cancel_messages(&mut redis_kv_client, rx_cancel).await?;
handle_cancel_messages(
&mut redis_kv_client,
rx_cancel,
args.cancellation_batch_size,
)
.await?;
drop(redis_kv_client);

View File

@@ -30,8 +30,6 @@ use crate::tls::postgres_rustls::MakeRustlsConnect;
type IpSubnetKey = IpNet;
const CANCEL_KEY_TTL: i64 = 1_209_600; // 2 weeks cancellation key expire time
const REDIS_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10);
const BATCH_SIZE: usize = 8;
// Message types for sending through mpsc channel
pub enum CancelKeyOp {
@@ -231,12 +229,13 @@ impl CancelReplyOp {
pub async fn handle_cancel_messages(
client: &mut RedisKVClient,
mut rx: mpsc::Receiver<CancelKeyOp>,
batch_size: usize,
) -> anyhow::Result<()> {
let mut batch = Vec::with_capacity(BATCH_SIZE);
let mut pipeline = Pipeline::with_capacity(BATCH_SIZE);
let mut batch = Vec::with_capacity(batch_size);
let mut pipeline = Pipeline::with_capacity(batch_size);
loop {
if rx.recv_many(&mut batch, BATCH_SIZE).await == 0 {
if rx.recv_many(&mut batch, batch_size).await == 0 {
warn!("shutting down cancellation queue");
break Ok(());
}
@@ -367,8 +366,7 @@ impl CancellationHandler {
return Err(CancelError::InternalError);
};
tx.send_timeout(op, REDIS_SEND_TIMEOUT)
.await
tx.try_send(op)
.map_err(|e| {
tracing::warn!("failed to send GetCancelData for {key}: {e}");
})
@@ -570,7 +568,7 @@ impl Session {
}
// Send the store key op to the cancellation handler and set TTL for the key
pub(crate) async fn write_cancel_key(
pub(crate) fn write_cancel_key(
&self,
cancel_closure: CancelClosure,
) -> Result<(), CancelError> {
@@ -596,14 +594,14 @@ impl Session {
expire: CANCEL_KEY_TTL,
};
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
let _ = tx.try_send(op).map_err(|e| {
let key = self.key;
tracing::warn!("failed to send StoreCancelKey for {key}: {e}");
});
Ok(())
}
pub(crate) async fn remove_cancel_key(&self) -> Result<(), CancelError> {
pub(crate) fn remove_cancel_key(&self) -> Result<(), CancelError> {
let Some(tx) = &self.cancellation_handler.tx else {
tracing::warn!("cancellation handler is not available");
return Err(CancelError::InternalError);
@@ -619,7 +617,7 @@ impl Session {
.guard(RedisMsgKind::HDel),
};
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
let _ = tx.try_send(op).map_err(|e| {
let key = self.key;
tracing::warn!("failed to send RemoveCancelKey for {key}: {e}");
});

View File

@@ -244,9 +244,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let session = cancellation_handler_clone.get_key();
session
.write_cancel_key(node.cancel_closure.clone())
.await?;
session.write_cancel_key(node.cancel_closure.clone())?;
prepare_client_connection(&node, *session.key(), &mut stream).await?;

File diff suppressed because it is too large Load Diff

View File

@@ -1,356 +0,0 @@
//! Vendoring of serde_json's string escaping code.
//!
//! <https://github.com/serde-rs/json/blob/c1826ebcccb1a520389c6b78ad3da15db279220d/src/ser.rs#L1514-L1552>
//! <https://github.com/serde-rs/json/blob/c1826ebcccb1a520389c6b78ad3da15db279220d/src/ser.rs#L2081-L2157>
//! Licensed by David Tolnay under MIT or Apache-2.0.
//!
//! With modifications by Conrad Ludgate on behalf of Neon.
use std::fmt::{self, Write};
use serde_json::ser::CharEscape;
#[must_use]
pub struct ValueSer<'buf> {
buf: &'buf mut Vec<u8>,
}
impl<'buf> ValueSer<'buf> {
pub fn new(buf: &'buf mut Vec<u8>) -> Self {
Self { buf }
}
#[inline]
pub fn serialize(self, value: &SerializedValue) {
self.buf.extend_from_slice(&value.0);
}
#[inline]
pub fn str(self, s: &str) {
format_escaped_str(self.buf, s);
}
#[inline]
pub fn str_args(self, s: fmt::Arguments) {
format_escaped_display(self.buf, s);
}
#[inline]
pub fn bytes_hex(self, s: &[u8]) {
self.str_args(format_args!("{s:x?}"));
}
#[inline]
pub fn int(self, x: impl itoa::Integer) {
write_int(x, self.buf);
}
#[inline]
pub fn float(self, x: impl ryu::Float) {
write_float(x, self.buf);
}
#[inline]
pub fn bool(self, x: bool) {
let bool = if x { "true" } else { "false" };
self.buf.extend_from_slice(bool.as_bytes());
}
#[inline]
pub fn map(self) -> MapSer<'buf> {
MapSer::new(self.buf)
}
#[inline]
#[expect(unused)]
pub fn list(self) -> ListSer<'buf> {
ListSer::new(self.buf)
}
}
pub struct MapSer<'buf> {
buf: &'buf mut Vec<u8>,
first: bool,
}
impl<'buf> MapSer<'buf> {
#[inline]
fn new(buf: &'buf mut Vec<u8>) -> Self {
buf.push(b'{');
Self { buf, first: true }
}
#[inline]
pub fn entry(&mut self, key: Escaped) -> ValueSer {
self.entry_inner(|b| key.write(b))
}
#[inline]
pub fn entry_escape(&mut self, key: &str) -> ValueSer {
self.entry_inner(|b| format_escaped_str(b, key))
}
#[inline]
pub fn entry_escape_args(&mut self, key: fmt::Arguments) -> ValueSer {
self.entry_inner(|b| format_escaped_display(b, key))
}
#[inline]
fn entry_inner(&mut self, f: impl FnOnce(&mut Vec<u8>)) -> ValueSer {
if !self.first {
self.buf.push(b',');
}
self.first = false;
f(self.buf);
self.buf.push(b':');
ValueSer { buf: self.buf }
}
}
impl Drop for MapSer<'_> {
fn drop(&mut self) {
self.buf.push(b'}');
}
}
pub struct ListSer<'buf> {
buf: &'buf mut Vec<u8>,
first: bool,
}
impl<'buf> ListSer<'buf> {
#[inline]
fn new(buf: &'buf mut Vec<u8>) -> Self {
buf.push(b'[');
Self { buf, first: true }
}
#[expect(unused)]
#[inline]
fn entry(&mut self) -> ValueSer {
if !self.first {
self.buf.push(b',');
}
self.first = false;
ValueSer { buf: self.buf }
}
}
impl Drop for ListSer<'_> {
fn drop(&mut self) {
self.buf.push(b']');
}
}
#[derive(Clone)]
pub struct SerializedValue(Box<[u8]>);
impl SerializedValue {
#[inline]
pub fn str(s: &str) -> Self {
let mut v = vec![];
v.reserve_exact(2 + s.len());
format_escaped_str(&mut v, s);
Self(v.into_boxed_slice())
}
#[inline]
pub fn str_args(s: fmt::Arguments) -> Self {
if let Some(s) = s.as_str() {
return Self::str(s);
}
let mut v = vec![];
format_escaped_display(&mut v, s);
Self(v.into_boxed_slice())
}
#[inline]
pub fn bytes_hex(s: &[u8]) -> Self {
Self::str_args(format_args!("{s:x?}"))
}
#[inline]
pub fn int(x: impl itoa::Integer) -> Self {
Self(itoa::Buffer::new().format(x).as_bytes().into())
}
#[inline]
pub fn float(x: impl ryu::Float) -> Self {
Self(ryu::Buffer::new().format(x).as_bytes().into())
}
#[inline]
pub fn bool(x: bool) -> Self {
let bool = if x { "true" } else { "false" };
Self(bool.as_bytes().into())
}
}
/// Represents a string that didn't need escaping because it's already valid json string.
#[derive(Clone, Copy)]
pub struct Escaped(&'static str);
impl Escaped {
pub const fn new(s: &'static str) -> Self {
let mut i = 0;
while i < s.len() {
let escape = ESCAPE[s.as_bytes()[i] as usize];
i += 1;
assert!(escape == 0, "const json string should not need escaping");
}
Self(s)
}
pub fn as_str(self) -> &'static str {
self.0
}
fn write(self, buf: &mut Vec<u8>) {
buf.push(b'"');
buf.extend_from_slice(self.0.as_bytes());
buf.push(b'"');
}
}
fn write_int(x: impl itoa::Integer, b: &mut Vec<u8>) {
b.extend_from_slice(itoa::Buffer::new().format(x).as_bytes());
}
fn write_float(x: impl ryu::Float, b: &mut Vec<u8>) {
b.extend_from_slice(ryu::Buffer::new().format(x).as_bytes());
}
#[inline]
fn char_escape_from_escape_table(escape: u8, byte: u8) -> CharEscape {
match escape {
self::BB => CharEscape::Backspace,
self::TT => CharEscape::Tab,
self::NN => CharEscape::LineFeed,
self::FF => CharEscape::FormFeed,
self::RR => CharEscape::CarriageReturn,
self::QU => CharEscape::Quote,
self::BS => CharEscape::ReverseSolidus,
self::UU => CharEscape::AsciiControl(byte),
_ => unreachable!(),
}
}
fn format_escaped_str(writer: &mut Vec<u8>, value: &str) {
writer.push(b'"');
let rest = format_escaped_str_contents(writer, value);
writer.extend_from_slice(rest);
writer.push(b'"');
}
fn format_escaped_display(writer: &mut Vec<u8>, args: fmt::Arguments) {
writer.push(b'"');
if let Some(s) = args.as_str() {
let rest = format_escaped_str_contents(writer, s);
writer.extend_from_slice(rest);
} else {
Collect { buf: writer }
.write_fmt(args)
.expect("formatting should not error");
}
writer.push(b'"');
}
struct Collect<'buf> {
buf: &'buf mut Vec<u8>,
}
impl fmt::Write for Collect<'_> {
fn write_str(&mut self, s: &str) -> fmt::Result {
let last = format_escaped_str_contents(self.buf, s);
self.buf.extend(last);
Ok(())
}
}
// writes any escape sequences, and returns the suffix still needed to be written.
fn format_escaped_str_contents<'a>(writer: &mut Vec<u8>, value: &'a str) -> &'a [u8] {
let bytes = value.as_bytes();
let mut start = 0;
for (i, &byte) in bytes.iter().enumerate() {
let escape = ESCAPE[byte as usize];
if escape == 0 {
continue;
}
writer.extend_from_slice(&bytes[start..i]);
let char_escape = char_escape_from_escape_table(escape, byte);
write_char_escape(writer, char_escape);
start = i + 1;
}
&bytes[start..]
}
const BB: u8 = b'b'; // \x08
const TT: u8 = b't'; // \x09
const NN: u8 = b'n'; // \x0A
const FF: u8 = b'f'; // \x0C
const RR: u8 = b'r'; // \x0D
const QU: u8 = b'"'; // \x22
const BS: u8 = b'\\'; // \x5C
const UU: u8 = b'u'; // \x00...\x1F except the ones above
const __: u8 = 0;
// Lookup table of escape sequences. A value of b'x' at index i means that byte
// i is escaped as "\x" in JSON. A value of 0 means that byte i is not escaped.
static ESCAPE: [u8; 256] = [
// 1 2 3 4 5 6 7 8 9 A B C D E F
UU, UU, UU, UU, UU, UU, UU, UU, BB, TT, NN, UU, FF, RR, UU, UU, // 0
UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, // 1
__, __, QU, __, __, __, __, __, __, __, __, __, __, __, __, __, // 2
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 3
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 4
__, __, __, __, __, __, __, __, __, __, __, __, BS, __, __, __, // 5
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 6
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 7
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 8
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 9
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // A
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // B
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // C
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // D
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // E
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // F
];
fn write_char_escape(writer: &mut Vec<u8>, char_escape: CharEscape) {
let s = match char_escape {
CharEscape::Quote => b"\\\"",
CharEscape::ReverseSolidus => b"\\\\",
CharEscape::Solidus => b"\\/",
CharEscape::Backspace => b"\\b",
CharEscape::FormFeed => b"\\f",
CharEscape::LineFeed => b"\\n",
CharEscape::CarriageReturn => b"\\r",
CharEscape::Tab => b"\\t",
CharEscape::AsciiControl(byte) => {
static HEX_DIGITS: [u8; 16] = *b"0123456789abcdef";
let bytes = &[
b'\\',
b'u',
b'0',
b'0',
HEX_DIGITS[(byte >> 4) as usize],
HEX_DIGITS[(byte & 0xF) as usize],
];
return writer.extend_from_slice(bytes);
}
};
writer.extend_from_slice(s);
}

View File

@@ -67,6 +67,7 @@ where
}
}
#[tracing::instrument(skip_all)]
pub async fn copy_bidirectional_client_compute<Client, Compute>(
client: &mut Client,
compute: &mut Compute,

View File

@@ -383,9 +383,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let session = cancellation_handler_clone.get_key();
session
.write_cancel_key(node.cancel_closure.clone())
.await?;
session.write_cancel_key(node.cancel_closure.clone())?;
prepare_client_connection(&node, *session.key(), &mut stream).await?;

View File

@@ -13,6 +13,7 @@ use crate::stream::Stream;
use crate::usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS};
/// Forward bytes in both directions (client <-> compute).
#[tracing::instrument(skip_all)]
pub(crate) async fn proxy_pass(
client: impl AsyncRead + AsyncWrite + Unpin,
compute: impl AsyncRead + AsyncWrite + Unpin,
@@ -93,7 +94,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
}
drop(self.cancel.remove_cancel_key().await); // we don't need a result. If the queue is full, we just log the error
drop(self.cancel.remove_cancel_key()); // we don't need a result. If the queue is full, we just log the error
res
}

View File

@@ -9,7 +9,7 @@ use std::fmt::Debug;
use bytes::{Bytes, BytesMut};
use futures::{SinkExt, StreamExt};
use postgres_client::tls::TlsConnect;
use postgres_protocol::message::frontend;
use postgres_protocol::{authentication::sasl::SCRAM_SHA_256, message::frontend};
use tokio::io::{AsyncReadExt, DuplexStream};
use tokio_util::codec::{Decoder, Encoder};
@@ -60,8 +60,7 @@ async fn proxy_mitm(
params: startup.params.into(),
},
&mut buf,
)
.unwrap();
);
end_server.send(buf.freeze()).await.unwrap();
// proxy messages between end_client and end_server
@@ -90,7 +89,7 @@ async fn proxy_mitm(
new_message.extend_from_slice(sasl_message.strip_prefix(b"p=tls-server-end-point,,").unwrap());
let mut buf = BytesMut::new();
frontend::sasl_initial_response("SCRAM-SHA-256", &new_message, &mut buf).unwrap();
frontend::sasl_initial_response(SCRAM_SHA_256, &new_message, &mut buf);
end_server.send(buf.freeze()).await.unwrap();
continue;

View File

@@ -21,8 +21,8 @@ pub(crate) use key::ScramKey;
pub(crate) use secret::ServerSecret;
use sha2::{Digest, Sha256};
const SCRAM_SHA_256: &str = "SCRAM-SHA-256";
const SCRAM_SHA_256_PLUS: &str = "SCRAM-SHA-256-PLUS";
pub(crate) const SCRAM_SHA_256: &str = "SCRAM-SHA-256";
pub(crate) const SCRAM_SHA_256_PLUS: &str = "SCRAM-SHA-256-PLUS";
/// A list of supported SCRAM methods.
pub(crate) const METHODS: &[&str] = &[SCRAM_SHA_256_PLUS, SCRAM_SHA_256];

View File

@@ -13,22 +13,19 @@ pub(crate) struct Pbkdf2 {
// inspired from <https://github.com/neondatabase/rust-postgres/blob/20031d7a9ee1addeae6e0968e3899ae6bf01cee2/postgres-protocol/src/authentication/sasl.rs#L36-L61>
impl Pbkdf2 {
pub(crate) fn start(str: &[u8], salt: &[u8], iterations: u32) -> Self {
let hmac =
// key the HMAC and derive the first block in-place
let mut hmac =
Hmac::<Sha256>::new_from_slice(str).expect("HMAC is able to accept all key sizes");
let prev = hmac
.clone()
.chain_update(salt)
.chain_update(1u32.to_be_bytes())
.finalize()
.into_bytes();
hmac.update(salt);
hmac.update(&1u32.to_be_bytes());
let init_block = hmac.finalize_reset().into_bytes();
Self {
hmac,
// one consumed for the hash above
// one iteration spent above
iterations: iterations - 1,
hi: prev,
prev,
hi: init_block,
prev: init_block,
}
}
@@ -44,14 +41,17 @@ impl Pbkdf2 {
iterations,
} = self;
// only do 4096 iterations per turn before sharing the thread for fairness
// only do up to 4096 iterations per turn for fairness
let n = (*iterations).clamp(0, 4096);
for _ in 0..n {
*prev = hmac.clone().chain_update(*prev).finalize().into_bytes();
hmac.update(prev);
let block = hmac.finalize_reset().into_bytes();
for (hi, prev) in hi.iter_mut().zip(*prev) {
*hi ^= prev;
for (hi_byte, &b) in hi.iter_mut().zip(block.iter()) {
*hi_byte ^= b;
}
*prev = block;
}
*iterations -= n;

View File

@@ -103,7 +103,7 @@ class AbstractNeonCli:
else:
stdout = ""
log.warn(f"CLI timeout: stderr={stderr}, stdout={stdout}")
log.warning(f"CLI timeout: stderr={stderr}, stdout={stdout}")
raise
indent = " "

View File

@@ -187,6 +187,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
},
"rel_size_v2_enabled": True,
"relsize_snapshot_cache_capacity": 10000,
"gc_compaction_enabled": True,
"gc_compaction_verification": False,
"gc_compaction_initial_threshold_kb": 1024000,

View File

@@ -19,6 +19,16 @@ TEST_ROLE_NAMES = [
{"name": "role$"},
{"name": "role$$"},
{"name": "role$x$"},
{"name": "x"},
{"name": "xx"},
{"name": "$x"},
{"name": "x$"},
{"name": "$x$"},
{"name": "xx$"},
{"name": "$xx"},
{"name": "$xx$"},
# 63 bytes is the limit for role/DB names in Postgres
{"name": "x" * 63},
]
TEST_DB_NAMES = [
@@ -74,6 +84,43 @@ TEST_DB_NAMES = [
"name": "db name$x$",
"owner": "role$x$",
},
{
"name": "x",
"owner": "x",
},
{
"name": "xx",
"owner": "xx",
},
{
"name": "$x",
"owner": "$x",
},
{
"name": "x$",
"owner": "x$",
},
{
"name": "$x$",
"owner": "$x$",
},
{
"name": "xx$",
"owner": "xx$",
},
{
"name": "$xx",
"owner": "$xx",
},
{
"name": "$xx$",
"owner": "$xx$",
},
# 63 bytes is the limit for role/DB names in Postgres
{
"name": "x" * 63,
"owner": "x" * 63,
},
]
@@ -146,6 +193,10 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
"""
Test that compute_ctl can create and work with databases and roles
with special characters (whitespaces, %, tabs, etc.) in the name.
Also use `drop_subscriptions_before_start: true`. We do not actually
have any subscriptions in this test, so it should be no-op, but it
i) simulates the case when we create a second dev branch together with
a new project creation, and ii) just generally stresses more code paths.
"""
env = neon_simple_env
@@ -159,6 +210,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
**{
"spec": {
"skip_pg_catalog_updates": False,
"drop_subscriptions_before_start": True,
"cluster": {
"roles": TEST_ROLE_NAMES,
"databases": TEST_DB_NAMES,
@@ -202,6 +254,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
**{
"spec": {
"skip_pg_catalog_updates": False,
"drop_subscriptions_before_start": True,
"cluster": {
"roles": [],
"databases": [],

View File

@@ -510,7 +510,7 @@ def list_elegible_layers(
except KeyError:
# Unexpected: tests should call this when pageservers are in a quiet state such that the layer map
# matches what's on disk.
log.warn(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
log.warning(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
raise
return list(c for c in candidates if is_visible(c))
@@ -636,7 +636,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
except:
# On assertion failures, log some details to help with debugging
heatmap = env.pageserver_remote_storage.heatmap_content(tenant_id)
log.warn(f"heatmap contents: {json.dumps(heatmap, indent=2)}")
log.warning(f"heatmap contents: {json.dumps(heatmap, indent=2)}")
raise
# Scrub the remote storage

View File

@@ -27,8 +27,9 @@ from contextlib import closing
import psycopg2
import pytest
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.neon_fixtures import NeonEnv, PgBin, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.pg_version import PgVersion
from fixtures.utils import query_scalar, skip_on_postgres, wait_until
@@ -695,3 +696,110 @@ def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv):
with secondary.cursor() as secondary_cur:
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (n_restarts,)
def test_ephemeral_endpoints_vacuum(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
sql = """
CREATE TABLE CHAR_TBL(f1 char(4));
CREATE TABLE FLOAT8_TBL(f1 float8);
CREATE TABLE INT2_TBL(f1 int2);
CREATE TABLE INT4_TBL(f1 int4);
CREATE TABLE INT8_TBL(q1 int8, q2 int8);
CREATE TABLE POINT_TBL(f1 point);
CREATE TABLE TEXT_TBL (f1 text);
CREATE TABLE VARCHAR_TBL(f1 varchar(4));
CREATE TABLE onek (unique1 int4);
CREATE TABLE onek2 AS SELECT * FROM onek;
CREATE TABLE tenk1 (unique1 int4);
CREATE TABLE tenk2 AS SELECT * FROM tenk1;
CREATE TABLE person (name text, age int4,location point);
CREATE TABLE emp (salary int4, manager name) INHERITS (person);
CREATE TABLE student (gpa float8) INHERITS (person);
CREATE TABLE stud_emp ( percent int4) INHERITS (emp, student);
CREATE TABLE road (name text,thepath path);
CREATE TABLE ihighway () INHERITS (road);
CREATE TABLE shighway(surface text) INHERITS (road);
CREATE TABLE BOOLTBL3 (d text, b bool, o int);
CREATE TABLE booltbl4(isfalse bool, istrue bool, isnul bool);
DROP TABLE BOOLTBL3;
DROP TABLE BOOLTBL4;
CREATE TABLE ceil_floor_round (a numeric);
DROP TABLE ceil_floor_round;
CREATE TABLE width_bucket_test (operand_num numeric, operand_f8 float8);
DROP TABLE width_bucket_test;
CREATE TABLE num_input_test (n1 numeric);
CREATE TABLE num_variance (a numeric);
INSERT INTO num_variance VALUES (0);
CREATE TABLE snapshot_test (nr integer, snap txid_snapshot);
CREATE TABLE guid1(guid_field UUID, text_field TEXT DEFAULT(now()));
CREATE TABLE guid2(guid_field UUID, text_field TEXT DEFAULT(now()));
CREATE INDEX guid1_btree ON guid1 USING BTREE (guid_field);
CREATE INDEX guid1_hash ON guid1 USING HASH (guid_field);
TRUNCATE guid1;
DROP TABLE guid1;
DROP TABLE guid2 CASCADE;
CREATE TABLE numrange_test (nr NUMRANGE);
CREATE INDEX numrange_test_btree on numrange_test(nr);
CREATE TABLE numrange_test2(nr numrange);
CREATE INDEX numrange_test2_hash_idx on numrange_test2 using hash (nr);
INSERT INTO numrange_test2 VALUES('[, 5)');
CREATE TABLE textrange_test (tr text);
CREATE INDEX textrange_test_btree on textrange_test(tr);
CREATE TABLE test_range_gist(ir int4range);
CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
DROP INDEX test_range_gist_idx;
CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
CREATE TABLE test_range_spgist(ir int4range);
CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
DROP INDEX test_range_spgist_idx;
CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
CREATE TABLE test_range_elem(i int4);
CREATE INDEX test_range_elem_idx on test_range_elem (i);
CREATE INDEX ON test_range_elem using spgist(int4range(i,i+10));
DROP TABLE test_range_elem;
CREATE TABLE test_range_excl(room int4range, speaker int4range, during tsrange, exclude using gist (room with =, during with &&), exclude using gist (speaker with =, during with &&));
CREATE TABLE f_test(f text, i int);
CREATE TABLE i8r_array (f1 int, f2 text);
CREATE TYPE arrayrange as range (subtype=int4[]);
CREATE TYPE two_ints as (a int, b int);
DROP TYPE two_ints cascade;
CREATE TABLE text_support_test (t text);
CREATE TABLE TEMP_FLOAT (f1 FLOAT8);
CREATE TABLE TEMP_INT4 (f1 INT4);
CREATE TABLE TEMP_INT2 (f1 INT2);
CREATE TABLE TEMP_GROUP (f1 INT4, f2 INT4, f3 FLOAT8);
CREATE TABLE POLYGON_TBL(f1 polygon);
CREATE TABLE quad_poly_tbl (id int, p polygon);
INSERT INTO quad_poly_tbl SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10)) FROM generate_series(1, 200) x, generate_series(1, 100) y;
CREATE TABLE quad_poly_tbl_ord_seq2 AS SELECT 1 FROM quad_poly_tbl;
CREATE TABLE quad_poly_tbl_ord_idx2 AS SELECT 1 FROM quad_poly_tbl;
"""
with endpoint.cursor() as cur:
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
env.endpoints.create_start(branch_name="main", lsn=lsn)
log.info(f"lsn: {lsn}")
for line in sql.split("\n"):
if len(line.strip()) == 0 or line.startswith("--"):
continue
cur.execute(line)
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
env.endpoints.create_start(branch_name="main", lsn=lsn)
log.info(f"lsn: {lsn}")
cur.execute("VACUUM FULL pg_class;")
for ep in env.endpoints.endpoints:
log.info(f"{ep.endpoint_id} / {ep.pg_port}")
pg_dump_command = ["pg_dumpall", "-f", f"/tmp/dump-{ep.endpoint_id}.sql"]
env_vars = {
"PGPORT": str(ep.pg_port),
"PGUSER": endpoint.default_options["user"],
"PGHOST": endpoint.default_options["host"],
}
pg_bin.run_capture(pg_dump_command, env=env_vars)

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.5",
"e5374b72997b0afc8374137674e873f7a558120a"
"8be779fd3ab9e87206da96a7e4842ef1abf04f44"
],
"v16": [
"16.9",
"15710a76b7d07912110fcbbaf0c8ad6d7e5a9fbc"
"0bf96bd6d70301a0b43b0b3457bb3cf8fb43c198"
],
"v15": [
"15.13",
"daa81cffcf063c54b29a9aabdb6604625f675ad0"
"de7640f55da07512834d5cc40c4b3fb376b5f04f"
],
"v14": [
"14.18",
"4cca6f8083483dda9e12eae292cf788d45bd561f"
"55c0d45abe6467c02084c2192bca117eda6ce1e7"
]
}