Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-03 10:40:37 +00:00

Compare commits: 14 commits, conrad/ref...conrad/twe
| SHA1 |
|---|
| 13d41b51a2 |
| f3c9d0adf4 |
| 2e3dc9a8c2 |
| 568779fa8a |
| e94acbc816 |
| f4150614d0 |
| 38dbc5f67f |
| 3685ad606d |
| 76a7d37f7e |
| cdb6479c8a |
| 81c557d87e |
| e963129678 |
| 4f0a9fc569 |
| 81c6a5a796 |
Cargo.lock (generated, 12 changes)
@@ -4286,6 +4286,7 @@ dependencies = [
"enumset",
"fail",
"futures",
"hashlink",
"hex",
"hex-literal",
"http-utils",
@@ -4434,6 +4435,16 @@ dependencies = [
"workspace_hack",
]

[[package]]
name = "pageserver_page_api"
version = "0.1.0"
dependencies = [
"prost 0.13.3",
"tonic",
"tonic-build",
"workspace_hack",
]

[[package]]
name = "papaya"
version = "0.2.1"
@@ -5205,7 +5216,6 @@ dependencies = [
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"rustls-pemfile 2.1.1",
"ryu",
"scopeguard",
"serde",
"serde_json",
@@ -9,6 +9,7 @@ members = [
"pageserver/ctl",
"pageserver/client",
"pageserver/pagebench",
"pageserver/page_api",
"proxy",
"safekeeper",
"safekeeper/client",
@@ -252,6 +253,7 @@ pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
@@ -7,7 +7,7 @@ index 255e616..1c6edb7 100644
RelationGetRelationName(index));

+#ifdef NEON_SMGR
+ smgr_start_unlogged_build(index->rd_smgr);
+ smgr_start_unlogged_build(RelationGetSmgr(index));
+#endif
+
initRumState(&buildstate.rumstate, index);
@@ -18,7 +18,7 @@ index 255e616..1c6edb7 100644
rumUpdateStats(index, &buildstate.buildStats, buildstate.rumstate.isBuild);

+#ifdef NEON_SMGR
+ smgr_finish_unlogged_build_phase_1(index->rd_smgr);
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
+#endif
+
/*
@@ -29,7 +29,7 @@ index 255e616..1c6edb7 100644
}

+#ifdef NEON_SMGR
+ smgr_end_unlogged_build(index->rd_smgr);
+ smgr_end_unlogged_build(RelationGetSmgr(index));
+#endif
+
/*
@@ -213,8 +213,10 @@ impl Escaping for PgIdent {

// Find the first suitable tag that is not present in the string.
// Postgres' max role/DB name length is 63 bytes, so even in the
// worst case it won't take long.
while self.contains(&format!("${tag}$")) || self.contains(&format!("${outer_tag}$")) {
// worst case it won't take long. Outer tag is always `tag + "x"`,
// so if `tag` is not present in the string, `outer_tag` is not
// present in the string either.
while self.contains(&tag.to_string()) {
tag += "x";
outer_tag = tag.clone() + "x";
}

@@ -71,6 +71,14 @@ test.escaping = 'here''s a backslash \\ and a quote '' and a double-quote " hoor
("name$$$", ("$x$name$$$$x$", "xx")),
("name$$$$", ("$x$name$$$$$x$", "xx")),
("name$x$", ("$xx$name$x$$xx$", "xxx")),
("x", ("$xx$x$xx$", "xxx")),
("xx", ("$xxx$xx$xxx$", "xxxx")),
("$x", ("$xx$$x$xx$", "xxx")),
("x$", ("$xx$x$$xx$", "xxx")),
("$x$", ("$xx$$x$$xx$", "xxx")),
("xx$", ("$xxx$xx$$xxx$", "xxxx")),
("$xx", ("$xxx$$xx$xxx$", "xxxx")),
("$xx$", ("$xxx$$xx$$xxx$", "xxxx")),
];

for (input, expected) in test_cases {
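The rewritten loop leans on the invariant spelled out in the new comment: the outer tag is always `tag + "x"`, so a single containment check on the bare `tag` suffices (and checking the bare tag rather than `$tag$` is strictly more conservative). A standalone distillation of the selection logic, verified against the test table above; the function name and shape are illustrative, not the real `Escaping` impl:

```rust
/// Wrap `s` in dollar-quoting, choosing a tag that cannot collide with `s`.
/// Sketch of the tag-selection loop shown in the diff above.
fn dollar_quote(s: &str) -> String {
    let mut tag = String::from("x");
    // `outer_tag` would always be `tag + "x"`; if the bare `tag` does not
    // occur in `s`, neither tag can occur in `$...$` form either.
    while s.contains(&tag) {
        tag.push('x');
    }
    let delim = format!("${tag}$");
    format!("{delim}{s}{delim}")
}

fn main() {
    // Mirrors rows of the test table above.
    assert_eq!(dollar_quote("name$$$"), "$x$name$$$$x$");
    assert_eq!(dollar_quote("name$x$"), "$xx$name$x$$xx$");
}
```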
@@ -546,6 +546,11 @@ impl PageServerNode {
.map(serde_json::from_str)
.transpose()
.context("Failed to parse 'sampling_ratio'")?,
relsize_snapshot_cache_capacity: settings
.remove("relsize_snapshot_cache_capacity")
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'relsize_snapshot_cache_capacity' as integer")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
@@ -462,6 +462,8 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
if var(REAL_S3_ENV).is_ok() {
assert!(body.contains("remote_storage_s3_deleted_objects_total"));
}

#[cfg(target_os = "linux")]
assert!(body.contains("process_threads"));
}
@@ -235,7 +235,7 @@ pub enum PageServiceProtocolPipelinedBatchingStrategy {
ScatteredLsn,
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
pub enum GetVectoredConcurrentIo {
/// The read path is fully sequential: layers are visited
@@ -491,6 +491,8 @@ pub struct TenantConfigToml {
/// Tenant level performance sampling ratio override. Controls the ratio of get page requests
/// that will get perf sampling for the tenant.
pub sampling_ratio: Option<Ratio>,
/// Capacity of relsize snapshot cache (used by replicas).
pub relsize_snapshot_cache_capacity: usize,
}

pub mod defaults {
@@ -730,6 +732,7 @@ pub mod tenant_conf_defaults {
pub const DEFAULT_GC_COMPACTION_VERIFICATION: bool = true;
pub const DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB: u64 = 5 * 1024 * 1024; // 5GB
pub const DEFAULT_GC_COMPACTION_RATIO_PERCENT: u64 = 100;
pub const DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY: usize = 1000;
}

impl Default for TenantConfigToml {
@@ -787,6 +790,7 @@ impl Default for TenantConfigToml {
gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB,
gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,
sampling_ratio: None,
relsize_snapshot_cache_capacity: DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY,
}
}
}
@@ -630,6 +630,8 @@ pub struct TenantConfigPatch {
pub gc_compaction_ratio_percent: FieldPatch<u64>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub sampling_ratio: FieldPatch<Option<Ratio>>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub relsize_snapshot_cache_capacity: FieldPatch<usize>,
}

/// Like [`crate::config::TenantConfigToml`], but preserves the information
@@ -759,6 +761,9 @@ pub struct TenantConfig {

#[serde(skip_serializing_if = "Option::is_none")]
pub sampling_ratio: Option<Option<Ratio>>,

#[serde(skip_serializing_if = "Option::is_none")]
pub relsize_snapshot_cache_capacity: Option<usize>,
}

impl TenantConfig {
@@ -804,6 +809,7 @@ impl TenantConfig {
mut gc_compaction_initial_threshold_kb,
mut gc_compaction_ratio_percent,
mut sampling_ratio,
mut relsize_snapshot_cache_capacity,
} = self;

patch.checkpoint_distance.apply(&mut checkpoint_distance);
@@ -905,6 +911,9 @@ impl TenantConfig {
.gc_compaction_ratio_percent
.apply(&mut gc_compaction_ratio_percent);
patch.sampling_ratio.apply(&mut sampling_ratio);
patch
.relsize_snapshot_cache_capacity
.apply(&mut relsize_snapshot_cache_capacity);

Ok(Self {
checkpoint_distance,
@@ -944,6 +953,7 @@ impl TenantConfig {
gc_compaction_initial_threshold_kb,
gc_compaction_ratio_percent,
sampling_ratio,
relsize_snapshot_cache_capacity,
})
}

@@ -1052,6 +1062,9 @@ impl TenantConfig {
.gc_compaction_ratio_percent
.unwrap_or(global_conf.gc_compaction_ratio_percent),
sampling_ratio: self.sampling_ratio.unwrap_or(global_conf.sampling_ratio),
relsize_snapshot_cache_capacity: self
.relsize_snapshot_cache_capacity
.unwrap_or(global_conf.relsize_snapshot_cache_capacity),
}
}
}
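For orientation, the `FieldPatch` pattern driving the `apply` calls above can be reduced to the sketch below. This is an assumption about its shape, not the crate's actual definition; the real type in `pageserver_api` may differ in variants and serde details.

```rust
/// Minimal sketch of a per-field patch, as used by `TenantConfigPatch`.
#[derive(Default)]
enum FieldPatch<T> {
    /// Leave the current value untouched (serde skips these via `is_noop`).
    #[default]
    Noop,
    /// Set the field to a new value.
    Upsert(T),
    /// Clear the field back to "unset".
    Remove,
}

impl<T> FieldPatch<T> {
    fn is_noop(&self) -> bool {
        matches!(self, FieldPatch::Noop)
    }

    /// Apply the patch to an optional per-tenant override, mirroring calls
    /// like `patch.relsize_snapshot_cache_capacity.apply(&mut ...)` above.
    fn apply(self, target: &mut Option<T>) {
        match self {
            FieldPatch::Noop => {}
            FieldPatch::Upsert(v) => *target = Some(v),
            FieldPatch::Remove => *target = None,
        }
    }
}

fn main() {
    // A tenant with no override falls back to the global default (1000).
    let mut capacity: Option<usize> = None;
    FieldPatch::Upsert(4096usize).apply(&mut capacity);
    assert_eq!(capacity.unwrap_or(1000), 4096);
    FieldPatch::Remove.apply(&mut capacity);
    assert_eq!(capacity.unwrap_or(1000), 1000);
}
```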
@@ -9,12 +9,14 @@ use sha2::digest::FixedOutput;
use sha2::{Digest, Sha256};
use tokio::task::yield_now;

use crate::CSafeStr;

const NONCE_LENGTH: usize = 24;

/// The identifier of the SCRAM-SHA-256 SASL authentication mechanism.
pub const SCRAM_SHA_256: &str = "SCRAM-SHA-256";
pub const SCRAM_SHA_256: &CSafeStr = CSafeStr::from_cstr(c"SCRAM-SHA-256");
/// The identifier of the SCRAM-SHA-256-PLUS SASL authentication mechanism.
pub const SCRAM_SHA_256_PLUS: &str = "SCRAM-SHA-256-PLUS";
pub const SCRAM_SHA_256_PLUS: &CSafeStr = CSafeStr::from_cstr(c"SCRAM-SHA-256-PLUS");

// since postgres passwords are not required to exclude saslprep-prohibited
// characters or even be valid UTF8, we run saslprep if possible and otherwise

@@ -11,7 +11,7 @@
//! set to `UTF8`. It will most likely not behave properly if that is not the case.
#![warn(missing_docs, clippy::all)]

use std::io;
use std::{ffi::CStr, io};

use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut, BytesMut};
@@ -36,20 +36,67 @@ pub enum IsNull {
No,
}

fn write_nullable<F, E>(serializer: F, buf: &mut BytesMut) -> Result<(), E>
/// A [`std::ffi::CStr`] but without the null byte.
#[repr(transparent)]
#[derive(PartialEq, Eq, Hash, Debug)]
pub struct CSafeStr([u8]);

impl CSafeStr {
/// Create a new `CSafeStr`, erroring if the bytes contain a null.
pub fn new(bytes: &[u8]) -> Result<&Self, io::Error> {
let nul_pos = memchr::memchr(0, bytes);
match nul_pos {
Some(nul_pos) => Err(io::Error::other(format!(
"unexpected null byte at position {nul_pos}"
))),
None => {
// Safety: CSafeStr is transparent over [u8].
Ok(unsafe { std::mem::transmute(bytes) })
}
}
}

/// Create a new `CSafeStr` up until the next null.
pub fn take<'a>(bytes: &mut &'a [u8]) -> &'a Self {
let nul_pos = memchr::memchr(0, bytes).unwrap_or(bytes.len());
let bytes = bytes
.split_off(..nul_pos)
.expect("nul_pos should be in-bounds");
// Safety: CSafeStr is transparent over [u8].
unsafe { std::mem::transmute(bytes) }
}

/// Get the bytes of this CSafeStr.
pub const fn as_bytes(&self) -> &[u8] {
&self.0
}

/// Create a new `CSafeStr` from a `CStr`, dropping the terminating null.
pub const fn from_cstr(s: &CStr) -> &CSafeStr {
// Safety: CSafeStr is transparent over [u8].
unsafe { std::mem::transmute(s.to_bytes()) }
}
}

impl<'a> From<&'a CStr> for &'a CSafeStr {
fn from(s: &'a CStr) -> &'a CSafeStr {
CSafeStr::from_cstr(s)
}
}
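A quick usage sketch of the new `CSafeStr` API (the `demo` function and its values are made up for illustration):

```rust
fn demo() -> std::io::Result<()> {
    // Fallible construction rejects embedded NULs up front, which is what
    // lets the serialization functions below stop returning io::Result.
    let portal = CSafeStr::new(b"my_portal")?;
    assert_eq!(portal.as_bytes(), b"my_portal");
    assert!(CSafeStr::new(b"bad\0bytes").is_err());

    // Infallible construction: a `c"..."` literal is NUL-checked at compile
    // time, so constants like SCRAM_SHA_256 need no runtime validation.
    let _rollback: &CSafeStr = CSafeStr::from_cstr(c"ROLLBACK");

    // `take` splits off everything before the next NUL (or the whole slice
    // if there is none), as used by the SASL mechanism parser below.
    let mut wire: &[u8] = b"SCRAM-SHA-256\0rest";
    let mech = CSafeStr::take(&mut wire);
    assert_eq!(mech.as_bytes(), b"SCRAM-SHA-256");
    Ok(())
}
```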
fn write_nullable<F>(serializer: F, buf: &mut BytesMut)
where
F: FnOnce(&mut BytesMut) -> Result<IsNull, E>,
E: From<io::Error>,
F: FnOnce(&mut BytesMut) -> IsNull,
{
let base = buf.len();
buf.put_i32(0);
let size = match serializer(buf)? {
IsNull::No => i32::from_usize(buf.len() - base - 4)?,
let size = match serializer(buf) {
// this is an unreasonable enough case that I think a panic is acceptable.
IsNull::No => i32::from_usize(buf.len() - base - 4)
.expect("buffer size should not be larger than i32::MAX"),
IsNull::Yes => -1,
};
BigEndian::write_i32(&mut buf[base..], size);

Ok(())
}

trait FromUsize: Sized {
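Both `write_nullable` and `write_body` (below, in `frontend.rs`) rely on the same framing trick: reserve a length placeholder, serialize the payload, then backfill the big-endian length. A self-contained sketch with a hypothetical `write_framed` helper; note the two real functions differ in whether the 4-byte length field itself is counted:

```rust
use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut, BytesMut};

/// Frame `payload` with a leading i32 length, Postgres-style: the length
/// includes the length field itself, as in `write_body`.
fn write_framed(payload: &[u8], buf: &mut BytesMut) {
    let base = buf.len();
    buf.put_i32(0); // placeholder, patched below
    buf.put_slice(payload);
    let size = i32::try_from(buf.len() - base).expect("frame larger than i32::MAX");
    BigEndian::write_i32(&mut buf[base..], size);
}

fn main() {
    let mut buf = BytesMut::new();
    write_framed(b"ROLLBACK\0", &mut buf);
    // 4-byte length + 9 payload bytes = 13.
    assert_eq!(&buf[..4], &13i32.to_be_bytes());
}
```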
@@ -9,7 +9,7 @@ use bytes::{Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use memchr::memchr;

use crate::Oid;
use crate::{CSafeStr, Oid};

// top-level message tags
const PARSE_COMPLETE_TAG: u8 = b'1';
@@ -332,25 +332,24 @@ impl AuthenticationSaslBody {
pub struct SaslMechanisms<'a>(&'a [u8]);

impl<'a> FallibleIterator for SaslMechanisms<'a> {
type Item = &'a str;
type Item = &'a CSafeStr;
type Error = io::Error;

#[inline]
fn next(&mut self) -> io::Result<Option<&'a str>> {
let value_end = find_null(self.0, 0)?;
if value_end == 0 {
if self.0.len() != 1 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid message length: expected to be at end of iterator for sasl",
));
}
Ok(None)
} else {
let value = get_str(&self.0[..value_end])?;
self.0 = &self.0[value_end + 1..];
Ok(Some(value))
fn next(&mut self) -> io::Result<Option<&'a CSafeStr>> {
if self.0 == b"\0" {
return Ok(None);
}

let value = CSafeStr::take(&mut self.0);
if value.as_bytes().len() == 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid message length: expected to be at end of iterator for sasl",
));
}

Ok(Some(value))
}
}
@@ -1,114 +1,73 @@
//! Frontend message serialization.
#![allow(missing_docs)]

use std::error::Error;
use std::{io, marker};
use std::io;

use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, BytesMut};

use crate::{FromUsize, IsNull, Oid, write_nullable};
use crate::{CSafeStr, FromUsize, IsNull, Oid, write_nullable};

#[inline]
fn write_body<F, E>(buf: &mut BytesMut, f: F) -> Result<(), E>
fn write_body<F>(buf: &mut BytesMut, f: F)
where
F: FnOnce(&mut BytesMut) -> Result<(), E>,
E: From<io::Error>,
F: FnOnce(&mut BytesMut),
{
let base = buf.len();
buf.extend_from_slice(&[0; 4]);

f(buf)?;
f(buf);

let size = i32::from_usize(buf.len() - base)?;
let size =
i32::from_usize(buf.len() - base).expect("buffer size should not be larger than i32::MAX");
BigEndian::write_i32(&mut buf[base..], size);
Ok(())
}

pub enum BindError {
Conversion(Box<dyn Error + marker::Sync + Send>),
Serialization(io::Error),
}

impl From<Box<dyn Error + marker::Sync + Send>> for BindError {
#[inline]
fn from(e: Box<dyn Error + marker::Sync + Send>) -> BindError {
BindError::Conversion(e)
}
}

impl From<io::Error> for BindError {
#[inline]
fn from(e: io::Error) -> BindError {
BindError::Serialization(e)
}
}

#[inline]
pub fn bind<I, J, F, T, K>(
portal: &str,
statement: &str,
portal: &CSafeStr,
statement: &CSafeStr,
formats: I,
values: J,
mut serializer: F,
result_formats: K,
buf: &mut BytesMut,
) -> Result<(), BindError>
where
) where
I: IntoIterator<Item = i16>,
J: IntoIterator<Item = T>,
F: FnMut(T, &mut BytesMut) -> Result<IsNull, Box<dyn Error + marker::Sync + Send>>,
F: FnMut(T, &mut BytesMut) -> IsNull,
K: IntoIterator<Item = i16>,
{
buf.put_u8(b'B');

write_body(buf, |buf| {
write_cstr(portal.as_bytes(), buf)?;
write_cstr(statement.as_bytes(), buf)?;
write_counted(
formats,
|f, buf| {
buf.put_i16(f);
Ok::<_, io::Error>(())
},
buf,
)?;
write_cstr(portal, buf);
write_cstr(statement, buf);
write_counted(formats, |f, buf| buf.put_i16(f), buf);
write_counted(
values,
|v, buf| write_nullable(|buf| serializer(v, buf), buf),
buf,
)?;
write_counted(
result_formats,
|f, buf| {
buf.put_i16(f);
Ok::<_, io::Error>(())
},
buf,
)?;

Ok(())
);
write_counted(result_formats, |f, buf| buf.put_i16(f), buf);
})
}
#[inline]
fn write_counted<I, T, F, E>(items: I, mut serializer: F, buf: &mut BytesMut) -> Result<(), E>
fn write_counted<I, T, F>(items: I, mut serializer: F, buf: &mut BytesMut)
where
I: IntoIterator<Item = T>,
F: FnMut(T, &mut BytesMut) -> Result<(), E>,
E: From<io::Error>,
F: FnMut(T, &mut BytesMut),
{
let base = buf.len();
buf.extend_from_slice(&[0; 2]);
let mut count = 0;
for item in items {
serializer(item, buf)?;
serializer(item, buf);
count += 1;
}
let count = i16::from_usize(count)?;
let count = i16::from_usize(count).expect("list should not exceed 32767 items");
BigEndian::write_i16(&mut buf[base..], count);

Ok(())
}

#[inline]
@@ -117,17 +76,15 @@ pub fn cancel_request(process_id: i32, secret_key: i32, buf: &mut BytesMut) {
buf.put_i32(80_877_102);
buf.put_i32(process_id);
buf.put_i32(secret_key);
Ok::<_, io::Error>(())
})
.unwrap();
}

#[inline]
pub fn close(variant: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
pub fn close(variant: u8, name: &CSafeStr, buf: &mut BytesMut) {
buf.put_u8(b'C');
write_body(buf, |buf| {
buf.put_u8(variant);
write_cstr(name.as_bytes(), buf)
write_cstr(name, buf)
})
}

@@ -162,85 +119,75 @@ where
#[inline]
pub fn copy_done(buf: &mut BytesMut) {
buf.put_u8(b'c');
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
write_body(buf, |_| {});
}

#[inline]
pub fn copy_fail(message: &str, buf: &mut BytesMut) -> io::Result<()> {
pub fn copy_fail(message: &CSafeStr, buf: &mut BytesMut) {
buf.put_u8(b'f');
write_body(buf, |buf| write_cstr(message.as_bytes(), buf))
write_body(buf, |buf| write_cstr(message, buf))
}

#[inline]
pub fn describe(variant: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
pub fn describe(variant: u8, name: &CSafeStr, buf: &mut BytesMut) {
buf.put_u8(b'D');
write_body(buf, |buf| {
buf.put_u8(variant);
write_cstr(name.as_bytes(), buf)
write_cstr(name, buf)
})
}

#[inline]
pub fn execute(portal: &str, max_rows: i32, buf: &mut BytesMut) -> io::Result<()> {
pub fn execute(portal: &CSafeStr, max_rows: i32, buf: &mut BytesMut) {
buf.put_u8(b'E');
write_body(buf, |buf| {
write_cstr(portal.as_bytes(), buf)?;
write_cstr(portal, buf);
buf.put_i32(max_rows);
Ok(())
})
}

#[inline]
pub fn parse<I>(name: &str, query: &str, param_types: I, buf: &mut BytesMut) -> io::Result<()>
pub fn parse<I>(name: &CSafeStr, query: &CSafeStr, param_types: I, buf: &mut BytesMut)
where
I: IntoIterator<Item = Oid>,
{
buf.put_u8(b'P');
write_body(buf, |buf| {
write_cstr(name.as_bytes(), buf)?;
write_cstr(query.as_bytes(), buf)?;
write_counted(
param_types,
|t, buf| {
buf.put_u32(t);
Ok::<_, io::Error>(())
},
buf,
)?;
Ok(())
write_cstr(name, buf);
write_cstr(query, buf);
write_counted(param_types, |t, buf| buf.put_u32(t), buf);
})
}

#[inline]
pub fn password_message(password: &[u8], buf: &mut BytesMut) -> io::Result<()> {
pub fn password_message(password: &CSafeStr, buf: &mut BytesMut) {
buf.put_u8(b'p');
write_body(buf, |buf| write_cstr(password, buf))
}

#[inline]
pub fn query(query: &str, buf: &mut BytesMut) -> io::Result<()> {
pub fn query(query: &CSafeStr, buf: &mut BytesMut) {
buf.put_u8(b'Q');
write_body(buf, |buf| write_cstr(query.as_bytes(), buf))
write_body(buf, |buf| write_cstr(query, buf))
}

#[inline]
pub fn sasl_initial_response(mechanism: &str, data: &[u8], buf: &mut BytesMut) -> io::Result<()> {
pub fn sasl_initial_response(mechanism: &CSafeStr, data: &[u8], buf: &mut BytesMut) {
buf.put_u8(b'p');
write_body(buf, |buf| {
write_cstr(mechanism.as_bytes(), buf)?;
let len = i32::from_usize(data.len())?;
write_cstr(mechanism, buf);
let len =
i32::from_usize(data.len()).expect("sasl data should not be larger than i32::MAX");
buf.put_i32(len);
buf.put_slice(data);
Ok(())
})
}

#[inline]
pub fn sasl_response(data: &[u8], buf: &mut BytesMut) -> io::Result<()> {
pub fn sasl_response(data: &[u8], buf: &mut BytesMut) {
buf.put_u8(b'p');
write_body(buf, |buf| {
buf.put_slice(data);
Ok(())
})
}
@@ -248,19 +195,16 @@ pub fn sasl_response(data: &[u8], buf: &mut BytesMut) -> io::Result<()> {
pub fn ssl_request(buf: &mut BytesMut) {
write_body(buf, |buf| {
buf.put_i32(80_877_103);
Ok::<_, io::Error>(())
})
.unwrap();
});
}

#[inline]
pub fn startup_message(parameters: &StartupMessageParams, buf: &mut BytesMut) -> io::Result<()> {
pub fn startup_message(parameters: &StartupMessageParams, buf: &mut BytesMut) {
write_body(buf, |buf| {
// postgres protocol version 3.0 (196608) in big-endian
buf.put_i32(0x00_03_00_00);
buf.put_slice(&parameters.params);
buf.put_u8(0);
Ok(())
})
}

@@ -271,10 +215,7 @@ pub struct StartupMessageParams {

impl StartupMessageParams {
/// Set parameter's value by its name.
pub fn insert(&mut self, name: &str, value: &str) {
if name.contains('\0') || value.contains('\0') {
panic!("startup parameter name or value contained a null")
}
pub fn insert(&mut self, name: &CSafeStr, value: &CSafeStr) {
self.params.put_slice(name.as_bytes());
self.params.put_u8(0);
self.params.put_slice(value.as_bytes());
@@ -285,24 +226,17 @@ impl StartupMessageParams {
#[inline]
pub fn sync(buf: &mut BytesMut) {
buf.put_u8(b'S');
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
write_body(buf, |_| {});
}

#[inline]
pub fn terminate(buf: &mut BytesMut) {
buf.put_u8(b'X');
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
write_body(buf, |_| {});
}

#[inline]
fn write_cstr(s: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> {
if s.contains(&0) {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"string contains embedded null",
));
}
buf.put_slice(s);
fn write_cstr(s: &CSafeStr, buf: &mut BytesMut) {
buf.put_slice(s.as_bytes());
buf.put_u8(0);
Ok(())
}
@@ -13,7 +13,7 @@ use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
#[doc(inline)]
pub use postgres_protocol2::Oid;
use postgres_protocol2::types;
use postgres_protocol2::{IsNull, types};

use crate::type_gen::{Inner, Other};

@@ -32,36 +32,15 @@ macro_rules! accepts {
/// All `ToSql` implementations should use this macro.
macro_rules! to_sql_checked {
() => {
fn to_sql_checked(
&self,
ty: &$crate::Type,
out: &mut $crate::private::BytesMut,
) -> ::std::result::Result<
$crate::IsNull,
Box<dyn ::std::error::Error + ::std::marker::Sync + ::std::marker::Send>,
> {
$crate::__to_sql_checked(self, ty, out)
fn check(&self, ty: &Type) -> ::std::result::Result<(), $crate::WrongType> {
if !<Self as $crate::ToSql>::accepts(ty) {
return Err($crate::WrongType::new::<Self>(ty.clone()));
}
Ok(())
}
};
}

// WARNING: this function is not considered part of this crate's public API.
// It is subject to change at any time.
#[doc(hidden)]
pub fn __to_sql_checked<T>(
v: &T,
ty: &Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn Error + Sync + Send>>
where
T: ToSql,
{
if !T::accepts(ty) {
return Err(Box::new(WrongType::new::<T>(ty.clone())));
}
v.to_sql(ty, out)
}

// mod pg_lsn;
#[doc(hidden)]
pub mod private;
@@ -369,14 +348,6 @@ macro_rules! simple_from {
simple_from!(i8, char_from_sql, CHAR);
simple_from!(u32, oid_from_sql, OID);

/// An enum representing the nullability of a Postgres value.
pub enum IsNull {
/// The value is NULL.
Yes,
/// The value is not NULL.
No,
}

/// A trait for types that can be converted into Postgres values.
pub trait ToSql: fmt::Debug {
/// Converts the value of `self` into the binary format of the specified
@@ -388,9 +359,7 @@ pub trait ToSql: fmt::Debug {
/// The return value indicates if this value should be represented as
/// `NULL`. If this is the case, implementations **must not** write
/// anything to `out`.
fn to_sql(&self, ty: &Type, out: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>>
where
Self: Sized;
fn to_sql(&self, ty: &Type, out: &mut BytesMut) -> IsNull;

/// Determines if a value of this type can be converted to the specified
/// Postgres `Type`.
@@ -402,11 +371,7 @@ pub trait ToSql: fmt::Debug {
///
/// *All* implementations of this method should be generated by the
/// `to_sql_checked!()` macro.
fn to_sql_checked(
&self,
ty: &Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn Error + Sync + Send>>;
fn check(&self, ty: &Type) -> Result<(), WrongType>;

/// Specify the encode format
fn encode_format(&self, _ty: &Type) -> Format {
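Under the reworked trait, an implementation looks roughly like this: `to_sql` writes unconditionally and returns `IsNull`, while type validation moves to the separate `check` method that `to_sql_checked!()` now generates. A sketch using this crate's macros; the `Centimeters` type is made up:

```rust
use bytes::{BufMut, BytesMut};

#[derive(Debug)]
struct Centimeters(i32);

impl ToSql for Centimeters {
    // Infallible: any type mismatch was already rejected by `check`.
    fn to_sql(&self, _ty: &Type, out: &mut BytesMut) -> IsNull {
        out.put_i32(self.0); // INT4 wire format
        IsNull::No
    }

    accepts!(INT4);
    to_sql_checked!();
}
```

Callers such as `encode_bind` (below) first run `check` over every parameter, then encode without any error plumbing in the hot path.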
@@ -426,14 +391,14 @@ pub enum Format {
}

impl ToSql for &str {
fn to_sql(&self, ty: &Type, w: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>> {
fn to_sql(&self, ty: &Type, w: &mut BytesMut) -> IsNull {
match *ty {
ref ty if ty.name() == "ltree" => types::ltree_to_sql(self, w),
ref ty if ty.name() == "lquery" => types::lquery_to_sql(self, w),
ref ty if ty.name() == "ltxtquery" => types::ltxtquery_to_sql(self, w),
_ => types::text_to_sql(self, w),
}
Ok(IsNull::No)
IsNull::No
}

fn accepts(ty: &Type) -> bool {
@@ -457,12 +422,9 @@ impl ToSql for &str {
macro_rules! simple_to {
($t:ty, $f:ident, $($expected:ident),+) => {
impl ToSql for $t {
fn to_sql(&self,
_: &Type,
w: &mut BytesMut)
-> Result<IsNull, Box<dyn Error + Sync + Send>> {
fn to_sql(&self, _: &Type, w: &mut BytesMut) -> IsNull {
types::$f(*self, w);
Ok(IsNull::No)
IsNull::No
}

accepts!($($expected),+);

@@ -219,7 +219,7 @@ impl Client {
}

let buf = self.client.inner().with_buf(|buf| {
frontend::query("ROLLBACK", buf).unwrap();
frontend::query(c"ROLLBACK".into(), buf);
buf.split().freeze()
});
let _ = self
@@ -4,6 +4,7 @@ use std::net::IpAddr;
use std::time::Duration;
use std::{fmt, str};

use postgres_protocol2::CSafeStr;
pub use postgres_protocol2::authentication::sasl::ScramKeys;
use postgres_protocol2::message::frontend::StartupMessageParams;
use serde::{Deserialize, Serialize};
@@ -162,7 +163,10 @@ impl Config {
self.username = true;
}

self.server_params.insert(name, value);
self.server_params.insert(
CSafeStr::new(name.as_bytes()).expect("param name should not contain a null"),
CSafeStr::new(value.as_bytes()).expect("param value should not contain a null"),
);
self
}
@@ -6,6 +6,7 @@ use std::task::{Context, Poll};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{Sink, SinkExt, Stream, TryStreamExt, ready};
use postgres_protocol2::CSafeStr;
use postgres_protocol2::authentication::sasl;
use postgres_protocol2::authentication::sasl::ScramSha256;
use postgres_protocol2::message::backend::{AuthenticationSaslBody, Message, NoticeResponseBody};
@@ -122,7 +123,7 @@ where
T: AsyncRead + AsyncWrite + Unpin,
{
let mut buf = BytesMut::new();
frontend::startup_message(&config.server_params, &mut buf).map_err(Error::encode)?;
frontend::startup_message(&config.server_params, &mut buf);

stream
.send(FrontendMessage::Raw(buf.freeze()))
@@ -193,7 +194,7 @@ where
T: AsyncRead + AsyncWrite + Unpin,
{
let mut buf = BytesMut::new();
frontend::password_message(password, &mut buf).map_err(Error::encode)?;
frontend::password_message(CSafeStr::new(password).map_err(Error::encode)?, &mut buf);

stream
.send(FrontendMessage::Raw(buf.freeze()))
@@ -214,10 +215,10 @@ where
let mut has_scram_plus = false;
let mut mechanisms = body.mechanisms();
while let Some(mechanism) = mechanisms.next().map_err(Error::parse)? {
match mechanism {
sasl::SCRAM_SHA_256 => has_scram = true,
sasl::SCRAM_SHA_256_PLUS => has_scram_plus = true,
_ => {}
if mechanism == sasl::SCRAM_SHA_256 {
has_scram = true;
} else if mechanism == sasl::SCRAM_SHA_256_PLUS {
has_scram_plus = true;
}
}

@@ -256,7 +257,7 @@ where
};

let mut buf = BytesMut::new();
frontend::sasl_initial_response(mechanism, scram.message(), &mut buf).map_err(Error::encode)?;
frontend::sasl_initial_response(mechanism, scram.message(), &mut buf);
stream
.send(FrontendMessage::Raw(buf.freeze()))
.await
@@ -275,7 +276,7 @@ where
.map_err(|e| Error::authentication(e.into()))?;

let mut buf = BytesMut::new();
frontend::sasl_response(scram.message(), &mut buf).map_err(Error::encode)?;
frontend::sasl_response(scram.message(), &mut buf);
stream
.send(FrontendMessage::Raw(buf.freeze()))
.await
@@ -123,6 +123,6 @@ pub enum SimpleQueryMessage {

fn slice_iter<'a>(
s: &'a [&'a (dyn ToSql + Sync)],
) -> impl ExactSizeIterator<Item = &'a (dyn ToSql + Sync)> + 'a {
) -> impl ExactSizeIterator<Item = &'a (dyn ToSql + Sync)> + Clone + 'a {
s.iter().map(|s| *s as _)
}
@@ -1,3 +1,4 @@
use std::ffi::CStr;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
@@ -5,9 +6,9 @@ use std::sync::Arc;
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{TryStreamExt, pin_mut};
use postgres_protocol2::CSafeStr;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tracing::debug;

use crate::client::{CachedTypeInfo, InnerClient};
use crate::codec::FrontendMessage;
@@ -15,7 +16,7 @@ use crate::connection::RequestMessages;
use crate::types::{Kind, Oid, Type};
use crate::{Column, Error, Statement, query, slice_iter};

pub(crate) const TYPEINFO_QUERY: &str = "\
pub(crate) const TYPEINFO_QUERY: &CStr = c"\
SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, t.typbasetype, n.nspname, t.typrelid
FROM pg_catalog.pg_type t
LEFT OUTER JOIN pg_catalog.pg_range r ON r.rngtypid = t.oid
@@ -25,11 +26,11 @@ WHERE t.oid = $1

async fn prepare_typecheck(
client: &Arc<InnerClient>,
name: &'static str,
query: &str,
name: &'static CStr,
query: &CSafeStr,
types: &[Type],
) -> Result<Statement, Error> {
let buf = encode(client, name, query, types)?;
let buf = encode(client, name.into(), query, types)?;
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;

match responses.next().await? {
@@ -68,16 +69,21 @@ async fn prepare_typecheck(
Ok(Statement::new(client, name, parameters, columns))
}

fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Result<Bytes, Error> {
if types.is_empty() {
debug!("preparing query {}: {}", name, query);
} else {
debug!("preparing query {} with types {:?}: {}", name, types, query);
}
fn encode(
client: &InnerClient,
name: &CSafeStr,
query: &CSafeStr,
types: &[Type],
) -> Result<Bytes, Error> {
// if types.is_empty() {
//     debug!("preparing query {}: {}", name, query);
// } else {
//     debug!("preparing query {} with types {:?}: {}", name, types, query);
// }

client.with_buf(|buf| {
frontend::parse(name, query, types.iter().map(Type::oid), buf).map_err(Error::encode)?;
frontend::describe(b'S', name, buf).map_err(Error::encode)?;
frontend::parse(name, query, types.iter().map(Type::oid), buf);
frontend::describe(b'S', name, buf);
frontend::sync(buf);
Ok(buf.split().freeze())
})
@@ -154,8 +160,8 @@ async fn typeinfo_statement(
return Ok(stmt.clone());
}

let typeinfo = "neon_proxy_typeinfo";
let stmt = prepare_typecheck(client, typeinfo, TYPEINFO_QUERY, &[]).await?;
let typeinfo = c"neon_proxy_typeinfo";
let stmt = prepare_typecheck(client, typeinfo, TYPEINFO_QUERY.into(), &[]).await?;

typecache.typeinfo = Some(stmt.clone());
Ok(stmt)
@@ -1,4 +1,4 @@
use std::fmt;
use std::iter;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
@@ -8,25 +8,16 @@ use bytes::{BufMut, Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use pin_project_lite::pin_project;
use postgres_protocol2::CSafeStr;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use postgres_types2::{Format, ToSql, Type};
use tracing::debug;

use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::IsNull;
use crate::{Column, Error, ReadyForQueryStatus, Row, Statement};

struct BorrowToSqlParamsDebug<'a>(&'a [&'a (dyn ToSql + Sync)]);

impl fmt::Debug for BorrowToSqlParamsDebug<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_list().entries(self.0.iter()).finish()
}
}

pub async fn query<'a, I>(
client: &InnerClient,
statement: Statement,
@@ -34,19 +25,9 @@ pub async fn query<'a, I>(
) -> Result<RowStream, Error>
where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
I::IntoIter: ExactSizeIterator + Clone,
{
let buf = if tracing::enabled!(tracing::Level::DEBUG) {
let params = params.into_iter().collect::<Vec<_>>();
debug!(
"executing statement {} with parameters: {:?}",
statement.name(),
BorrowToSqlParamsDebug(params.as_slice()),
);
encode(client, &statement, params)?
} else {
encode(client, &statement, params)?
};
let buf = encode(client, &statement, params)?;
let responses = start(client, buf).await?;
Ok(RowStream {
statement,
@@ -68,45 +49,45 @@ where
I: IntoIterator<Item = Option<S>>,
I::IntoIter: ExactSizeIterator,
{
let query = CSafeStr::new(query.as_bytes()).map_err(Error::encode)?;
let params = params.into_iter();

let portal = c"".into(); // unnamed portal
let statement = c"".into(); // unnamed prepared statement

let buf = client.with_buf(|buf| {
frontend::parse(
"", // unnamed prepared statement
query, // query to parse
std::iter::empty(), // give no type info
statement,
query, // query to parse
iter::empty(), // give no type info
buf,
)
.map_err(Error::encode)?;
frontend::describe(b'S', "", buf).map_err(Error::encode)?;
// Bind, pass params as text, retrieve as binary
match frontend::bind(
"", // empty string selects the unnamed portal
"", // unnamed prepared statement
std::iter::empty(), // all parameters use the default format (text)
);
frontend::describe(b'S', statement, buf);

// Bind, pass params as text, retrieve as text
frontend::bind(
portal,
statement,
iter::empty(), // all parameters use the default format (text)
params,
|param, buf| match param {
Some(param) => {
buf.put_slice(param.as_ref().as_bytes());
Ok(postgres_protocol2::IsNull::No)
postgres_protocol2::IsNull::No
}
None => Ok(postgres_protocol2::IsNull::Yes),
None => postgres_protocol2::IsNull::Yes,
},
Some(0), // all text
buf,
) {
Ok(()) => Ok(()),
Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, 0)),
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
}?;
);

// Execute
frontend::execute("", 0, buf).map_err(Error::encode)?;
frontend::execute(portal, 0, buf);
// Sync
frontend::sync(buf);

Ok(buf.split().freeze())
})?;
buf.split().freeze()
});

// now read the responses
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
@@ -173,11 +154,13 @@ async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
pub fn encode<'a, I>(client: &InnerClient, statement: &Statement, params: I) -> Result<Bytes, Error>
where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
I::IntoIter: ExactSizeIterator + Clone,
{
let portal = c"".into(); // unnamed portal

client.with_buf(|buf| {
encode_bind(statement, params, "", buf)?;
frontend::execute("", 0, buf).map_err(Error::encode)?;
encode_bind(statement, params, portal, buf)?;
frontend::execute(portal, 0, buf);
frontend::sync(buf);
Ok(buf.split().freeze())
})
@@ -186,15 +169,15 @@ where
pub fn encode_bind<'a, I>(
statement: &Statement,
params: I,
portal: &str,
portal: &CSafeStr,
buf: &mut BytesMut,
) -> Result<(), Error>
where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
I::IntoIter: ExactSizeIterator + Clone,
{
let param_types = statement.params();
let params = params.into_iter();
let params = iter::zip(params.into_iter(), param_types);

assert!(
param_types.len() == params.len(),
@@ -203,35 +186,24 @@
params.len()
);

let (param_formats, params): (Vec<_>, Vec<_>) = params
.zip(param_types.iter())
.map(|(p, ty)| (p.encode_format(ty) as i16, p))
.unzip();
// check encodings
for (i, (p, ty)) in params.clone().enumerate() {
p.check(ty).map_err(|e| Error::to_sql(Box::new(e), i))?
}

let params = params.into_iter();
let param_formats = params.clone().map(|(p, ty)| p.encode_format(ty) as i16);

let mut error_idx = 0;
let r = frontend::bind(
frontend::bind(
portal,
statement.name(),
param_formats,
params.zip(param_types).enumerate(),
|(idx, (param, ty)), buf| match param.to_sql_checked(ty, buf) {
Ok(IsNull::No) => Ok(postgres_protocol2::IsNull::No),
Ok(IsNull::Yes) => Ok(postgres_protocol2::IsNull::Yes),
Err(e) => {
error_idx = idx;
Err(e)
}
},
params,
|(param, ty), buf| param.to_sql(ty, buf),
Some(1),
buf,
);
match r {
Ok(()) => Ok(()),
Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, error_idx)),
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
}

Ok(())
}

pin_project! {
@@ -7,6 +7,7 @@ use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use pin_project_lite::pin_project;
use postgres_protocol2::CSafeStr;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tracing::debug;
@@ -69,8 +70,9 @@ pub async fn batch_execute(
}

pub(crate) fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
let query = CSafeStr::new(query.as_bytes()).map_err(Error::encode)?;
client.with_buf(|buf| {
frontend::query(query, buf).map_err(Error::encode)?;
frontend::query(query, buf);
Ok(buf.split().freeze())
})
}
@@ -1,9 +1,10 @@
use std::ffi::CStr;
use std::fmt;
use std::sync::{Arc, Weak};

use postgres_protocol2::Oid;
use postgres_protocol2::message::backend::Field;
use postgres_protocol2::message::frontend;
use postgres_protocol2::{CSafeStr, Oid};

use crate::client::InnerClient;
use crate::codec::FrontendMessage;
@@ -12,7 +13,7 @@ use crate::types::Type;

struct StatementInner {
client: Weak<InnerClient>,
name: &'static str,
name: &'static CStr,
params: Vec<Type>,
columns: Vec<Column>,
}
@@ -21,7 +22,7 @@ impl Drop for StatementInner {
fn drop(&mut self) {
if let Some(client) = self.client.upgrade() {
let buf = client.with_buf(|buf| {
frontend::close(b'S', self.name, buf).unwrap();
frontend::close(b'S', self.name.into(), buf);
frontend::sync(buf);
buf.split().freeze()
});
@@ -39,7 +40,7 @@ pub struct Statement(Arc<StatementInner>);
impl Statement {
pub(crate) fn new(
inner: &Arc<InnerClient>,
name: &'static str,
name: &'static CStr,
params: Vec<Type>,
columns: Vec<Column>,
) -> Statement {
@@ -54,14 +55,14 @@ impl Statement {
pub(crate) fn new_anonymous(params: Vec<Type>, columns: Vec<Column>) -> Statement {
Statement(Arc::new(StatementInner {
client: Weak::new(),
name: "<anonymous>",
name: c"<anonymous>",
params,
columns,
}))
}

pub(crate) fn name(&self) -> &str {
self.0.name
pub(crate) fn name(&self) -> &CSafeStr {
self.0.name.into()
}

/// Returns the expected types of the statement's parameters.

@@ -21,7 +21,7 @@ impl Drop for Transaction<'_> {
}

let buf = self.client.inner().with_buf(|buf| {
frontend::query("ROLLBACK", buf).unwrap();
frontend::query(c"ROLLBACK".into(), buf);
buf.split().freeze()
});
let _ = self
@@ -30,6 +30,7 @@ crc32c.workspace = true
either.workspace = true
fail.workspace = true
futures.workspace = true
hashlink.workspace = true
hex.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
pageserver/page_api/Cargo.toml (new file, 13 lines)
@@ -0,0 +1,13 @@
[package]
name = "pageserver_page_api"
version = "0.1.0"
edition.workspace = true
license.workspace = true

[dependencies]
prost.workspace = true
tonic.workspace = true
workspace_hack.workspace = true

[build-dependencies]
tonic-build.workspace = true
pageserver/page_api/build.rs (new file, 13 lines)
@@ -0,0 +1,13 @@
use std::env;
use std::path::PathBuf;

/// Generates Rust code from .proto Protobuf schemas, along with a binary file
/// descriptor set for Protobuf schema reflection.
fn main() -> Result<(), Box<dyn std::error::Error>> {
let out_dir = PathBuf::from(env::var("OUT_DIR")?);
tonic_build::configure()
.bytes(["."])
.file_descriptor_set_path(out_dir.join("page_api_descriptor.bin"))
.compile_protos(&["proto/page_service.proto"], &["proto"])
.map_err(|err| err.into())
}
pageserver/page_api/proto/page_service.proto (new file, 233 lines)
@@ -0,0 +1,233 @@
// Page service, presented by pageservers for computes.
//
// This is the compute read path. It primarily serves page versions at given
// LSNs, but also base backups, SLRU segments, and relation metadata.
//
// EXPERIMENTAL: this is still under development and subject to change.
//
// Request metadata headers:
// - authorization: JWT token ("Bearer <token>"), if auth is enabled
// - neon-tenant-id: tenant ID ("7c4a1f9e3bd6470c8f3e21a65bd2e980")
// - neon-shard-id: shard ID, as <number><count> in hex ("0b10" = shard 11 of 16, 0-based)
// - neon-timeline-id: timeline ID ("f08c4e9a2d5f76b1e3a7c2d8910f4b3e")
//
// The service can be accessed via e.g. grpcurl:
//
// ```
// grpcurl \
// -plaintext \
// -H "neon-tenant-id: 7c4a1f9e3bd6470c8f3e21a65bd2e980" \
// -H "neon-shard-id: 0b10" \
// -H "neon-timeline-id: f08c4e9a2d5f76b1e3a7c2d8910f4b3e" \
// -H "authorization: Bearer $JWT" \
// -d '{"read_lsn": {"request_lsn": 1234567890}, "rel": {"spc_oid": 1663, "db_oid": 1234, "rel_number": 5678, "fork_number": 0}}' \
// localhost:51051 page_api.PageService/CheckRelExists
// ```
//
// TODO: consider adding neon-compute-mode ("primary", "static", "replica").
// However, this will require reconnecting when changing modes.
//
// TODO: write implementation guidance on
// - Health checks
// - Tracing, OpenTelemetry
// - Compression

syntax = "proto3";
package page_api;

service PageService {
// Returns whether a relation exists.
rpc CheckRelExists(CheckRelExistsRequest) returns (CheckRelExistsResponse);

// Fetches a base backup.
rpc GetBaseBackup (GetBaseBackupRequest) returns (stream GetBaseBackupResponseChunk);

// Returns the total size of a database, as # of bytes.
rpc GetDbSize (GetDbSizeRequest) returns (GetDbSizeResponse);

// Fetches pages.
//
// This is implemented as a bidirectional streaming RPC for performance. Unary
// requests incur costs for e.g. HTTP/2 stream setup, header parsing,
// authentication, and so on -- with streaming, we only pay these costs during
// the initial stream setup. This ~doubles throughput in benchmarks. Other
// RPCs use regular unary requests, since they are not as frequent and
// performance-critical, and this simplifies implementation.
//
// NB: a status response (e.g. errors) will terminate the stream. The stream
// may be shared by e.g. multiple Postgres backends, so we should avoid this.
// Most errors are therefore sent as GetPageResponse.status instead.
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);

// Returns the size of a relation, as # of blocks.
rpc GetRelSize (GetRelSizeRequest) returns (GetRelSizeResponse);

// Fetches an SLRU segment.
rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse);
}

// The LSN a request should read at.
message ReadLsn {
// The request's read LSN. Required.
uint64 request_lsn = 1;
// If given, the caller guarantees that the page has not been modified since
// this LSN. Must be smaller than or equal to request_lsn. This allows the
// Pageserver to serve an old page without waiting for the request LSN to
// arrive. Valid for all request types.
//
// It is undefined behaviour to make a request such that the page was, in
// fact, modified between request_lsn and not_modified_since_lsn. The
// Pageserver might detect it and return an error, or it might return the old
// page version or the new page version. Setting not_modified_since_lsn equal
// to request_lsn is always safe, but can lead to unnecessary waiting.
uint64 not_modified_since_lsn = 2;
}

// A relation identifier.
message RelTag {
uint32 spc_oid = 1;
uint32 db_oid = 2;
uint32 rel_number = 3;
uint32 fork_number = 4;
}

// Checks whether a relation exists, at the given LSN. Only valid on shard 0,
// other shards will error.
message CheckRelExistsRequest {
ReadLsn read_lsn = 1;
RelTag rel = 2;
}

message CheckRelExistsResponse {
bool exists = 1;
}

// Requests a base backup at a given LSN.
message GetBaseBackupRequest {
// The LSN to fetch a base backup at.
ReadLsn read_lsn = 1;
// If true, logical replication slots will not be created.
bool replica = 2;
}

// Base backup response chunk, returned as an ordered stream.
message GetBaseBackupResponseChunk {
// A base backup data chunk. The size is undefined, but bounded by the 4 MB
// gRPC message size limit.
bytes chunk = 1;
}

// Requests the size of a database, as # of bytes. Only valid on shard 0, other
// shards will error.
message GetDbSizeRequest {
ReadLsn read_lsn = 1;
uint32 db_oid = 2;
}

message GetDbSizeResponse {
uint64 num_bytes = 1;
}

// Requests one or more pages.
message GetPageRequest {
// A request ID. Will be included in the response. Should be unique for
// in-flight requests on the stream.
uint64 request_id = 1;
// The request class.
GetPageClass request_class = 2;
// The LSN to read at.
ReadLsn read_lsn = 3;
// The relation to read from.
RelTag rel = 4;
// Page numbers to read. Must belong to the remote shard.
//
// Multiple pages will be executed as a single batch by the Pageserver,
// amortizing layer access costs and parallelizing them. This may increase the
// latency of any individual request, but improves the overall latency and
// throughput of the batch as a whole.
//
// TODO: this causes an allocation in the common single-block case. The sender
// can use a SmallVec to stack-allocate it, but Prost will always deserialize
// into a heap-allocated Vec. Consider optimizing this.
//
// TODO: we might be able to avoid a sort or something if we mandate that these
// are always in order. But we can't currently rely on this on the server, because
// of compatibility with the libpq protocol handler.
repeated uint32 block_number = 5;
}

// A GetPageRequest class. Primarily intended for observability, but may also be
// used for prioritization in the future.
enum GetPageClass {
// Unknown class. For forwards compatibility: used when the client sends a
// class that the server doesn't know about.
GET_PAGE_CLASS_UNKNOWN = 0;
// A normal request. This is the default.
GET_PAGE_CLASS_NORMAL = 1;
// A prefetch request. NB: can only be classified on pg < 18.
GET_PAGE_CLASS_PREFETCH = 2;
// A background request (e.g. vacuum).
GET_PAGE_CLASS_BACKGROUND = 3;
}

// A GetPage response.
//
// A batch response will contain all of the requested pages. We could eagerly
// emit individual pages as soon as they are ready, but on a readv() Postgres
// holds buffer pool locks on all pages in the batch and we'll only return once
// the entire batch is ready, so no one can make use of the individual pages.
message GetPageResponse {
// The original request's ID.
uint64 request_id = 1;
// The response status code.
GetPageStatus status = 2;
// A string describing the status, if any.
string reason = 3;
// The 8KB page images, in the same order as the request. Empty if status != OK.
repeated bytes page_image = 4;
}

// A GetPageResponse status code. Since we use a bidirectional stream, we don't
// want to send errors as gRPC statuses, since this would terminate the stream.
enum GetPageStatus {
// Unknown status. For forwards compatibility: used when the server sends a
// status code that the client doesn't know about.
GET_PAGE_STATUS_UNKNOWN = 0;
// The request was successful.
GET_PAGE_STATUS_OK = 1;
// The page did not exist. The tenant/timeline/shard has already been
// validated during stream setup.
GET_PAGE_STATUS_NOT_FOUND = 2;
// The request was invalid.
GET_PAGE_STATUS_INVALID = 3;
// The tenant is rate limited. Slow down and retry later.
GET_PAGE_STATUS_SLOW_DOWN = 4;
// TODO: consider adding a GET_PAGE_STATUS_LAYER_DOWNLOAD in the case of a
// layer download. This could free up the server task to process other
// requests while the layer download is in progress.
}

// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on
// shard 0, other shards will error.
message GetRelSizeRequest {
ReadLsn read_lsn = 1;
RelTag rel = 2;
}

message GetRelSizeResponse {
uint32 num_blocks = 1;
}

// Requests an SLRU segment. Only valid on shard 0, other shards will error.
message GetSlruSegmentRequest {
ReadLsn read_lsn = 1;
uint32 kind = 2;
uint32 segno = 3;
}

// Returns an SLRU segment.
//
// These are up to 32 pages (256 KB), so we can send them as a single response.
message GetSlruSegmentResponse {
bytes segment = 1;
}
pageserver/page_api/src/lib.rs (new file, 19 lines)
@@ -0,0 +1,19 @@
//! This crate provides the Pageserver's page API. It contains:
//!
//! * proto/page_service.proto: the Protobuf schema for the page API.
//! * proto: auto-generated Protobuf types for gRPC.
//!
//! This crate is used by both the client and the server. Try to keep it slim.

// Code generated by protobuf.
pub mod proto {
    tonic::include_proto!("page_api");

    /// File descriptor set for Protobuf schema reflection. This allows using
    /// e.g. grpcurl with the API.
    pub const FILE_DESCRIPTOR_SET: &[u8] =
        tonic::include_file_descriptor_set!("page_api_descriptor");

    pub use page_service_client::PageServiceClient;
    pub use page_service_server::{PageService, PageServiceServer};
}
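
Beyond grpcurl reflection via FILE_DESCRIPTOR_SET, the re-exported PageServiceClient gives Rust callers a typed handle. A minimal connection sketch, assuming tonic's default codegen with the transport feature; the address and the `get_rel_size` rpc name are illustrative assumptions, only the message types are confirmed by the schema above:

    use pageserver_page_api::proto::{GetRelSizeRequest, PageServiceClient};

    // Hypothetical pageserver gRPC address and rpc name.
    async fn rel_size_example() -> Result<u32, Box<dyn std::error::Error>> {
        let mut client = PageServiceClient::connect("http://127.0.0.1:51051").await?;
        let resp = client
            // Fill in real ReadLsn/RelTag values; only valid on shard 0.
            .get_rel_size(GetRelSizeRequest { read_lsn: None, rel: None })
            .await?;
        Ok(resp.into_inner().num_blocks)
    }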

@@ -144,7 +144,7 @@ where
replica,
ctx,
io_concurrency: IoConcurrency::spawn_from_conf(
timeline.conf,
timeline.conf.get_vectored_concurrent_io,
timeline
.gate
.enter()
@@ -343,7 +343,7 @@ where
// Gather non-relational files from object storage pages.
let slru_partitions = self
.timeline
.get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
.get_slru_keyspace(Version::at(self.lsn), self.ctx)
.await?
.partition(
self.timeline.get_shard_identity(),
@@ -378,7 +378,7 @@ where
// Otherwise only include init forks of unlogged relations.
let rels = self
.timeline
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx)
.await?;
for &rel in rels.iter() {
// Send init fork as main fork to provide well formed empty
@@ -517,7 +517,7 @@ where
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> Result<(), BasebackupError> {
let nblocks = self
.timeline
.get_rel_size(src, Version::Lsn(self.lsn), self.ctx)
.get_rel_size(src, Version::at(self.lsn), self.ctx)
.await?;

// If the relation is empty, create an empty file
@@ -577,7 +577,7 @@ where
let relmap_img = if has_relmap_file {
let img = self
.timeline
.get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.get_relmap_file(spcnode, dbnode, Version::at(self.lsn), self.ctx)
.await?;

if img.len()
@@ -631,7 +631,7 @@ where
if !has_relmap_file
&& self
.timeline
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx)
.await?
.is_empty()
{

@@ -3199,7 +3199,7 @@ async fn list_aux_files(
.await?;

let io_concurrency = IoConcurrency::spawn_from_conf(
state.conf,
state.conf.get_vectored_concurrent_io,
timeline.gate.enter().map_err(|_| ApiError::Cancelled)?,
);

@@ -843,23 +843,50 @@ pub(crate) static COMPRESSION_IMAGE_OUTPUT_BYTES: Lazy<IntCounter> = Lazy::new(|
.expect("failed to define a metric")
});

pub(crate) static RELSIZE_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
pub(crate) static RELSIZE_LATEST_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_relsize_cache_entries",
"Number of entries in the relation size cache",
"pageserver_relsize_latest_cache_entries",
"Number of entries in the latest relation size cache",
)
.expect("failed to define a metric")
});

pub(crate) static RELSIZE_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!("pageserver_relsize_cache_hits", "Relation size cache hits",)
.expect("failed to define a metric")
pub(crate) static RELSIZE_LATEST_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_latest_cache_hits",
"Latest relation size cache hits",
)
.expect("failed to define a metric")
});

pub(crate) static RELSIZE_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
pub(crate) static RELSIZE_LATEST_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_cache_misses",
"Relation size cache misses",
"pageserver_relsize_latest_cache_misses",
"Relation size latest cache misses",
)
.expect("failed to define a metric")
});

pub(crate) static RELSIZE_SNAPSHOT_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_relsize_snapshot_cache_entries",
"Number of entries in the pitr relation size cache",
)
.expect("failed to define a metric")
});

pub(crate) static RELSIZE_SNAPSHOT_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_snapshot_cache_hits",
"Pitr relation size cache hits",
)
.expect("failed to define a metric")
});

pub(crate) static RELSIZE_SNAPSHOT_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_snapshot_cache_misses",
"Relation size snapshot cache misses",
)
.expect("failed to define a metric")
});

@@ -18,7 +18,7 @@ use itertools::Itertools;
use jsonwebtoken::TokenData;
use once_cell::sync::OnceCell;
use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
};
use pageserver_api::key::rel_block_to_key;
@@ -62,7 +62,7 @@ use crate::metrics::{
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
SmgrOpTimer, TimelineMetrics,
};
use crate::pgdatadir_mapping::Version;
use crate::pgdatadir_mapping::{LsnRange, Version};
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
@@ -331,10 +331,10 @@ async fn page_service_conn_main(
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(
conf,
tenant_manager,
auth,
pipelining_config,
conf.get_vectored_concurrent_io,
perf_span_fields,
connection_ctx,
cancel.clone(),
@@ -371,7 +371,6 @@ async fn page_service_conn_main(
}

struct PageServerHandler {
conf: &'static PageServerConf,
auth: Option<Arc<SwappableJwtAuth>>,
claims: Option<Claims>,

@@ -389,6 +388,7 @@ struct PageServerHandler {
timeline_handles: Option<TimelineHandles>,

pipelining_config: PageServicePipeliningConfig,
get_vectored_concurrent_io: GetVectoredConcurrentIo,

gate_guard: GateGuard,
}
@@ -642,7 +642,7 @@ impl std::fmt::Display for BatchedPageStreamError {
struct BatchedGetPageRequest {
req: PagestreamGetPageRequest,
timer: SmgrOpTimer,
effective_request_lsn: Lsn,
lsn_range: LsnRange,
ctx: RequestContext,
}

@@ -764,12 +764,12 @@ impl BatchedFeMessage {
match batching_strategy {
PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
if let Some(last_in_batch) = accum_pages.last() {
if last_in_batch.effective_request_lsn
!= this_pages[0].effective_request_lsn
if last_in_batch.lsn_range.effective_lsn
!= this_pages[0].lsn_range.effective_lsn
{
trace!(
accum_lsn = %last_in_batch.effective_request_lsn,
this_lsn = %this_pages[0].effective_request_lsn,
accum_lsn = %last_in_batch.lsn_range.effective_lsn,
this_lsn = %this_pages[0].lsn_range.effective_lsn,
"stopping batching because LSN changed"
);

@@ -784,15 +784,15 @@ impl BatchedFeMessage {
let same_page_different_lsn = accum_pages.iter().any(|batched| {
batched.req.rel == this_pages[0].req.rel
&& batched.req.blkno == this_pages[0].req.blkno
&& batched.effective_request_lsn
!= this_pages[0].effective_request_lsn
&& batched.lsn_range.effective_lsn
!= this_pages[0].lsn_range.effective_lsn
});

if same_page_different_lsn {
trace!(
rel=%this_pages[0].req.rel,
blkno=%this_pages[0].req.blkno,
lsn=%this_pages[0].effective_request_lsn,
lsn=%this_pages[0].lsn_range.effective_lsn,
"stopping batching because same page was requested at different LSNs"
);

@@ -844,17 +844,16 @@ impl BatchedFeMessage {
impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
pub fn new(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
pipelining_config: PageServicePipeliningConfig,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
perf_span_fields: ConnectionPerfSpanFields,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
) -> Self {
PageServerHandler {
conf,
auth,
claims: None,
connection_ctx,
@@ -862,6 +861,7 @@ impl PageServerHandler {
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
cancel,
pipelining_config,
get_vectored_concurrent_io,
gate_guard,
}
}
@@ -1158,7 +1158,7 @@ impl PageServerHandler {
.await?;

// We're holding the Handle
let effective_request_lsn = match Self::effective_request_lsn(
let effective_lsn = match Self::effective_request_lsn(
&shard,
shard.get_last_record_lsn(),
req.hdr.request_lsn,
@@ -1177,7 +1177,10 @@ impl PageServerHandler {
pages: smallvec::smallvec![BatchedGetPageRequest {
req,
timer,
effective_request_lsn,
lsn_range: LsnRange {
effective_lsn,
request_lsn: req.hdr.request_lsn
},
ctx,
}],
// The executor grabs the batch when it becomes idle.
@@ -1278,7 +1281,7 @@ impl PageServerHandler {
}

#[instrument(level = tracing::Level::DEBUG, skip_all)]
async fn pagesteam_handle_batched_message<IO>(
async fn pagestream_handle_batched_message<IO>(
&mut self,
pgb_writer: &mut PostgresBackend<IO>,
batch: BatchedFeMessage,
@@ -1623,7 +1626,7 @@ impl PageServerHandler {
}

let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.get_vectored_concurrent_io,
match self.gate_guard.try_clone() {
Ok(guard) => guard,
Err(_) => {
@@ -1733,7 +1736,7 @@ impl PageServerHandler {
};

let result = self
.pagesteam_handle_batched_message(
.pagestream_handle_batched_message(
pgb_writer,
msg,
io_concurrency.clone(),
@@ -1909,7 +1912,7 @@ impl PageServerHandler {
return Err(e);
}
};
self.pagesteam_handle_batched_message(
self.pagestream_handle_batched_message(
pgb_writer,
batch,
io_concurrency.clone(),
@@ -2127,7 +2130,14 @@ impl PageServerHandler {
.await?;

let exists = timeline
.get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
.get_rel_exists(
req.rel,
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: req.hdr.request_lsn,
}),
ctx,
)
.await?;

Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
@@ -2154,7 +2164,14 @@ impl PageServerHandler {
.await?;

let n_blocks = timeline
.get_rel_size(req.rel, Version::Lsn(lsn), ctx)
.get_rel_size(
req.rel,
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: req.hdr.request_lsn,
}),
ctx,
)
.await?;

Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
@@ -2181,7 +2198,15 @@ impl PageServerHandler {
.await?;

let total_blocks = timeline
.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
.get_db_size(
DEFAULTTABLESPACE_OID,
req.dbnode,
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: req.hdr.request_lsn,
}),
ctx,
)
.await?;
let db_size = total_blocks as i64 * BLCKSZ as i64;

@@ -2214,7 +2239,7 @@ impl PageServerHandler {
// Ignore error (trace buffer may be full or tracer may have disconnected).
_ = page_trace.try_send(PageTraceEvent {
key,
effective_lsn: batch.effective_request_lsn,
effective_lsn: batch.lsn_range.effective_lsn,
time,
});
}
@@ -2229,7 +2254,7 @@ impl PageServerHandler {
perf_instrument = true;
}

req.effective_request_lsn
req.lsn_range.effective_lsn
})
.max()
.expect("batch is never empty");
@@ -2283,7 +2308,7 @@ impl PageServerHandler {
(
&p.req.rel,
&p.req.blkno,
p.effective_request_lsn,
p.lsn_range,
p.ctx.attached_child(),
)
}),

@@ -43,7 +43,9 @@ use crate::aux_file;
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::{
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS,
RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS,
RELSIZE_SNAPSHOT_CACHE_MISSES,
};
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
@@ -90,6 +92,28 @@ pub enum LsnForTimestamp {
NoData(Lsn),
}

/// Each request to the page server contains an LSN range: `not_modified_since..request_lsn`.
/// See the comments in libs/pageserver_api/src/models.rs.
/// Based on this range and `last_record_lsn`, the pageserver calculates `effective_lsn`.
/// But to distinguish requests from the primary and from replicas, we also need to pass `request_lsn`.
#[derive(Debug, Clone, Copy, Default)]
pub struct LsnRange {
pub effective_lsn: Lsn,
pub request_lsn: Lsn,
}

impl LsnRange {
pub fn at(lsn: Lsn) -> LsnRange {
LsnRange {
effective_lsn: lsn,
request_lsn: lsn,
}
}
pub fn is_latest(&self) -> bool {
self.request_lsn == Lsn::MAX
}
}
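
A small sketch of how the two request flavors map onto this struct, assuming the types above (the `last_record_lsn` and `replica_lsn` bindings are illustrative placeholders):

    // Hypothetical inputs: `last_record_lsn` is the shard's current end of WAL,
    // `replica_lsn` is a replica's apply position (assumed < Lsn::MAX).
    fn classify(last_record_lsn: Lsn, replica_lsn: Lsn) {
        // A primary reads "latest": request_lsn is Lsn::MAX, effective_lsn is
        // whatever the pageserver resolved for the read.
        let primary = LsnRange { effective_lsn: last_record_lsn, request_lsn: Lsn::MAX };
        assert!(primary.is_latest());

        // A replica pins a snapshot: both LSNs collapse to one point, so the
        // result is exact at that LSN and cacheable per (Lsn, RelTag).
        let replica = LsnRange::at(replica_lsn);
        assert!(!replica.is_latest());
    }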

#[derive(Debug, thiserror::Error)]
pub(crate) enum CalculateLogicalSizeError {
#[error("cancelled")]
@@ -202,13 +226,13 @@ impl Timeline {
io_concurrency: IoConcurrency,
) -> Result<Bytes, PageReconstructError> {
match version {
Version::Lsn(effective_lsn) => {
Version::LsnRange(lsns) => {
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
let res = self
.get_rel_page_at_lsn_batched(
pages.iter().map(|(tag, blknum)| {
(tag, blknum, effective_lsn, ctx.attached_child())
}),
pages
.iter()
.map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())),
io_concurrency.clone(),
ctx,
)
@@ -246,7 +270,7 @@ impl Timeline {
/// The ordering of the returned vec corresponds to the ordering of `pages`.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, Lsn, RequestContext)>,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
) -> Vec<Result<Bytes, PageReconstructError>> {
@@ -265,7 +289,7 @@ impl Timeline {
let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
HashMap::with_capacity(pages.len());

for (response_slot_idx, (tag, blknum, lsn, ctx)) in pages.enumerate() {
for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() {
if tag.relnode == 0 {
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
@@ -274,7 +298,7 @@ impl Timeline {
slots_filled += 1;
continue;
}

let lsn = lsns.effective_lsn;
let nblocks = {
let ctx = RequestContextBuilder::from(&ctx)
.perf_span(|crnt_perf_span| {
@@ -289,7 +313,7 @@ impl Timeline {
.attached_child();

match self
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
.get_rel_size(*tag, Version::LsnRange(lsns), &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await
{
@@ -470,7 +494,7 @@ impl Timeline {
));
}

if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
if let Some(nblocks) = self.get_cached_rel_size(&tag, version) {
return Ok(nblocks);
}

@@ -488,7 +512,7 @@ impl Timeline {
let mut buf = version.get(self, key, ctx).await?;
let nblocks = buf.get_u32_le();

self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
self.update_cached_rel_size(tag, version, nblocks);

Ok(nblocks)
}
@@ -510,7 +534,7 @@ impl Timeline {
}

// first try to lookup relation in cache
if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) {
return Ok(true);
}
// then check if the database was already initialized.
@@ -586,7 +610,7 @@ impl Timeline {
// scan directory listing (new), merge with the old results
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
@@ -632,7 +656,7 @@ impl Timeline {
) -> Result<Bytes, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let n_blocks = self
.get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
.get_slru_segment_size(kind, segno, Version::at(lsn), ctx)
.await?;

let keyspace = KeySpace::single(
@@ -645,7 +669,7 @@ impl Timeline {
);

let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
@@ -867,11 +891,11 @@ impl Timeline {
mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
) -> Result<T, PageReconstructError> {
for segno in self
.list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
.list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx)
.await?
{
let nblocks = self
.get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
.get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx)
.await?;

let keyspace = KeySpace::single(
@@ -885,7 +909,7 @@ impl Timeline {
);

let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
@@ -1137,7 +1161,7 @@ impl Timeline {
let mut total_size: u64 = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for rel in self
.list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
.list_rels(*spcnode, *dbnode, Version::at(lsn), ctx)
.await?
{
if self.cancel.is_cancelled() {
@@ -1212,7 +1236,7 @@ impl Timeline {
result.add_key(rel_dir_to_key(spcnode, dbnode));

let mut rels: Vec<RelTag> = self
.list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
.list_rels(spcnode, dbnode, Version::at(lsn), ctx)
.await?
.into_iter()
.collect();
@@ -1329,59 +1353,75 @@ impl Timeline {
Ok((dense_keyspace, sparse_keyspace))
}

/// Get cached size of relation if it not updated after specified LSN
pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
let rel_size_cache = self.rel_size_cache.read().unwrap();
if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
if lsn >= *cached_lsn {
RELSIZE_CACHE_HITS.inc();
return Some(*nblocks);
/// Get cached size of relation. There are two caches: one for primary updates, which captures
/// the latest state of the timeline, and a snapshot cache, whose key includes the LSN and so
/// can be used by replicas to get the relation size at a particular LSN (snapshot).
pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option<BlockNumber> {
let lsn = version.get_lsn();
{
let rel_size_cache = self.rel_size_latest_cache.read().unwrap();
if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
if lsn >= *cached_lsn {
RELSIZE_LATEST_CACHE_HITS.inc();
return Some(*nblocks);
}
RELSIZE_CACHE_MISSES_OLD.inc();
}
RELSIZE_CACHE_MISSES_OLD.inc();
}
RELSIZE_CACHE_MISSES.inc();
{
let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) {
RELSIZE_SNAPSHOT_CACHE_HITS.inc();
return Some(*nblock);
}
}
if version.is_latest() {
RELSIZE_LATEST_CACHE_MISSES.inc();
} else {
RELSIZE_SNAPSHOT_CACHE_MISSES.inc();
}
None
}

/// Update cached relation size if there is no more recent update
pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();

if lsn < rel_size_cache.complete_as_of {
// Do not cache old values. It's safe to cache the size on read, as long as
// the read was at an LSN since we started the WAL ingestion. Reasoning: we
// never evict values from the cache, so if the relation size changed after
// 'lsn', the new value is already in the cache.
return;
}

match rel_size_cache.map.entry(tag) {
hash_map::Entry::Occupied(mut entry) => {
let cached_lsn = entry.get_mut();
if lsn >= cached_lsn.0 {
*cached_lsn = (lsn, nblocks);
pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) {
let lsn = version.get_lsn();
if version.is_latest() {
let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
match rel_size_cache.entry(tag) {
hash_map::Entry::Occupied(mut entry) => {
let cached_lsn = entry.get_mut();
if lsn >= cached_lsn.0 {
*cached_lsn = (lsn, nblocks);
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert((lsn, nblocks));
RELSIZE_LATEST_CACHE_ENTRIES.inc();
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert((lsn, nblocks));
RELSIZE_CACHE_ENTRIES.inc();
} else {
let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
if rel_size_cache.capacity() != 0 {
rel_size_cache.insert((lsn, tag), nblocks);
RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64);
}
}
}

/// Store cached relation size
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
RELSIZE_CACHE_ENTRIES.inc();
let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() {
RELSIZE_LATEST_CACHE_ENTRIES.inc();
}
}

/// Remove cached relation size
pub fn remove_cached_rel_size(&self, tag: &RelTag) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if rel_size_cache.map.remove(tag).is_some() {
RELSIZE_CACHE_ENTRIES.dec();
let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
if rel_size_cache.remove(tag).is_some() {
RELSIZE_LATEST_CACHE_ENTRIES.dec();
}
}
}
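
The snapshot side of the lookup above is a bounded LRU keyed by `(Lsn, RelTag)`, unlike the unbounded latest-cache keyed by `RelTag` alone; a capacity of 0 disables it, matching the `capacity() != 0` guard. A small behavioral sketch using the same hashlink API the code relies on (placeholder key/value types stand in for `Lsn`, `RelTag` and `BlockNumber`):

    use hashlink::LruCache;

    // Bounded LRU: replica reads at historic LSNs are cached per (lsn, rel),
    // so the same relation at two LSNs occupies two slots.
    let mut cache: LruCache<(u64, u32), u32> = LruCache::new(2);
    cache.insert((0x40, 1), 3);
    cache.insert((0x50, 1), 5);
    assert_eq!(cache.get(&(0x40, 1)), Some(&3));
    // Inserting past capacity evicts the least recently used entry,
    // so the cache stays bounded regardless of how many LSNs replicas probe.
    cache.insert((0x60, 1), 7);
    assert_eq!(cache.len(), 2);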

@@ -1585,7 +1625,10 @@ impl DatadirModification<'_> {
// check the cache too. This is because eagerly checking the cache results in
// less work overall and 10% better performance. It's more work on cache miss
// but cache miss is rare.
if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
if let Some(nblocks) = self
.tline
.get_cached_rel_size(&rel, Version::Modified(self))
{
Ok(nblocks)
} else if !self
.tline
@@ -2667,7 +2710,7 @@ pub struct DatadirModificationStats {
/// timeline to not miss the latest updates.
#[derive(Clone, Copy)]
pub enum Version<'a> {
Lsn(Lsn),
LsnRange(LsnRange),
Modified(&'a DatadirModification<'a>),
}

@@ -2679,7 +2722,7 @@ impl Version<'_> {
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
match self {
Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await,
Version::Modified(modification) => modification.get(key, ctx).await,
}
}
@@ -2701,12 +2744,26 @@ impl Version<'_> {
}
}

fn get_lsn(&self) -> Lsn {
pub fn is_latest(&self) -> bool {
match self {
Version::Lsn(lsn) => *lsn,
Version::LsnRange(lsns) => lsns.is_latest(),
Version::Modified(_) => true,
}
}

pub fn get_lsn(&self) -> Lsn {
match self {
Version::LsnRange(lsns) => lsns.effective_lsn,
Version::Modified(modification) => modification.lsn,
}
}

pub fn at(lsn: Lsn) -> Self {
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: lsn,
})
}
}
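
To summarize the resulting semantics, a small sketch assuming the types above: `Version::at` pins `request_lsn` to the read LSN itself, so it never counts as a "latest" read, while `Version::Modified` always does:

    // Pinned read: effective_lsn == request_lsn == 0x20.
    let v = Version::at(Lsn(0x20));
    assert_eq!(v.get_lsn(), Lsn(0x20));
    assert!(!v.is_latest()); // request_lsn is finite, not Lsn::MAX

    // A "latest" read carries request_lsn == Lsn::MAX.
    let latest = Version::LsnRange(LsnRange {
        effective_lsn: Lsn(0x20),
        request_lsn: Lsn::MAX,
    });
    assert!(latest.is_latest());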

//--- Metadata structs stored in key-value pairs in the repository.

@@ -8596,8 +8596,10 @@ mod tests {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let io_concurrency =
IoConcurrency::spawn_from_conf(tline.conf, tline.gate.enter().unwrap());
let io_concurrency = IoConcurrency::spawn_from_conf(
tline.conf.get_vectored_concurrent_io,
tline.gate.enter().unwrap(),
);
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key..key.next()), lsn);
let mut res = tline

@@ -31,6 +31,7 @@ pub use inmemory_layer::InMemoryLayer;
pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
use pageserver_api::config::GetVectoredConcurrentIo;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::record::NeonWalRecord;
@@ -43,7 +44,6 @@ use self::inmemory_layer::InMemoryLayerFileId;
use super::PageReconstructError;
use super::layer_map::InMemoryLayerDesc;
use super::timeline::{GetVectoredError, ReadPath};
use crate::config::PageServerConf;
use crate::context::{
AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
};
@@ -318,11 +318,10 @@ impl IoConcurrency {
}

pub(crate) fn spawn_from_conf(
conf: &'static PageServerConf,
conf: GetVectoredConcurrentIo,
gate_guard: GateGuard,
) -> IoConcurrency {
use pageserver_api::config::GetVectoredConcurrentIo;
let selected = match conf.get_vectored_concurrent_io {
let selected = match conf {
GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
};
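
Call sites throughout this change follow the same pattern, which the new signature makes explicit: pass the configured concurrency mode plus a gate guard, rather than the whole config struct. A minimal sketch of the new call shape (the surrounding `conf` and `gate` bindings are illustrative):

    let io_concurrency = IoConcurrency::spawn_from_conf(
        conf.get_vectored_concurrent_io, // the GetVectoredConcurrentIo mode
        gate.enter()?,                   // GateGuard tying the sidecar task to shutdown
    );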

@@ -14,6 +14,7 @@ pub mod span;
pub mod uninit;
mod walreceiver;

use hashlink::LruCache;
use std::array;
use std::cmp::{max, min};
use std::collections::btree_map::Entry;
@@ -197,16 +198,6 @@ pub struct TimelineResources {
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}

/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
/// ingestion considerably, because WAL ingestion needs to check on most records if the record
/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end
/// of the timeline (disk_consistent_lsn). It's used on reads of relation sizes to check if the
/// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`].
pub(crate) struct RelSizeCache {
pub(crate) complete_as_of: Lsn,
pub(crate) map: HashMap<RelTag, (Lsn, BlockNumber)>,
}

pub struct Timeline {
pub(crate) conf: &'static PageServerConf,
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
@@ -365,7 +356,8 @@ pub struct Timeline {
pub walreceiver: Mutex<Option<WalReceiver>>,

/// Relation size cache
pub(crate) rel_size_cache: RwLock<RelSizeCache>,
pub(crate) rel_size_latest_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
pub(crate) rel_size_snapshot_cache: Mutex<LruCache<(Lsn, RelTag), BlockNumber>>,

download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,

@@ -2820,6 +2812,13 @@ impl Timeline {

self.remote_client.update_config(&new_conf.location);

let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
if let Some(new_capacity) = new_conf.tenant_conf.relsize_snapshot_cache_capacity {
if new_capacity != rel_size_cache.capacity() {
rel_size_cache.set_capacity(new_capacity);
}
}

self.metrics
.evictions_with_low_residence_duration
.write()
@@ -2878,6 +2877,14 @@ impl Timeline {
ancestor_gc_info.insert_child(timeline_id, metadata.ancestor_lsn(), is_offloaded);
}

let relsize_snapshot_cache_capacity = {
let loaded_tenant_conf = tenant_conf.load();
loaded_tenant_conf
.tenant_conf
.relsize_snapshot_cache_capacity
.unwrap_or(conf.default_tenant_conf.relsize_snapshot_cache_capacity)
};

Arc::new_cyclic(|myself| {
let metrics = Arc::new(TimelineMetrics::new(
&tenant_shard_id,
@@ -2969,10 +2976,8 @@ impl Timeline {
last_image_layer_creation_check_instant: Mutex::new(None),

last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(RelSizeCache {
complete_as_of: disk_consistent_lsn,
map: HashMap::new(),
}),
rel_size_latest_cache: RwLock::new(HashMap::new()),
rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)),

download_all_remote_layers_task_info: RwLock::new(None),

@@ -3530,7 +3535,7 @@ impl Timeline {
};

let io_concurrency = IoConcurrency::spawn_from_conf(
self_ref.conf,
self_ref.conf.get_vectored_concurrent_io,
self_ref
.gate
.enter()
@@ -5559,7 +5564,7 @@ impl Timeline {
});

let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| CreateImageLayersError::Cancelled)?,

@@ -188,7 +188,7 @@ pub(crate) async fn generate_tombstone_image_layer(
"removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
);
let io_concurrency = IoConcurrency::spawn_from_conf(
detached.conf,
detached.conf.get_vectored_concurrent_io,
detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
);
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);

@@ -1684,31 +1684,31 @@ mod tests {
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await?,
false
);
assert!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await
.is_err()
);
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
1
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
.await?,
3
);
@@ -1719,7 +1719,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x20)),
Version::at(Lsn(0x20)),
&ctx,
io_concurrency.clone()
)
@@ -1733,7 +1733,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x30)),
Version::at(Lsn(0x30)),
&ctx,
io_concurrency.clone()
)
@@ -1747,7 +1747,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x40)),
Version::at(Lsn(0x40)),
&ctx,
io_concurrency.clone()
)
@@ -1760,7 +1760,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x40)),
Version::at(Lsn(0x40)),
&ctx,
io_concurrency.clone()
)
@@ -1774,7 +1774,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1787,7 +1787,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1800,7 +1800,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
2,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1820,7 +1820,7 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
.await?,
2
);
@@ -1829,7 +1829,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x60)),
Version::at(Lsn(0x60)),
&ctx,
io_concurrency.clone()
)
@@ -1842,7 +1842,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x60)),
Version::at(Lsn(0x60)),
&ctx,
io_concurrency.clone()
)
@@ -1854,7 +1854,7 @@ mod tests {
// should still see the truncated block with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
.await?,
3
);
@@ -1863,7 +1863,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
2,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1880,7 +1880,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x68)), &ctx)
.await?,
0
);
@@ -1893,7 +1893,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x70)), &ctx)
.await?,
2
);
@@ -1902,7 +1902,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x70)),
Version::at(Lsn(0x70)),
&ctx,
io_concurrency.clone()
)
@@ -1915,7 +1915,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x70)),
Version::at(Lsn(0x70)),
&ctx,
io_concurrency.clone()
)
@@ -1932,7 +1932,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
.await?,
1501
);
@@ -1942,7 +1942,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blk,
Version::Lsn(Lsn(0x80)),
Version::at(Lsn(0x80)),
&ctx,
io_concurrency.clone()
)
@@ -1956,7 +1956,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1500,
Version::Lsn(Lsn(0x80)),
Version::at(Lsn(0x80)),
&ctx,
io_concurrency.clone()
)
@@ -1990,13 +1990,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
1
);
@@ -2011,7 +2011,7 @@ mod tests {
// Check that rel is not visible anymore
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x30)), &ctx)
.await?,
false
);
@@ -2029,13 +2029,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
.await?,
1
);
@@ -2077,26 +2077,26 @@ mod tests {
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await?,
false
);
assert!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await
.is_err()
);

assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
relsize
);
@@ -2110,7 +2110,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(lsn),
Version::at(lsn),
&ctx,
io_concurrency.clone()
)
@@ -2131,7 +2131,7 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
.await?,
1
);
@@ -2144,7 +2144,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x60)),
Version::at(Lsn(0x60)),
&ctx,
io_concurrency.clone()
)
@@ -2157,7 +2157,7 @@ mod tests {
// should still see all blocks with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
.await?,
relsize
);
@@ -2169,7 +2169,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -2193,13 +2193,13 @@ mod tests {

assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
.await?,
relsize
);
@@ -2212,7 +2212,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x80)),
Version::at(Lsn(0x80)),
&ctx,
io_concurrency.clone()
)
@@ -2250,7 +2250,7 @@ mod tests {

assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE + 1
);
@@ -2264,7 +2264,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE
);
@@ -2279,7 +2279,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE - 1
);
@@ -2297,7 +2297,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
size as BlockNumber
);

@@ -86,7 +86,7 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,

#define InvalidRelFileNumber InvalidOid

#define SMgrRelGetRelInfo(reln) \
(reln->smgr_rnode.node)

#define DropRelationAllLocalBuffers DropRelFileNodeAllLocalBuffers
@@ -148,6 +148,12 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
#define DropRelationAllLocalBuffers DropRelationAllLocalBuffers
#endif

#define NRelFileInfoInvalidate(rinfo) do { \
NInfoGetSpcOid(rinfo) = InvalidOid; \
NInfoGetDbOid(rinfo) = InvalidOid; \
NInfoGetRelNumber(rinfo) = InvalidRelFileNumber; \
} while (0)

#if PG_MAJORVERSION_NUM < 17
#define ProcNumber BackendId
#define INVALID_PROC_NUMBER InvalidBackendId

@@ -108,7 +108,7 @@ typedef enum
UNLOGGED_BUILD_NOT_PERMANENT
} UnloggedBuildPhase;

static SMgrRelation unlogged_build_rel = NULL;
static NRelFileInfo unlogged_build_rel_info;
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;

static bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
@@ -912,8 +912,14 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{
case 0:
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
break;

case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdextend(reln, forkNum, blkno, buffer, skipFsync);
return;
}
break;

case RELPERSISTENCE_TEMP:
@@ -1000,8 +1006,14 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
{
case 0:
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
break;

case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdzeroextend(reln, forkNum, blocknum, nblocks, skipFsync);
return;
}
break;

case RELPERSISTENCE_TEMP:
@@ -1376,8 +1388,14 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
{
case 0:
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
break;

case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdread(reln, forkNum, blkno, buffer);
return;
}
break;

case RELPERSISTENCE_TEMP:
@@ -1463,8 +1481,14 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
{
case 0:
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
break;

case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdreadv(reln, forknum, blocknum, buffers, nblocks);
return;
}
break;

case RELPERSISTENCE_TEMP:
@@ -1597,6 +1621,15 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
break;

case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
#if PG_MAJORVERSION_NUM >= 17
mdwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
#else
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
#endif
return;
}
break;

case RELPERSISTENCE_TEMP:
@@ -1666,6 +1699,11 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
break;

case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
return;
}
break;

case RELPERSISTENCE_TEMP:
@@ -1706,6 +1744,10 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
break;

case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
return mdnblocks(reln, forknum);
}
break;

case RELPERSISTENCE_TEMP:
@@ -1775,6 +1817,11 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
break;

case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdtruncate(reln, forknum, old_blocks, nblocks);
return;
}
break;

case RELPERSISTENCE_TEMP:
@@ -1913,7 +1960,6 @@ neon_start_unlogged_build(SMgrRelation reln)
*/
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS)
neon_log(ERROR, "unlogged relation build is already in progress");
Assert(unlogged_build_rel == NULL);

ereport(SmgrTrace,
(errmsg(NEON_TAG "starting unlogged build of relation %u/%u/%u",
@@ -1930,7 +1976,7 @@ neon_start_unlogged_build(SMgrRelation reln)

case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
unlogged_build_rel = reln;
unlogged_build_rel_info = InfoFromSMgrRel(reln);
unlogged_build_phase = UNLOGGED_BUILD_NOT_PERMANENT;
#ifdef DEBUG_COMPARE_LOCAL
if (!IsParallelWorker())
@@ -1951,12 +1997,9 @@ neon_start_unlogged_build(SMgrRelation reln)
neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
#endif

unlogged_build_rel = reln;
unlogged_build_rel_info = InfoFromSMgrRel(reln);
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;

/* Make the relation look like it's unlogged */
reln->smgr_relpersistence = RELPERSISTENCE_UNLOGGED;

/*
* Create the local file. In a parallel build, the leader is expected to
* call this first and do it.
@@ -1983,17 +2026,16 @@ neon_start_unlogged_build(SMgrRelation reln)
static void
neon_finish_unlogged_build_phase_1(SMgrRelation reln)
{
Assert(unlogged_build_rel == reln);
Assert(RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)));

ereport(SmgrTrace,
(errmsg(NEON_TAG "finishing phase 1 of unlogged build of relation %u/%u/%u",
RelFileInfoFmt(InfoFromSMgrRel(reln)))));
RelFileInfoFmt((unlogged_build_rel_info)))));

if (unlogged_build_phase == UNLOGGED_BUILD_NOT_PERMANENT)
return;

Assert(unlogged_build_phase == UNLOGGED_BUILD_PHASE_1);
Assert(reln->smgr_relpersistence == RELPERSISTENCE_UNLOGGED);

/*
* In a parallel build, (only) the leader process performs the 2nd
@@ -2001,7 +2043,7 @@ neon_finish_unlogged_build_phase_1(SMgrRelation reln)
*/
if (IsParallelWorker())
{
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
}
else
@@ -2022,11 +2064,11 @@ neon_end_unlogged_build(SMgrRelation reln)
{
NRelFileInfoBackend rinfob = InfoBFromSMgrRel(reln);

Assert(unlogged_build_rel == reln);
Assert(RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)));

ereport(SmgrTrace,
(errmsg(NEON_TAG "ending unlogged build of relation %u/%u/%u",
RelFileInfoFmt(InfoFromNInfoB(rinfob)))));
RelFileInfoFmt(unlogged_build_rel_info))));

if (unlogged_build_phase != UNLOGGED_BUILD_NOT_PERMANENT)
{
@@ -2034,7 +2076,6 @@ neon_end_unlogged_build(SMgrRelation reln)
BlockNumber nblocks;

Assert(unlogged_build_phase == UNLOGGED_BUILD_PHASE_2);
Assert(reln->smgr_relpersistence == RELPERSISTENCE_UNLOGGED);

/*
* Update the last-written LSN cache.
@@ -2055,9 +2096,6 @@ neon_end_unlogged_build(SMgrRelation reln)
InfoFromNInfoB(rinfob),
MAIN_FORKNUM);

/* Make the relation look permanent again */
reln->smgr_relpersistence = RELPERSISTENCE_PERMANENT;

/* Remove local copy */
for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++)
{
@@ -2078,7 +2116,7 @@ neon_end_unlogged_build(SMgrRelation reln)
mdunlink(rinfob, INIT_FORKNUM, true);
#endif
}
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
}

@@ -2151,7 +2189,7 @@ AtEOXact_neon(XactEvent event, void *arg)
* Forget about any build we might have had in progress. The local
* file will be unlinked by smgrDoPendingDeletes()
*/
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
break;

@@ -2163,7 +2201,7 @@ AtEOXact_neon(XactEvent event, void *arg)
case XACT_EVENT_PRE_PREPARE:
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS)
{
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),

15
poetry.lock
generated
@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.

[[package]]
name = "aiohappyeyeballs"
@@ -1145,18 +1145,19 @@ dotenv = ["python-dotenv"]

[[package]]
name = "flask-cors"
version = "5.0.0"
description = "A Flask extension adding a decorator for CORS support"
version = "6.0.0"
description = "A Flask extension simplifying CORS support"
optional = false
python-versions = "*"
python-versions = "<4.0,>=3.9"
groups = ["main"]
files = [
{file = "Flask_Cors-5.0.0-py2.py3-none-any.whl", hash = "sha256:b9e307d082a9261c100d8fb0ba909eec6a228ed1b60a8315fd85f783d61910bc"},
{file = "flask_cors-5.0.0.tar.gz", hash = "sha256:5aadb4b950c4e93745034594d9f3ea6591f734bb3662e16e255ffbf5e89c88ef"},
{file = "flask_cors-6.0.0-py3-none-any.whl", hash = "sha256:6332073356452343a8ccddbfec7befdc3fdd040141fe776ec9b94c262f058657"},
{file = "flask_cors-6.0.0.tar.gz", hash = "sha256:4592c1570246bf7beee96b74bc0adbbfcb1b0318f6ba05c412e8909eceec3393"},
]

[package.dependencies]
Flask = ">=0.9"
flask = ">=0.9"
Werkzeug = ">=0.7"

[[package]]
name = "frozenlist"
@@ -73,7 +73,6 @@ rustc-hash.workspace = true
|
||||
rustls.workspace = true
|
||||
rustls-native-certs.workspace = true
|
||||
rustls-pemfile.workspace = true
|
||||
ryu = "1"
|
||||
scopeguard.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
@@ -536,7 +536,8 @@ mod tests {
|
||||
use control_plane::AuthSecret;
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres_protocol::authentication::sasl::{ChannelBinding, ScramSha256};
|
||||
use postgres_protocol::CSafeStr;
|
||||
use postgres_protocol::authentication::sasl::{ChannelBinding, SCRAM_SHA_256, ScramSha256};
|
||||
use postgres_protocol::message::backend::Message as PgMessage;
|
||||
use postgres_protocol::message::frontend;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
|
||||
@@ -714,15 +715,15 @@ mod tests {
        // server should offer scram
        match read_message(&mut client, &mut read).await {
            PgMessage::AuthenticationSasl(a) => {
-                let options: Vec<&str> = a.mechanisms().collect().unwrap();
-                assert_eq!(options, ["SCRAM-SHA-256"]);
+                let options: Vec<&CSafeStr> = a.mechanisms().collect().unwrap();
+                assert_eq!(options, [SCRAM_SHA_256]);
            }
            _ => panic!("wrong message"),
        }

        // client sends client-first-message
        let mut write = BytesMut::new();
-        frontend::sasl_initial_response("SCRAM-SHA-256", scram.message(), &mut write).unwrap();
+        frontend::sasl_initial_response(SCRAM_SHA_256, scram.message(), &mut write);
        client.write_all(&write).await.unwrap();

        // server response with server-first-message
@@ -735,7 +736,7 @@ mod tests {

        // client response with client-final-message
        write.clear();
-        frontend::sasl_response(scram.message(), &mut write).unwrap();
+        frontend::sasl_response(scram.message(), &mut write);
        client.write_all(&write).await.unwrap();

        // server response with server-final-message
@@ -800,7 +801,7 @@ mod tests {

        // client responds with password
        write.clear();
-        frontend::password_message(b"my-secret-password", &mut write).unwrap();
+        frontend::password_message(c"my-secret-password".into(), &mut write);
        client.write_all(&write).await.unwrap();
    });
    let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new_with_shards(
@@ -853,8 +854,10 @@ mod tests {

        // client responds with password
        let mut write = BytesMut::new();
-        frontend::password_message(b"endpoint=my-endpoint;my-secret-password", &mut write)
-            .unwrap();
+        frontend::password_message(
+            c"endpoint=my-endpoint;my-secret-password".into(),
+            &mut write,
+        );
        client.write_all(&write).await.unwrap();
    });
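
Reviewer note: the `c"..."` literals above are Rust C-string literals (stable since Rust 1.77). They yield a `&CStr` that is guaranteed NUL-terminated and free of interior NULs, which matches the NUL-terminated password field on the wire; `CSafeStr` is presumably the fork's analogous wrapper. A minimal std-only sketch:

    use std::ffi::CStr;

    fn main() {
        // c"..." is a &'static CStr: NUL-terminated, no interior NUL bytes allowed.
        let pw: &CStr = c"my-secret-password";
        assert_eq!(pw.to_bytes(), b"my-secret-password"); // payload without the NUL
        assert_eq!(pw.to_bytes_with_nul().last(), Some(&0)); // trailing NUL present
    }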
@@ -3,7 +3,6 @@
use std::io;
use std::sync::Arc;

-use postgres_protocol::authentication::sasl::{SCRAM_SHA_256, SCRAM_SHA_256_PLUS};
use pq_proto::{BeAuthenticationSaslMessage, BeMessage, BeMessage as Be};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::info;

@@ -174,8 +173,10 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
    }

    match sasl.method {
-        SCRAM_SHA_256 => ctx.set_auth_method(crate::context::AuthMethod::ScramSha256),
-        SCRAM_SHA_256_PLUS => ctx.set_auth_method(crate::context::AuthMethod::ScramSha256Plus),
+        scram::SCRAM_SHA_256 => ctx.set_auth_method(crate::context::AuthMethod::ScramSha256),
+        scram::SCRAM_SHA_256_PLUS => {
+            ctx.set_auth_method(crate::context::AuthMethod::ScramSha256Plus)
+        }
        _ => {}
    }
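
Reviewer note: referring to the constants by path (`scram::SCRAM_SHA_256`) inside the `match` arms is more than style: a bare identifier in a Rust pattern only compares against a constant if one actually resolves in scope; otherwise it silently becomes a fresh binding that matches everything. A small illustration with a hypothetical constant:

    const FOO: &str = "foo";

    fn classify(s: &str) -> u32 {
        match s {
            FOO => 1, // resolves to the const above, so this is a string comparison
            // If FOO were not in scope, `FOO` here would be a catch-all binding
            // and every later arm would be unreachable.
            _ => 0,
        }
    }

    fn main() {
        assert_eq!(classify("foo"), 1);
        assert_eq!(classify("bar"), 0);
    }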
@@ -161,8 +161,11 @@ struct ProxyCliArgs {
    #[clap(long, default_values_t = RateBucketInfo::DEFAULT_REDIS_SET)]
    redis_rps_limit: Vec<RateBucketInfo>,
    /// Cancellation channel size (max queue size for redis kv client)
-    #[clap(long, default_value = "1024")]
+    #[clap(long, default_value_t = 1024)]
    cancellation_ch_size: usize,
+    /// Cancellation ops batch size for redis
+    #[clap(long, default_value_t = 8)]
+    cancellation_batch_size: usize,
    /// cache for `allowed_ips` (use `size=0` to disable)
    #[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
    allowed_ips_cache: String,
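
Reviewer note on `default_value` vs `default_value_t`: the former stores a string that clap parses when arguments are processed, the latter takes a typed Rust expression, so a type mismatch fails at compile time rather than at startup (the field type must implement `Display` and `FromStr`). A minimal sketch, assuming clap 4 with the derive feature; the field names are illustrative:

    use clap::Parser;

    #[derive(Parser)]
    struct Args {
        /// Kept as the string "1024" and parsed at runtime.
        #[clap(long, default_value = "1024")]
        ch_size: usize,

        /// A typed expression, checked by the compiler.
        #[clap(long, default_value_t = 8)]
        batch_size: usize,
    }

    fn main() {
        let args = Args::parse_from(["prog"]);
        assert_eq!((args.ch_size, args.batch_size), (1024, 8));
    }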
@@ -542,7 +545,12 @@ pub async fn run() -> anyhow::Result<()> {
    if let Some(mut redis_kv_client) = redis_kv_client {
        maintenance_tasks.spawn(async move {
            redis_kv_client.try_connect().await?;
-            handle_cancel_messages(&mut redis_kv_client, rx_cancel).await?;
+            handle_cancel_messages(
+                &mut redis_kv_client,
+                rx_cancel,
+                args.cancellation_batch_size,
+            )
+            .await?;

            drop(redis_kv_client);
@@ -30,8 +30,6 @@ use crate::tls::postgres_rustls::MakeRustlsConnect;
type IpSubnetKey = IpNet;

const CANCEL_KEY_TTL: i64 = 1_209_600; // 2 weeks cancellation key expire time
-const REDIS_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10);
-const BATCH_SIZE: usize = 8;

// Message types for sending through mpsc channel
pub enum CancelKeyOp {
@@ -231,12 +229,13 @@ impl CancelReplyOp {
pub async fn handle_cancel_messages(
    client: &mut RedisKVClient,
    mut rx: mpsc::Receiver<CancelKeyOp>,
+    batch_size: usize,
) -> anyhow::Result<()> {
-    let mut batch = Vec::with_capacity(BATCH_SIZE);
-    let mut pipeline = Pipeline::with_capacity(BATCH_SIZE);
+    let mut batch = Vec::with_capacity(batch_size);
+    let mut pipeline = Pipeline::with_capacity(batch_size);

    loop {
-        if rx.recv_many(&mut batch, BATCH_SIZE).await == 0 {
+        if rx.recv_many(&mut batch, batch_size).await == 0 {
            warn!("shutting down cancellation queue");
            break Ok(());
        }
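
Reviewer note: the batching here relies on tokio's `Receiver::recv_many`, which waits for at least one message, moves up to `limit` queued messages into the buffer, and returns 0 only once the channel is closed and drained. A minimal sketch of the same consumer shape (the `drain` function is hypothetical):

    use tokio::sync::mpsc;

    async fn drain(mut rx: mpsc::Receiver<u32>, limit: usize) {
        let mut buf = Vec::with_capacity(limit);
        loop {
            // returns 0 only when all senders are dropped and the queue is empty
            if rx.recv_many(&mut buf, limit).await == 0 {
                break;
            }
            // handle the whole batch in one round trip, then reuse the allocation
            println!("batch of {}", buf.len());
            buf.clear();
        }
    }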
@@ -367,8 +366,7 @@ impl CancellationHandler {
            return Err(CancelError::InternalError);
        };

-        tx.send_timeout(op, REDIS_SEND_TIMEOUT)
-            .await
+        tx.try_send(op)
            .map_err(|e| {
                tracing::warn!("failed to send GetCancelData for {key}: {e}");
            })
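
Reviewer note: replacing `send_timeout(op, REDIS_SEND_TIMEOUT).await` with `try_send(op)` makes enqueueing non-blocking, which is what lets `write_cancel_key` and `remove_cancel_key` below shed their `async` qualifiers. The trade-off, sketched against tokio's bounded mpsc channel:

    use tokio::sync::mpsc;

    // `send_timeout` parks the task for up to the timeout when the queue is
    // full; `try_send` returns immediately with TrySendError::Full (or
    // Closed), and here the op is simply dropped and logged.
    fn enqueue(tx: &mpsc::Sender<String>, op: String) {
        if let Err(e) = tx.try_send(op) {
            eprintln!("queue unavailable, dropping op: {e}");
        }
    }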
@@ -570,7 +568,7 @@ impl Session {
    }

    // Send the store key op to the cancellation handler and set TTL for the key
-    pub(crate) async fn write_cancel_key(
+    pub(crate) fn write_cancel_key(
        &self,
        cancel_closure: CancelClosure,
    ) -> Result<(), CancelError> {
@@ -596,14 +594,14 @@ impl Session {
            expire: CANCEL_KEY_TTL,
        };

-        let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
+        let _ = tx.try_send(op).map_err(|e| {
            let key = self.key;
            tracing::warn!("failed to send StoreCancelKey for {key}: {e}");
        });
        Ok(())
    }

-    pub(crate) async fn remove_cancel_key(&self) -> Result<(), CancelError> {
+    pub(crate) fn remove_cancel_key(&self) -> Result<(), CancelError> {
        let Some(tx) = &self.cancellation_handler.tx else {
            tracing::warn!("cancellation handler is not available");
            return Err(CancelError::InternalError);
@@ -619,7 +617,7 @@ impl Session {
                .guard(RedisMsgKind::HDel),
        };

-        let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
+        let _ = tx.try_send(op).map_err(|e| {
            let key = self.key;
            tracing::warn!("failed to send RemoveCancelKey for {key}: {e}");
        });
@@ -244,9 +244,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
    let cancellation_handler_clone = Arc::clone(&cancellation_handler);
    let session = cancellation_handler_clone.get_key();

-    session
-        .write_cancel_key(node.cancel_closure.clone())
-        .await?;
+    session.write_cancel_key(node.cancel_closure.clone())?;

    prepare_client_connection(&node, *session.key(), &mut stream).await?;
File diff suppressed because it is too large
@@ -1,356 +0,0 @@
-//! Vendoring of serde_json's string escaping code.
-//!
-//! <https://github.com/serde-rs/json/blob/c1826ebcccb1a520389c6b78ad3da15db279220d/src/ser.rs#L1514-L1552>
-//! <https://github.com/serde-rs/json/blob/c1826ebcccb1a520389c6b78ad3da15db279220d/src/ser.rs#L2081-L2157>
-//! Licensed by David Tolnay under MIT or Apache-2.0.
-//!
-//! With modifications by Conrad Ludgate on behalf of Neon.
-
-use std::fmt::{self, Write};
-
-use serde_json::ser::CharEscape;
-
-#[must_use]
-pub struct ValueSer<'buf> {
-    buf: &'buf mut Vec<u8>,
-}
-
-impl<'buf> ValueSer<'buf> {
-    pub fn new(buf: &'buf mut Vec<u8>) -> Self {
-        Self { buf }
-    }
-
-    #[inline]
-    pub fn serialize(self, value: &SerializedValue) {
-        self.buf.extend_from_slice(&value.0);
-    }
-
-    #[inline]
-    pub fn str(self, s: &str) {
-        format_escaped_str(self.buf, s);
-    }
-
-    #[inline]
-    pub fn str_args(self, s: fmt::Arguments) {
-        format_escaped_display(self.buf, s);
-    }
-
-    #[inline]
-    pub fn bytes_hex(self, s: &[u8]) {
-        self.str_args(format_args!("{s:x?}"));
-    }
-
-    #[inline]
-    pub fn int(self, x: impl itoa::Integer) {
-        write_int(x, self.buf);
-    }
-
-    #[inline]
-    pub fn float(self, x: impl ryu::Float) {
-        write_float(x, self.buf);
-    }
-
-    #[inline]
-    pub fn bool(self, x: bool) {
-        let bool = if x { "true" } else { "false" };
-        self.buf.extend_from_slice(bool.as_bytes());
-    }
-
-    #[inline]
-    pub fn map(self) -> MapSer<'buf> {
-        MapSer::new(self.buf)
-    }
-
-    #[inline]
-    #[expect(unused)]
-    pub fn list(self) -> ListSer<'buf> {
-        ListSer::new(self.buf)
-    }
-}
-
-pub struct MapSer<'buf> {
-    buf: &'buf mut Vec<u8>,
-    first: bool,
-}
-
-impl<'buf> MapSer<'buf> {
-    #[inline]
-    fn new(buf: &'buf mut Vec<u8>) -> Self {
-        buf.push(b'{');
-        Self { buf, first: true }
-    }
-
-    #[inline]
-    pub fn entry(&mut self, key: Escaped) -> ValueSer {
-        self.entry_inner(|b| key.write(b))
-    }
-
-    #[inline]
-    pub fn entry_escape(&mut self, key: &str) -> ValueSer {
-        self.entry_inner(|b| format_escaped_str(b, key))
-    }
-
-    #[inline]
-    pub fn entry_escape_args(&mut self, key: fmt::Arguments) -> ValueSer {
-        self.entry_inner(|b| format_escaped_display(b, key))
-    }
-
-    #[inline]
-    fn entry_inner(&mut self, f: impl FnOnce(&mut Vec<u8>)) -> ValueSer {
-        if !self.first {
-            self.buf.push(b',');
-        }
-        self.first = false;
-
-        f(self.buf);
-
-        self.buf.push(b':');
-        ValueSer { buf: self.buf }
-    }
-}
-
-impl Drop for MapSer<'_> {
-    fn drop(&mut self) {
-        self.buf.push(b'}');
-    }
-}
-
-pub struct ListSer<'buf> {
-    buf: &'buf mut Vec<u8>,
-    first: bool,
-}
-
-impl<'buf> ListSer<'buf> {
-    #[inline]
-    fn new(buf: &'buf mut Vec<u8>) -> Self {
-        buf.push(b'[');
-        Self { buf, first: true }
-    }
-
-    #[expect(unused)]
-    #[inline]
-    fn entry(&mut self) -> ValueSer {
-        if !self.first {
-            self.buf.push(b',');
-        }
-        self.first = false;
-        ValueSer { buf: self.buf }
-    }
-}
-
-impl Drop for ListSer<'_> {
-    fn drop(&mut self) {
-        self.buf.push(b']');
-    }
-}
-
-#[derive(Clone)]
-pub struct SerializedValue(Box<[u8]>);
-
-impl SerializedValue {
-    #[inline]
-    pub fn str(s: &str) -> Self {
-        let mut v = vec![];
-        v.reserve_exact(2 + s.len());
-        format_escaped_str(&mut v, s);
-        Self(v.into_boxed_slice())
-    }
-
-    #[inline]
-    pub fn str_args(s: fmt::Arguments) -> Self {
-        if let Some(s) = s.as_str() {
-            return Self::str(s);
-        }
-
-        let mut v = vec![];
-        format_escaped_display(&mut v, s);
-        Self(v.into_boxed_slice())
-    }
-
-    #[inline]
-    pub fn bytes_hex(s: &[u8]) -> Self {
-        Self::str_args(format_args!("{s:x?}"))
-    }
-
-    #[inline]
-    pub fn int(x: impl itoa::Integer) -> Self {
-        Self(itoa::Buffer::new().format(x).as_bytes().into())
-    }
-
-    #[inline]
-    pub fn float(x: impl ryu::Float) -> Self {
-        Self(ryu::Buffer::new().format(x).as_bytes().into())
-    }
-
-    #[inline]
-    pub fn bool(x: bool) -> Self {
-        let bool = if x { "true" } else { "false" };
-        Self(bool.as_bytes().into())
-    }
-}
-
-/// Represents a string that didn't need escaping because it's already valid json string.
-#[derive(Clone, Copy)]
-pub struct Escaped(&'static str);
-
-impl Escaped {
-    pub const fn new(s: &'static str) -> Self {
-        let mut i = 0;
-        while i < s.len() {
-            let escape = ESCAPE[s.as_bytes()[i] as usize];
-            i += 1;
-            assert!(escape == 0, "const json string should not need escaping");
-        }
-
-        Self(s)
-    }
-
-    pub fn as_str(self) -> &'static str {
-        self.0
-    }
-
-    fn write(self, buf: &mut Vec<u8>) {
-        buf.push(b'"');
-        buf.extend_from_slice(self.0.as_bytes());
-        buf.push(b'"');
-    }
-}
-
-fn write_int(x: impl itoa::Integer, b: &mut Vec<u8>) {
-    b.extend_from_slice(itoa::Buffer::new().format(x).as_bytes());
-}
-
-fn write_float(x: impl ryu::Float, b: &mut Vec<u8>) {
-    b.extend_from_slice(ryu::Buffer::new().format(x).as_bytes());
-}
-
-#[inline]
-fn char_escape_from_escape_table(escape: u8, byte: u8) -> CharEscape {
-    match escape {
-        self::BB => CharEscape::Backspace,
-        self::TT => CharEscape::Tab,
-        self::NN => CharEscape::LineFeed,
-        self::FF => CharEscape::FormFeed,
-        self::RR => CharEscape::CarriageReturn,
-        self::QU => CharEscape::Quote,
-        self::BS => CharEscape::ReverseSolidus,
-        self::UU => CharEscape::AsciiControl(byte),
-        _ => unreachable!(),
-    }
-}
-
-fn format_escaped_str(writer: &mut Vec<u8>, value: &str) {
-    writer.push(b'"');
-    let rest = format_escaped_str_contents(writer, value);
-    writer.extend_from_slice(rest);
-    writer.push(b'"');
-}
-
-fn format_escaped_display(writer: &mut Vec<u8>, args: fmt::Arguments) {
-    writer.push(b'"');
-
-    if let Some(s) = args.as_str() {
-        let rest = format_escaped_str_contents(writer, s);
-        writer.extend_from_slice(rest);
-    } else {
-        Collect { buf: writer }
-            .write_fmt(args)
-            .expect("formatting should not error");
-    }
-
-    writer.push(b'"');
-}
-
-struct Collect<'buf> {
-    buf: &'buf mut Vec<u8>,
-}
-
-impl fmt::Write for Collect<'_> {
-    fn write_str(&mut self, s: &str) -> fmt::Result {
-        let last = format_escaped_str_contents(self.buf, s);
-        self.buf.extend(last);
-        Ok(())
-    }
-}
-
-// writes any escape sequences, and returns the suffix still needed to be written.
-fn format_escaped_str_contents<'a>(writer: &mut Vec<u8>, value: &'a str) -> &'a [u8] {
-    let bytes = value.as_bytes();
-
-    let mut start = 0;
-
-    for (i, &byte) in bytes.iter().enumerate() {
-        let escape = ESCAPE[byte as usize];
-        if escape == 0 {
-            continue;
-        }
-
-        writer.extend_from_slice(&bytes[start..i]);
-
-        let char_escape = char_escape_from_escape_table(escape, byte);
-        write_char_escape(writer, char_escape);
-
-        start = i + 1;
-    }
-
-    &bytes[start..]
-}
-
-const BB: u8 = b'b'; // \x08
-const TT: u8 = b't'; // \x09
-const NN: u8 = b'n'; // \x0A
-const FF: u8 = b'f'; // \x0C
-const RR: u8 = b'r'; // \x0D
-const QU: u8 = b'"'; // \x22
-const BS: u8 = b'\\'; // \x5C
-const UU: u8 = b'u'; // \x00...\x1F except the ones above
-const __: u8 = 0;
-
-// Lookup table of escape sequences. A value of b'x' at index i means that byte
-// i is escaped as "\x" in JSON. A value of 0 means that byte i is not escaped.
-static ESCAPE: [u8; 256] = [
-    //   1   2   3   4   5   6   7   8   9   A   B   C   D   E   F
-    UU, UU, UU, UU, UU, UU, UU, UU, BB, TT, NN, UU, FF, RR, UU, UU, // 0
-    UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, // 1
-    __, __, QU, __, __, __, __, __, __, __, __, __, __, __, __, __, // 2
-    __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 3
-    __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 4
-    __, __, __, __, __, __, __, __, __, __, __, __, BS, __, __, __, // 5
-    __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 6
-    __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 7
-    __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 8
-    __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 9
-    __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // A
-    __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // B
-    __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // C
-    __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // D
-    __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // E
-    __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // F
-];
-
-fn write_char_escape(writer: &mut Vec<u8>, char_escape: CharEscape) {
-    let s = match char_escape {
-        CharEscape::Quote => b"\\\"",
-        CharEscape::ReverseSolidus => b"\\\\",
-        CharEscape::Solidus => b"\\/",
-        CharEscape::Backspace => b"\\b",
-        CharEscape::FormFeed => b"\\f",
-        CharEscape::LineFeed => b"\\n",
-        CharEscape::CarriageReturn => b"\\r",
-        CharEscape::Tab => b"\\t",
-        CharEscape::AsciiControl(byte) => {
-            static HEX_DIGITS: [u8; 16] = *b"0123456789abcdef";
-            let bytes = &[
-                b'\\',
-                b'u',
-                b'0',
-                b'0',
-                HEX_DIGITS[(byte >> 4) as usize],
-                HEX_DIGITS[(byte & 0xF) as usize],
-            ];
-            return writer.extend_from_slice(bytes);
-        }
-    };

-    writer.extend_from_slice(s);
-}
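
For context on the file removed above: it escaped JSON strings via a 256-entry lookup table, copying unescaped runs in bulk and emitting two-character or `\u00XX` sequences only where the table entry is non-zero. A rough standalone sketch of that technique (simplified, std-only; not the removed API):

    fn escape_json(s: &str) -> String {
        let mut out = String::with_capacity(s.len() + 2);
        out.push('"');
        let bytes = s.as_bytes();
        let mut start = 0;
        for (i, &b) in bytes.iter().enumerate() {
            let esc: Option<&str> = match b {
                b'"' => Some("\\\""),
                b'\\' => Some("\\\\"),
                0x08 => Some("\\b"),
                b'\t' => Some("\\t"),
                b'\n' => Some("\\n"),
                0x0C => Some("\\f"),
                b'\r' => Some("\\r"),
                0x00..=0x1F => None, // other control bytes become \u00XX below
                _ => continue,       // plain byte: stays inside the unescaped run
            };
            out.push_str(&s[start..i]); // flush the run; escaped bytes are ASCII,
                                        // so these are valid char boundaries
            match esc {
                Some(e) => out.push_str(e),
                None => out.push_str(&format!("\\u{b:04x}")),
            }
            start = i + 1;
        }
        out.push_str(&s[start..]); // trailing unescaped run
        out.push('"');
        out
    }

    fn main() {
        assert_eq!(escape_json("a\"b\n"), "\"a\\\"b\\n\"");
    }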
@@ -67,6 +67,7 @@ where
    }
}

+#[tracing::instrument(skip_all)]
pub async fn copy_bidirectional_client_compute<Client, Compute>(
    client: &mut Client,
    compute: &mut Compute,
@@ -383,9 +383,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
    let cancellation_handler_clone = Arc::clone(&cancellation_handler);
    let session = cancellation_handler_clone.get_key();

-    session
-        .write_cancel_key(node.cancel_closure.clone())
-        .await?;
+    session.write_cancel_key(node.cancel_closure.clone())?;

    prepare_client_connection(&node, *session.key(), &mut stream).await?;
@@ -13,6 +13,7 @@ use crate::stream::Stream;
use crate::usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS};

/// Forward bytes in both directions (client <-> compute).
+#[tracing::instrument(skip_all)]
pub(crate) async fn proxy_pass(
    client: impl AsyncRead + AsyncWrite + Unpin,
    compute: impl AsyncRead + AsyncWrite + Unpin,
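
Reviewer note: `#[tracing::instrument(skip_all)]` wraps each call in a tracing span named after the function; `skip_all` records none of the arguments as span fields, a sensible choice here since the arguments are opaque byte streams. A minimal sketch:

    use tracing::instrument;

    #[instrument(skip_all)]
    async fn relay(bytes: &[u8]) {
        // events emitted inside inherit the `relay` span
        tracing::info!(len = bytes.len(), "forwarding");
    }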
@@ -93,7 +94,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
        tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
    }

-    drop(self.cancel.remove_cancel_key().await); // we don't need a result. If the queue is full, we just log the error
+    drop(self.cancel.remove_cancel_key()); // we don't need a result. If the queue is full, we just log the error

    res
}
@@ -9,7 +9,7 @@ use std::fmt::Debug;
use bytes::{Bytes, BytesMut};
use futures::{SinkExt, StreamExt};
use postgres_client::tls::TlsConnect;
-use postgres_protocol::message::frontend;
+use postgres_protocol::{authentication::sasl::SCRAM_SHA_256, message::frontend};
use tokio::io::{AsyncReadExt, DuplexStream};
use tokio_util::codec::{Decoder, Encoder};
@@ -60,8 +60,7 @@ async fn proxy_mitm(
            params: startup.params.into(),
        },
        &mut buf,
-    )
-    .unwrap();
+    );
    end_server.send(buf.freeze()).await.unwrap();

    // proxy messages between end_client and end_server
@@ -90,7 +89,7 @@ async fn proxy_mitm(
    new_message.extend_from_slice(sasl_message.strip_prefix(b"p=tls-server-end-point,,").unwrap());

    let mut buf = BytesMut::new();
-    frontend::sasl_initial_response("SCRAM-SHA-256", &new_message, &mut buf).unwrap();
+    frontend::sasl_initial_response(SCRAM_SHA_256, &new_message, &mut buf);

    end_server.send(buf.freeze()).await.unwrap();
    continue;
@@ -21,8 +21,8 @@ pub(crate) use key::ScramKey;
pub(crate) use secret::ServerSecret;
use sha2::{Digest, Sha256};

-const SCRAM_SHA_256: &str = "SCRAM-SHA-256";
-const SCRAM_SHA_256_PLUS: &str = "SCRAM-SHA-256-PLUS";
+pub(crate) const SCRAM_SHA_256: &str = "SCRAM-SHA-256";
+pub(crate) const SCRAM_SHA_256_PLUS: &str = "SCRAM-SHA-256-PLUS";

/// A list of supported SCRAM methods.
pub(crate) const METHODS: &[&str] = &[SCRAM_SHA_256_PLUS, SCRAM_SHA_256];
@@ -13,22 +13,19 @@ pub(crate) struct Pbkdf2 {
// inspired from <https://github.com/neondatabase/rust-postgres/blob/20031d7a9ee1addeae6e0968e3899ae6bf01cee2/postgres-protocol/src/authentication/sasl.rs#L36-L61>
impl Pbkdf2 {
    pub(crate) fn start(str: &[u8], salt: &[u8], iterations: u32) -> Self {
-        let hmac =
+        // key the HMAC and derive the first block in-place
+        let mut hmac =
            Hmac::<Sha256>::new_from_slice(str).expect("HMAC is able to accept all key sizes");

-        let prev = hmac
-            .clone()
-            .chain_update(salt)
-            .chain_update(1u32.to_be_bytes())
-            .finalize()
-            .into_bytes();
+        hmac.update(salt);
+        hmac.update(&1u32.to_be_bytes());
+        let init_block = hmac.finalize_reset().into_bytes();

        Self {
            hmac,
-            // one consumed for the hash above
+            // one iteration spent above
            iterations: iterations - 1,
-            hi: prev,
-            prev,
+            hi: init_block,
+            prev: init_block,
        }
    }
@@ -44,14 +41,17 @@ impl Pbkdf2 {
            iterations,
        } = self;

-        // only do 4096 iterations per turn before sharing the thread for fairness
+        // only do up to 4096 iterations per turn for fairness
        let n = (*iterations).clamp(0, 4096);
        for _ in 0..n {
-            *prev = hmac.clone().chain_update(*prev).finalize().into_bytes();
+            hmac.update(prev);
+            let block = hmac.finalize_reset().into_bytes();

-            for (hi, prev) in hi.iter_mut().zip(*prev) {
-                *hi ^= prev;
+            for (hi_byte, &b) in hi.iter_mut().zip(block.iter()) {
+                *hi_byte ^= b;
            }
+
+            *prev = block;
        }

        *iterations -= n;
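
For reference, the loop above implements the PBKDF2 block recurrence U_1 = HMAC(pw, salt || INT(1)), U_j = HMAC(pw, U_{j-1}), result = U_1 xor ... xor U_c; the change swaps a full clone of the keyed HMAC per round for `finalize_reset`, which returns the MAC and restores the freshly-keyed state. A minimal single-block sketch, assuming the `hmac` and `sha2` crates:

    use hmac::{Hmac, Mac};
    use sha2::Sha256;

    fn pbkdf2_block(pw: &[u8], salt: &[u8], c: u32) -> [u8; 32] {
        let mut mac = Hmac::<Sha256>::new_from_slice(pw).expect("any key size works");
        // U1 = HMAC(pw, salt || INT(1))
        mac.update(salt);
        mac.update(&1u32.to_be_bytes());
        let mut u = mac.finalize_reset().into_bytes();
        let mut out = [0u8; 32];
        out.copy_from_slice(&u);
        for _ in 1..c {
            // Uj = HMAC(pw, U(j-1)); finalize_reset avoids re-keying or cloning
            mac.update(&u);
            u = mac.finalize_reset().into_bytes();
            for (o, b) in out.iter_mut().zip(u.iter()) {
                *o ^= b;
            }
        }
        out
    }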
@@ -103,7 +103,7 @@ class AbstractNeonCli:
        else:
            stdout = ""

-        log.warn(f"CLI timeout: stderr={stderr}, stdout={stdout}")
+        log.warning(f"CLI timeout: stderr={stderr}, stdout={stdout}")
        raise

    indent = "  "
@@ -187,6 +187,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
        "args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
    },
    "rel_size_v2_enabled": True,
+    "relsize_snapshot_cache_capacity": 10000,
    "gc_compaction_enabled": True,
    "gc_compaction_verification": False,
    "gc_compaction_initial_threshold_kb": 1024000,
@@ -19,6 +19,16 @@ TEST_ROLE_NAMES = [
    {"name": "role$"},
    {"name": "role$$"},
    {"name": "role$x$"},
+    {"name": "x"},
+    {"name": "xx"},
+    {"name": "$x"},
+    {"name": "x$"},
+    {"name": "$x$"},
+    {"name": "xx$"},
+    {"name": "$xx"},
+    {"name": "$xx$"},
+    # 63 bytes is the limit for role/DB names in Postgres
+    {"name": "x" * 63},
]

TEST_DB_NAMES = [
@@ -74,6 +84,43 @@ TEST_DB_NAMES = [
        "name": "db name$x$",
        "owner": "role$x$",
    },
+    {
+        "name": "x",
+        "owner": "x",
+    },
+    {
+        "name": "xx",
+        "owner": "xx",
+    },
+    {
+        "name": "$x",
+        "owner": "$x",
+    },
+    {
+        "name": "x$",
+        "owner": "x$",
+    },
+    {
+        "name": "$x$",
+        "owner": "$x$",
+    },
+    {
+        "name": "xx$",
+        "owner": "xx$",
+    },
+    {
+        "name": "$xx",
+        "owner": "$xx",
+    },
+    {
+        "name": "$xx$",
+        "owner": "$xx$",
+    },
+    # 63 bytes is the limit for role/DB names in Postgres
+    {
+        "name": "x" * 63,
+        "owner": "x" * 63,
+    },
]
@@ -146,6 +193,10 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
    """
    Test that compute_ctl can create and work with databases and roles
    with special characters (whitespaces, %, tabs, etc.) in the name.
+    Also use `drop_subscriptions_before_start: true`. We do not actually
+    have any subscriptions in this test, so it should be no-op, but it
+    i) simulates the case when we create a second dev branch together with
+    a new project creation, and ii) just generally stresses more code paths.
    """
    env = neon_simple_env
@@ -159,6 +210,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
    **{
        "spec": {
            "skip_pg_catalog_updates": False,
+            "drop_subscriptions_before_start": True,
            "cluster": {
                "roles": TEST_ROLE_NAMES,
                "databases": TEST_DB_NAMES,
@@ -202,6 +254,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
    **{
        "spec": {
            "skip_pg_catalog_updates": False,
+            "drop_subscriptions_before_start": True,
            "cluster": {
                "roles": [],
                "databases": [],
@@ -510,7 +510,7 @@ def list_elegible_layers(
    except KeyError:
        # Unexpected: tests should call this when pageservers are in a quiet state such that the layer map
        # matches what's on disk.
-        log.warn(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
+        log.warning(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
        raise

    return list(c for c in candidates if is_visible(c))
@@ -636,7 +636,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
    except:
        # On assertion failures, log some details to help with debugging
        heatmap = env.pageserver_remote_storage.heatmap_content(tenant_id)
-        log.warn(f"heatmap contents: {json.dumps(heatmap, indent=2)}")
+        log.warning(f"heatmap contents: {json.dumps(heatmap, indent=2)}")
        raise

    # Scrub the remote storage
@@ -27,8 +27,9 @@ from contextlib import closing

import psycopg2
import pytest
+from fixtures.common_types import Lsn
from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
+from fixtures.neon_fixtures import NeonEnv, PgBin, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.pg_version import PgVersion
from fixtures.utils import query_scalar, skip_on_postgres, wait_until
@@ -695,3 +696,110 @@ def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv):
    with secondary.cursor() as secondary_cur:
        secondary_cur.execute("select count(*) from t")
        assert secondary_cur.fetchone() == (n_restarts,)
+
+
+def test_ephemeral_endpoints_vacuum(neon_simple_env: NeonEnv, pg_bin: PgBin):
+    env = neon_simple_env
+    endpoint = env.endpoints.create_start("main")
+
+    sql = """
+    CREATE TABLE CHAR_TBL(f1 char(4));
+    CREATE TABLE FLOAT8_TBL(f1 float8);
+    CREATE TABLE INT2_TBL(f1 int2);
+    CREATE TABLE INT4_TBL(f1 int4);
+    CREATE TABLE INT8_TBL(q1 int8, q2 int8);
+    CREATE TABLE POINT_TBL(f1 point);
+    CREATE TABLE TEXT_TBL (f1 text);
+    CREATE TABLE VARCHAR_TBL(f1 varchar(4));
+    CREATE TABLE onek (unique1 int4);
+    CREATE TABLE onek2 AS SELECT * FROM onek;
+    CREATE TABLE tenk1 (unique1 int4);
+    CREATE TABLE tenk2 AS SELECT * FROM tenk1;
+    CREATE TABLE person (name text, age int4,location point);
+    CREATE TABLE emp (salary int4, manager name) INHERITS (person);
+    CREATE TABLE student (gpa float8) INHERITS (person);
+    CREATE TABLE stud_emp ( percent int4) INHERITS (emp, student);
+    CREATE TABLE road (name text,thepath path);
+    CREATE TABLE ihighway () INHERITS (road);
+    CREATE TABLE shighway(surface text) INHERITS (road);
+    CREATE TABLE BOOLTBL3 (d text, b bool, o int);
+    CREATE TABLE booltbl4(isfalse bool, istrue bool, isnul bool);
+    DROP TABLE BOOLTBL3;
+    DROP TABLE BOOLTBL4;
+    CREATE TABLE ceil_floor_round (a numeric);
+    DROP TABLE ceil_floor_round;
+    CREATE TABLE width_bucket_test (operand_num numeric, operand_f8 float8);
+    DROP TABLE width_bucket_test;
+    CREATE TABLE num_input_test (n1 numeric);
+    CREATE TABLE num_variance (a numeric);
+    INSERT INTO num_variance VALUES (0);
+    CREATE TABLE snapshot_test (nr integer, snap txid_snapshot);
+    CREATE TABLE guid1(guid_field UUID, text_field TEXT DEFAULT(now()));
+    CREATE TABLE guid2(guid_field UUID, text_field TEXT DEFAULT(now()));
+    CREATE INDEX guid1_btree ON guid1 USING BTREE (guid_field);
+    CREATE INDEX guid1_hash ON guid1 USING HASH (guid_field);
+    TRUNCATE guid1;
+    DROP TABLE guid1;
+    DROP TABLE guid2 CASCADE;
+    CREATE TABLE numrange_test (nr NUMRANGE);
+    CREATE INDEX numrange_test_btree on numrange_test(nr);
+    CREATE TABLE numrange_test2(nr numrange);
+    CREATE INDEX numrange_test2_hash_idx on numrange_test2 using hash (nr);
+    INSERT INTO numrange_test2 VALUES('[, 5)');
+    CREATE TABLE textrange_test (tr text);
+    CREATE INDEX textrange_test_btree on textrange_test(tr);
+    CREATE TABLE test_range_gist(ir int4range);
+    CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
+    DROP INDEX test_range_gist_idx;
+    CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
+    CREATE TABLE test_range_spgist(ir int4range);
+    CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
+    DROP INDEX test_range_spgist_idx;
+    CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
+    CREATE TABLE test_range_elem(i int4);
+    CREATE INDEX test_range_elem_idx on test_range_elem (i);
+    CREATE INDEX ON test_range_elem using spgist(int4range(i,i+10));
+    DROP TABLE test_range_elem;
+    CREATE TABLE test_range_excl(room int4range, speaker int4range, during tsrange, exclude using gist (room with =, during with &&), exclude using gist (speaker with =, during with &&));
+    CREATE TABLE f_test(f text, i int);
+    CREATE TABLE i8r_array (f1 int, f2 text);
+    CREATE TYPE arrayrange as range (subtype=int4[]);
+    CREATE TYPE two_ints as (a int, b int);
+    DROP TYPE two_ints cascade;
+    CREATE TABLE text_support_test (t text);
+    CREATE TABLE TEMP_FLOAT (f1 FLOAT8);
+    CREATE TABLE TEMP_INT4 (f1 INT4);
+    CREATE TABLE TEMP_INT2 (f1 INT2);
+    CREATE TABLE TEMP_GROUP (f1 INT4, f2 INT4, f3 FLOAT8);
+    CREATE TABLE POLYGON_TBL(f1 polygon);
+    CREATE TABLE quad_poly_tbl (id int, p polygon);
+    INSERT INTO quad_poly_tbl SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10)) FROM generate_series(1, 200) x, generate_series(1, 100) y;
+    CREATE TABLE quad_poly_tbl_ord_seq2 AS SELECT 1 FROM quad_poly_tbl;
+    CREATE TABLE quad_poly_tbl_ord_idx2 AS SELECT 1 FROM quad_poly_tbl;
+    """
+
+    with endpoint.cursor() as cur:
+        lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
+        env.endpoints.create_start(branch_name="main", lsn=lsn)
+        log.info(f"lsn: {lsn}")
+
+        for line in sql.split("\n"):
+            if len(line.strip()) == 0 or line.startswith("--"):
+                continue
+            cur.execute(line)
+
+        lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
+        env.endpoints.create_start(branch_name="main", lsn=lsn)
+        log.info(f"lsn: {lsn}")
+
+        cur.execute("VACUUM FULL pg_class;")
+
+    for ep in env.endpoints.endpoints:
+        log.info(f"{ep.endpoint_id} / {ep.pg_port}")
+        pg_dump_command = ["pg_dumpall", "-f", f"/tmp/dump-{ep.endpoint_id}.sql"]
+        env_vars = {
+            "PGPORT": str(ep.pg_port),
+            "PGUSER": endpoint.default_options["user"],
+            "PGHOST": endpoint.default_options["host"],
+        }
+        pg_bin.run_capture(pg_dump_command, env=env_vars)
Submodule vendor/postgres-v14 updated: 4cca6f8083...55c0d45abe
Submodule vendor/postgres-v15 updated: daa81cffcf...de7640f55d
Submodule vendor/postgres-v16 updated: 15710a76b7...0bf96bd6d7
Submodule vendor/postgres-v17 updated: e5374b7299...8be779fd3a

vendor/revisions.json (vendored, 8 changes)
@@ -1,18 +1,18 @@
{
    "v17": [
        "17.5",
-        "e5374b72997b0afc8374137674e873f7a558120a"
+        "8be779fd3ab9e87206da96a7e4842ef1abf04f44"
    ],
    "v16": [
        "16.9",
-        "15710a76b7d07912110fcbbaf0c8ad6d7e5a9fbc"
+        "0bf96bd6d70301a0b43b0b3457bb3cf8fb43c198"
    ],
    "v15": [
        "15.13",
-        "daa81cffcf063c54b29a9aabdb6604625f675ad0"
+        "de7640f55da07512834d5cc40c4b3fb376b5f04f"
    ],
    "v14": [
        "14.18",
-        "4cca6f8083483dda9e12eae292cf788d45bd561f"
+        "55c0d45abe6467c02084c2192bca117eda6ce1e7"
    ]
}