mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-11 14:40:36 +00:00
Compare commits
1 Commits
ci-run/pr-
...
jcsp/remov
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dea475f1ba |
3
.github/workflows/build_and_test.yml
vendored
3
.github/workflows/build_and_test.yml
vendored
@@ -404,7 +404,7 @@ jobs:
|
||||
uses: ./.github/actions/save-coverage-data
|
||||
|
||||
regress-tests:
|
||||
needs: [ check-permissions, build-neon, tag ]
|
||||
needs: [ check-permissions, build-neon ]
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
|
||||
@@ -436,7 +436,6 @@ jobs:
|
||||
env:
|
||||
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
|
||||
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
|
||||
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
|
||||
|
||||
- name: Merge and upload coverage data
|
||||
if: matrix.build_type == 'debug' && matrix.pg_version == 'v14'
|
||||
|
||||
50
Cargo.lock
generated
50
Cargo.lock
generated
@@ -651,7 +651,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"axum-core",
|
||||
"base64 0.21.1",
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"http",
|
||||
@@ -848,7 +848,7 @@ version = "0.65.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
"cexpr",
|
||||
"clang-sys",
|
||||
"lazy_static",
|
||||
@@ -871,12 +871,6 @@ version = "1.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "2.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07"
|
||||
|
||||
[[package]]
|
||||
name = "block-buffer"
|
||||
version = "0.10.4"
|
||||
@@ -1060,7 +1054,7 @@ checksum = "4f423e341edefb78c9caba2d9c7f7687d0e72e89df3ce3394554754393ac3990"
|
||||
dependencies = [
|
||||
"anstream",
|
||||
"anstyle",
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
"clap_lex",
|
||||
"strsim",
|
||||
]
|
||||
@@ -1382,7 +1376,7 @@ version = "0.25.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e64e6c0fbe2c17357405f7c758c1ef960fce08bdfb2c03d88d2a18d7e09c4b67"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
"crossterm_winapi",
|
||||
"libc",
|
||||
"mio",
|
||||
@@ -2242,7 +2236,7 @@ version = "0.9.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
"inotify-sys",
|
||||
"libc",
|
||||
]
|
||||
@@ -2253,7 +2247,7 @@ version = "0.10.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fdd168d97690d0b8c412d6b6c10360277f4d7ee495c5d0d5d5fe0854923255cc"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
"futures-core",
|
||||
"inotify-sys",
|
||||
"libc",
|
||||
@@ -2373,7 +2367,7 @@ version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
"libc",
|
||||
]
|
||||
|
||||
@@ -2587,7 +2581,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f346ff70e7dbfd675fe90590b92d59ef2de15a8779ae305ebcbfd3f0caf59be4"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
]
|
||||
@@ -2598,7 +2592,7 @@ version = "0.26.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"memoffset 0.7.1",
|
||||
@@ -2622,7 +2616,7 @@ version = "5.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "729f63e1ca555a43fe3efa4f3efdf4801c479da85b432242a7b726f353c88486"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
"crossbeam-channel",
|
||||
"filetime",
|
||||
"fsevent-sys",
|
||||
@@ -2743,11 +2737,11 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
|
||||
|
||||
[[package]]
|
||||
name = "openssl"
|
||||
version = "0.10.60"
|
||||
version = "0.10.55"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "79a4c6c3a2b158f7f8f2a2fc5a969fa3a068df6fc9dbb4a43845436e3af7c800"
|
||||
checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d"
|
||||
dependencies = [
|
||||
"bitflags 2.4.1",
|
||||
"bitflags",
|
||||
"cfg-if",
|
||||
"foreign-types",
|
||||
"libc",
|
||||
@@ -2775,9 +2769,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-sys"
|
||||
version = "0.9.96"
|
||||
version = "0.9.90"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3812c071ba60da8b5677cc12bcb1d42989a65553772897a7e0355545a819838f"
|
||||
checksum = "374533b0e45f3a7ced10fcaeccca020e66656bc03dac384f852e4e5a7a8104a6"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
@@ -3017,7 +3011,6 @@ dependencies = [
|
||||
"serde_with",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"thiserror",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
@@ -3402,7 +3395,7 @@ version = "0.14.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b1de8dacb0873f77e6aefc6d71e044761fcc68060290f5b1089fcdf84626bb69"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
"byteorder",
|
||||
"hex",
|
||||
"lazy_static",
|
||||
@@ -3512,7 +3505,6 @@ dependencies = [
|
||||
"pbkdf2",
|
||||
"pin-project-lite",
|
||||
"postgres-native-tls",
|
||||
"postgres-protocol",
|
||||
"postgres_backend",
|
||||
"pq_proto",
|
||||
"prometheus",
|
||||
@@ -3684,7 +3676,7 @@ version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3693,7 +3685,7 @@ version = "0.3.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3987,7 +3979,7 @@ version = "0.36.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6da3636faa25820d8648e0e31c5d519bbb01f72fdf57131f0f5f7da5fed36eab"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
"errno",
|
||||
"io-lifetimes",
|
||||
"libc",
|
||||
@@ -4001,7 +3993,7 @@ version = "0.37.25"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d4eb579851244c2c03e7c24f501c3432bed80b8f720af1d6e5b0e0f01555a035"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
"errno",
|
||||
"io-lifetimes",
|
||||
"libc",
|
||||
@@ -4214,7 +4206,7 @@ version = "2.9.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1fc758eb7bffce5b308734e9b0c1468893cae9ff70ebf13e7090be8dcbcc83a8"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"bitflags",
|
||||
"core-foundation",
|
||||
"core-foundation-sys",
|
||||
"libc",
|
||||
|
||||
@@ -714,24 +714,6 @@ RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.3.tar.gz -
|
||||
cargo pgrx install --release && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/ulid.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "wal2json-build"
|
||||
# Compile "wal2json" extension
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM build-deps AS wal2json-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
ENV PATH "/usr/local/pgsql/bin/:$PATH"
|
||||
RUN wget https://github.com/eulerto/wal2json/archive/refs/tags/wal2json_2_5.tar.gz && \
|
||||
echo "b516653575541cf221b99cf3f8be9b6821f6dbcfc125675c85f35090f824f00e wal2json_2_5.tar.gz" | sha256sum --check && \
|
||||
mkdir wal2json-src && cd wal2json-src && tar xvzf ../wal2json_2_5.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/wal2json.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "neon-pg-ext-build"
|
||||
@@ -768,7 +750,6 @@ COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-roaringbitmap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-embedding-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=wal2json-pg-build /usr/local/pgsql /usr/local/pgsql
|
||||
COPY pgxn/ pgxn/
|
||||
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
|
||||
@@ -687,9 +687,6 @@ pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
|
||||
info!("create neon extension with query: {}", query);
|
||||
client.simple_query(query)?;
|
||||
|
||||
query = "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'";
|
||||
client.simple_query(query)?;
|
||||
|
||||
query = "ALTER EXTENSION neon SET SCHEMA neon";
|
||||
info!("alter neon extension schema with query: {}", query);
|
||||
client.simple_query(query)?;
|
||||
|
||||
@@ -14,6 +14,7 @@ use pageserver_api::models::{
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
use utils::{
|
||||
generation::Generation,
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
@@ -92,22 +93,6 @@ pub fn migrate_tenant(
|
||||
// Get a new generation
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
|
||||
fn build_location_config(
|
||||
mode: LocationConfigMode,
|
||||
generation: Option<u32>,
|
||||
secondary_conf: Option<LocationConfigSecondary>,
|
||||
) -> LocationConfig {
|
||||
LocationConfig {
|
||||
mode,
|
||||
generation,
|
||||
secondary_conf,
|
||||
tenant_conf: TenantConfig::default(),
|
||||
shard_number: 0,
|
||||
shard_count: 0,
|
||||
shard_stripe_size: 0,
|
||||
}
|
||||
}
|
||||
|
||||
let previous = attachment_service.inspect(tenant_id)?;
|
||||
let mut baseline_lsns = None;
|
||||
if let Some((generation, origin_ps_id)) = &previous {
|
||||
@@ -116,7 +101,12 @@ pub fn migrate_tenant(
|
||||
if origin_ps_id == &dest_ps.conf.id {
|
||||
println!("🔁 Already attached to {origin_ps_id}, freshening...");
|
||||
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
|
||||
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
|
||||
let dest_conf = LocationConfig {
|
||||
mode: LocationConfigMode::AttachedSingle,
|
||||
generation: gen.map(Generation::new),
|
||||
secondary_conf: None,
|
||||
tenant_conf: TenantConfig::default(),
|
||||
};
|
||||
dest_ps.location_config(tenant_id, dest_conf)?;
|
||||
println!("✅ Migration complete");
|
||||
return Ok(());
|
||||
@@ -124,15 +114,24 @@ pub fn migrate_tenant(
|
||||
|
||||
println!("🔁 Switching origin pageserver {origin_ps_id} to stale mode");
|
||||
|
||||
let stale_conf =
|
||||
build_location_config(LocationConfigMode::AttachedStale, Some(*generation), None);
|
||||
let stale_conf = LocationConfig {
|
||||
mode: LocationConfigMode::AttachedStale,
|
||||
generation: Some(Generation::new(*generation)),
|
||||
secondary_conf: None,
|
||||
tenant_conf: TenantConfig::default(),
|
||||
};
|
||||
origin_ps.location_config(tenant_id, stale_conf)?;
|
||||
|
||||
baseline_lsns = Some(get_lsns(tenant_id, &origin_ps)?);
|
||||
}
|
||||
|
||||
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
|
||||
let dest_conf = build_location_config(LocationConfigMode::AttachedMulti, gen, None);
|
||||
let dest_conf = LocationConfig {
|
||||
mode: LocationConfigMode::AttachedMulti,
|
||||
generation: gen.map(Generation::new),
|
||||
secondary_conf: None,
|
||||
tenant_conf: TenantConfig::default(),
|
||||
};
|
||||
|
||||
println!("🔁 Attaching to pageserver {}", dest_ps.conf.id);
|
||||
dest_ps.location_config(tenant_id, dest_conf)?;
|
||||
@@ -171,11 +170,12 @@ pub fn migrate_tenant(
|
||||
}
|
||||
|
||||
// Downgrade to a secondary location
|
||||
let secondary_conf = build_location_config(
|
||||
LocationConfigMode::Secondary,
|
||||
None,
|
||||
Some(LocationConfigSecondary { warm: true }),
|
||||
);
|
||||
let secondary_conf = LocationConfig {
|
||||
mode: LocationConfigMode::Secondary,
|
||||
generation: None,
|
||||
secondary_conf: Some(LocationConfigSecondary { warm: true }),
|
||||
tenant_conf: TenantConfig::default(),
|
||||
};
|
||||
|
||||
println!(
|
||||
"💤 Switching to secondary mode on pageserver {}",
|
||||
@@ -188,7 +188,12 @@ pub fn migrate_tenant(
|
||||
"🔁 Switching to AttachedSingle mode on pageserver {}",
|
||||
dest_ps.conf.id
|
||||
);
|
||||
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
|
||||
let dest_conf = LocationConfig {
|
||||
mode: LocationConfigMode::AttachedSingle,
|
||||
generation: gen.map(Generation::new),
|
||||
secondary_conf: None,
|
||||
tenant_conf: TenantConfig::default(),
|
||||
};
|
||||
dest_ps.location_config(tenant_id, dest_conf)?;
|
||||
|
||||
println!("✅ Migration complete");
|
||||
|
||||
@@ -18,7 +18,6 @@ enum-map.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
hex.workspace = true
|
||||
thiserror.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ use serde_with::serde_as;
|
||||
use strum_macros;
|
||||
use utils::{
|
||||
completion,
|
||||
generation::Generation,
|
||||
history_buffer::HistoryBufferWithDropCounter,
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
@@ -261,19 +262,10 @@ pub struct LocationConfig {
|
||||
pub mode: LocationConfigMode,
|
||||
/// If attaching, in what generation?
|
||||
#[serde(default)]
|
||||
pub generation: Option<u32>,
|
||||
pub generation: Option<Generation>,
|
||||
#[serde(default)]
|
||||
pub secondary_conf: Option<LocationConfigSecondary>,
|
||||
|
||||
// Shard parameters: if shard_count is nonzero, then other shard_* fields
|
||||
// must be set accurately.
|
||||
#[serde(default)]
|
||||
pub shard_number: u8,
|
||||
#[serde(default)]
|
||||
pub shard_count: u8,
|
||||
#[serde(default)]
|
||||
pub shard_stripe_size: u32,
|
||||
|
||||
// If requesting mode `Secondary`, configuration for that.
|
||||
// Custom storage configuration for the tenant, if any
|
||||
pub tenant_conf: TenantConfig,
|
||||
|
||||
@@ -2,7 +2,6 @@ use std::{ops::RangeInclusive, str::FromStr};
|
||||
|
||||
use hex::FromHex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror;
|
||||
use utils::id::TenantId;
|
||||
|
||||
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
|
||||
@@ -140,89 +139,6 @@ impl From<[u8; 18]> for TenantShardId {
|
||||
}
|
||||
}
|
||||
|
||||
/// For use within the context of a particular tenant, when we need to know which
|
||||
/// shard we're dealing with, but do not need to know the full ShardIdentity (because
|
||||
/// we won't be doing any page->shard mapping), and do not need to know the fully qualified
|
||||
/// TenantShardId.
|
||||
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
|
||||
pub struct ShardIndex {
|
||||
pub shard_number: ShardNumber,
|
||||
pub shard_count: ShardCount,
|
||||
}
|
||||
|
||||
impl ShardIndex {
|
||||
pub fn new(number: ShardNumber, count: ShardCount) -> Self {
|
||||
Self {
|
||||
shard_number: number,
|
||||
shard_count: count,
|
||||
}
|
||||
}
|
||||
pub fn unsharded() -> Self {
|
||||
Self {
|
||||
shard_number: ShardNumber(0),
|
||||
shard_count: ShardCount(0),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_unsharded(&self) -> bool {
|
||||
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
|
||||
}
|
||||
|
||||
/// For use in constructing remote storage paths: concatenate this with a TenantId
|
||||
/// to get a fully qualified TenantShardId.
|
||||
///
|
||||
/// Backward compat: this function returns an empty string if Self::is_unsharded, such
|
||||
/// that the legacy pre-sharding remote key format is preserved.
|
||||
pub fn get_suffix(&self) -> String {
|
||||
if self.is_unsharded() {
|
||||
"".to_string()
|
||||
} else {
|
||||
format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ShardIndex {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ShardIndex {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
// Debug is the same as Display: the compact hex representation
|
||||
write!(f, "{}", self)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::str::FromStr for ShardIndex {
|
||||
type Err = hex::FromHexError;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
// Expect format: 1 byte shard number, 1 byte shard count
|
||||
if s.len() == 4 {
|
||||
let bytes = s.as_bytes();
|
||||
let mut shard_parts: [u8; 2] = [0u8; 2];
|
||||
hex::decode_to_slice(bytes, &mut shard_parts)?;
|
||||
Ok(Self {
|
||||
shard_number: ShardNumber(shard_parts[0]),
|
||||
shard_count: ShardCount(shard_parts[1]),
|
||||
})
|
||||
} else {
|
||||
Err(hex::FromHexError::InvalidStringLength)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<[u8; 2]> for ShardIndex {
|
||||
fn from(b: [u8; 2]) -> Self {
|
||||
Self {
|
||||
shard_number: ShardNumber(b[0]),
|
||||
shard_count: ShardCount(b[1]),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for TenantShardId {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
@@ -293,151 +209,6 @@ impl<'de> Deserialize<'de> for TenantShardId {
|
||||
}
|
||||
}
|
||||
|
||||
/// Stripe size in number of pages
|
||||
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
|
||||
pub struct ShardStripeSize(pub u32);
|
||||
|
||||
/// Layout version: for future upgrades where we might change how the key->shard mapping works
|
||||
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
|
||||
pub struct ShardLayout(u8);
|
||||
|
||||
const LAYOUT_V1: ShardLayout = ShardLayout(1);
|
||||
|
||||
/// Default stripe size in pages: 256MiB divided by 8kiB page size.
|
||||
const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
|
||||
|
||||
/// The ShardIdentity contains the information needed for one member of map
|
||||
/// to resolve a key to a shard, and then check whether that shard is ==self.
|
||||
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
|
||||
pub struct ShardIdentity {
|
||||
pub layout: ShardLayout,
|
||||
pub number: ShardNumber,
|
||||
pub count: ShardCount,
|
||||
pub stripe_size: ShardStripeSize,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
|
||||
pub enum ShardConfigError {
|
||||
#[error("Invalid shard count")]
|
||||
InvalidCount,
|
||||
#[error("Invalid shard number")]
|
||||
InvalidNumber,
|
||||
#[error("Invalid stripe size")]
|
||||
InvalidStripeSize,
|
||||
}
|
||||
|
||||
impl ShardIdentity {
|
||||
/// An identity with number=0 count=0 is a "none" identity, which represents legacy
|
||||
/// tenants. Modern single-shard tenants should not use this: they should
|
||||
/// have number=0 count=1.
|
||||
pub fn unsharded() -> Self {
|
||||
Self {
|
||||
number: ShardNumber(0),
|
||||
count: ShardCount(0),
|
||||
layout: LAYOUT_V1,
|
||||
stripe_size: DEFAULT_STRIPE_SIZE,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_unsharded(&self) -> bool {
|
||||
self.number == ShardNumber(0) && self.count == ShardCount(0)
|
||||
}
|
||||
|
||||
/// Count must be nonzero, and number must be < count. To construct
|
||||
/// the legacy case (count==0), use Self::unsharded instead.
|
||||
pub fn new(
|
||||
number: ShardNumber,
|
||||
count: ShardCount,
|
||||
stripe_size: ShardStripeSize,
|
||||
) -> Result<Self, ShardConfigError> {
|
||||
if count.0 == 0 {
|
||||
Err(ShardConfigError::InvalidCount)
|
||||
} else if number.0 > count.0 - 1 {
|
||||
Err(ShardConfigError::InvalidNumber)
|
||||
} else if stripe_size.0 == 0 {
|
||||
Err(ShardConfigError::InvalidStripeSize)
|
||||
} else {
|
||||
Ok(Self {
|
||||
number,
|
||||
count,
|
||||
layout: LAYOUT_V1,
|
||||
stripe_size,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for ShardIndex {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
if serializer.is_human_readable() {
|
||||
serializer.collect_str(self)
|
||||
} else {
|
||||
// Binary encoding is not used in index_part.json, but is included in anticipation of
|
||||
// switching various structures (e.g. inter-process communication, remote metadata) to more
|
||||
// compact binary encodings in future.
|
||||
let mut packed: [u8; 2] = [0; 2];
|
||||
packed[0] = self.shard_number.0;
|
||||
packed[1] = self.shard_count.0;
|
||||
packed.serialize(serializer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for ShardIndex {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
struct IdVisitor {
|
||||
is_human_readable_deserializer: bool,
|
||||
}
|
||||
|
||||
impl<'de> serde::de::Visitor<'de> for IdVisitor {
|
||||
type Value = ShardIndex;
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
if self.is_human_readable_deserializer {
|
||||
formatter.write_str("value in form of hex string")
|
||||
} else {
|
||||
formatter.write_str("value in form of integer array([u8; 2])")
|
||||
}
|
||||
}
|
||||
|
||||
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
|
||||
where
|
||||
A: serde::de::SeqAccess<'de>,
|
||||
{
|
||||
let s = serde::de::value::SeqAccessDeserializer::new(seq);
|
||||
let id: [u8; 2] = Deserialize::deserialize(s)?;
|
||||
Ok(ShardIndex::from(id))
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
ShardIndex::from_str(v).map_err(E::custom)
|
||||
}
|
||||
}
|
||||
|
||||
if deserializer.is_human_readable() {
|
||||
deserializer.deserialize_str(IdVisitor {
|
||||
is_human_readable_deserializer: true,
|
||||
})
|
||||
} else {
|
||||
deserializer.deserialize_tuple(
|
||||
2,
|
||||
IdVisitor {
|
||||
is_human_readable_deserializer: false,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::str::FromStr;
|
||||
@@ -547,66 +318,4 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shard_identity_validation() -> Result<(), ShardConfigError> {
|
||||
// Happy cases
|
||||
ShardIdentity::new(ShardNumber(0), ShardCount(1), DEFAULT_STRIPE_SIZE)?;
|
||||
ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(1))?;
|
||||
ShardIdentity::new(ShardNumber(254), ShardCount(255), ShardStripeSize(1))?;
|
||||
|
||||
assert_eq!(
|
||||
ShardIdentity::new(ShardNumber(0), ShardCount(0), DEFAULT_STRIPE_SIZE),
|
||||
Err(ShardConfigError::InvalidCount)
|
||||
);
|
||||
assert_eq!(
|
||||
ShardIdentity::new(ShardNumber(10), ShardCount(10), DEFAULT_STRIPE_SIZE),
|
||||
Err(ShardConfigError::InvalidNumber)
|
||||
);
|
||||
assert_eq!(
|
||||
ShardIdentity::new(ShardNumber(11), ShardCount(10), DEFAULT_STRIPE_SIZE),
|
||||
Err(ShardConfigError::InvalidNumber)
|
||||
);
|
||||
assert_eq!(
|
||||
ShardIdentity::new(ShardNumber(255), ShardCount(255), DEFAULT_STRIPE_SIZE),
|
||||
Err(ShardConfigError::InvalidNumber)
|
||||
);
|
||||
assert_eq!(
|
||||
ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(0)),
|
||||
Err(ShardConfigError::InvalidStripeSize)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
|
||||
let example = ShardIndex {
|
||||
shard_number: ShardNumber(13),
|
||||
shard_count: ShardCount(17),
|
||||
};
|
||||
let expected: String = "0d11".to_string();
|
||||
let encoded = format!("{example}");
|
||||
assert_eq!(&encoded, &expected);
|
||||
|
||||
let decoded = ShardIndex::from_str(&encoded)?;
|
||||
assert_eq!(example, decoded);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
|
||||
let example = ShardIndex {
|
||||
shard_number: ShardNumber(13),
|
||||
shard_count: ShardCount(17),
|
||||
};
|
||||
let expected: [u8; 2] = [0x0d, 0x11];
|
||||
|
||||
let encoded = bincode::serialize(&example).unwrap();
|
||||
assert_eq!(Hex(&encoded), Hex(&expected));
|
||||
let decoded = bincode::deserialize(&encoded).unwrap();
|
||||
assert_eq!(example, decoded);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@ use crate::control_plane_client::ControlPlaneGenerationsApi;
|
||||
use crate::metrics;
|
||||
use crate::tenant::remote_timeline_client::remote_layer_path;
|
||||
use crate::tenant::remote_timeline_client::remote_timeline_path;
|
||||
use crate::tenant::remote_timeline_client::LayerFileMetadata;
|
||||
use crate::virtual_file::MaybeFatalIo;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use anyhow::Context;
|
||||
@@ -510,19 +509,18 @@ impl DeletionQueueClient {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
current_generation: Generation,
|
||||
layers: Vec<(LayerFileName, LayerFileMetadata)>,
|
||||
layers: Vec<(LayerFileName, Generation)>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
if current_generation.is_none() {
|
||||
debug!("Enqueuing deletions in legacy mode, skipping queue");
|
||||
|
||||
let mut layer_paths = Vec::new();
|
||||
for (layer, meta) in layers {
|
||||
for (layer, generation) in layers {
|
||||
layer_paths.push(remote_layer_path(
|
||||
&tenant_id,
|
||||
&timeline_id,
|
||||
meta.shard,
|
||||
&layer,
|
||||
meta.generation,
|
||||
generation,
|
||||
));
|
||||
}
|
||||
self.push_immediate(layer_paths).await?;
|
||||
@@ -542,7 +540,7 @@ impl DeletionQueueClient {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
current_generation: Generation,
|
||||
layers: Vec<(LayerFileName, LayerFileMetadata)>,
|
||||
layers: Vec<(LayerFileName, Generation)>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
metrics::DELETION_QUEUE
|
||||
.keys_submitted
|
||||
@@ -753,7 +751,6 @@ impl DeletionQueue {
|
||||
mod test {
|
||||
use camino::Utf8Path;
|
||||
use hex_literal::hex;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use std::{io::ErrorKind, time::Duration};
|
||||
use tracing::info;
|
||||
|
||||
@@ -993,8 +990,6 @@ mod test {
|
||||
// we delete, and the generation of the running Tenant.
|
||||
let layer_generation = Generation::new(0xdeadbeef);
|
||||
let now_generation = Generation::new(0xfeedbeef);
|
||||
let layer_metadata =
|
||||
LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
|
||||
|
||||
let remote_layer_file_name_1 =
|
||||
format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
|
||||
@@ -1018,7 +1013,7 @@ mod test {
|
||||
tenant_id,
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(layer_file_name_1.clone(), layer_metadata)].to_vec(),
|
||||
[(layer_file_name_1.clone(), layer_generation)].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
|
||||
@@ -1057,8 +1052,6 @@ mod test {
|
||||
let stale_generation = latest_generation.previous();
|
||||
// Generation that our example layer file was written with
|
||||
let layer_generation = stale_generation.previous();
|
||||
let layer_metadata =
|
||||
LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
|
||||
|
||||
ctx.set_latest_generation(latest_generation);
|
||||
|
||||
@@ -1076,7 +1069,7 @@ mod test {
|
||||
tenant_id,
|
||||
TIMELINE_ID,
|
||||
stale_generation,
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1091,7 +1084,7 @@ mod test {
|
||||
tenant_id,
|
||||
TIMELINE_ID,
|
||||
latest_generation,
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1118,8 +1111,6 @@ mod test {
|
||||
|
||||
let layer_generation = Generation::new(0xdeadbeef);
|
||||
let now_generation = Generation::new(0xfeedbeef);
|
||||
let layer_metadata =
|
||||
LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
|
||||
|
||||
// Inject a deletion in the generation before generation_now: after restart,
|
||||
// this deletion should _not_ get executed (only the immediately previous
|
||||
@@ -1131,7 +1122,7 @@ mod test {
|
||||
tenant_id,
|
||||
TIMELINE_ID,
|
||||
now_generation.previous(),
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1145,7 +1136,7 @@ mod test {
|
||||
tenant_id,
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
|
||||
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_generation)].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1235,13 +1226,12 @@ pub(crate) mod mock {
|
||||
match msg {
|
||||
ListWriterQueueMessage::Delete(op) => {
|
||||
let mut objects = op.objects;
|
||||
for (layer, meta) in op.layers {
|
||||
for (layer, generation) in op.layers {
|
||||
objects.push(remote_layer_path(
|
||||
&op.tenant_id,
|
||||
&op.timeline_id,
|
||||
meta.shard,
|
||||
&layer,
|
||||
meta.generation,
|
||||
generation,
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
@@ -33,7 +33,6 @@ use crate::config::PageServerConf;
|
||||
use crate::deletion_queue::TEMP_SUFFIX;
|
||||
use crate::metrics;
|
||||
use crate::tenant::remote_timeline_client::remote_layer_path;
|
||||
use crate::tenant::remote_timeline_client::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::LayerFileName;
|
||||
use crate::virtual_file::on_fatal_io_error;
|
||||
use crate::virtual_file::MaybeFatalIo;
|
||||
@@ -59,7 +58,7 @@ pub(super) struct DeletionOp {
|
||||
// `layers` and `objects` are both just lists of objects. `layers` is used if you do not
|
||||
// have a config object handy to project it to a remote key, and need the consuming worker
|
||||
// to do it for you.
|
||||
pub(super) layers: Vec<(LayerFileName, LayerFileMetadata)>,
|
||||
pub(super) layers: Vec<(LayerFileName, Generation)>,
|
||||
pub(super) objects: Vec<RemotePath>,
|
||||
|
||||
/// The _current_ generation of the Tenant attachment in which we are enqueuing
|
||||
@@ -388,13 +387,12 @@ impl ListWriter {
|
||||
);
|
||||
|
||||
let mut layer_paths = Vec::new();
|
||||
for (layer, meta) in op.layers {
|
||||
for (layer, generation) in op.layers {
|
||||
layer_paths.push(remote_layer_path(
|
||||
&op.tenant_id,
|
||||
&op.timeline_id,
|
||||
meta.shard,
|
||||
&layer,
|
||||
meta.generation,
|
||||
generation,
|
||||
));
|
||||
}
|
||||
layer_paths.extend(op.objects);
|
||||
|
||||
@@ -178,14 +178,7 @@ where
|
||||
.unwrap_or(false);
|
||||
|
||||
if valid && *validated_generation == tenant_lsn_state.generation {
|
||||
for (timeline_id, pending_lsn) in tenant_lsn_state.timelines {
|
||||
tracing::debug!(
|
||||
%tenant_id,
|
||||
%timeline_id,
|
||||
current = %pending_lsn.result_slot.load(),
|
||||
projected = %pending_lsn.projected,
|
||||
"advancing validated remote_consistent_lsn",
|
||||
);
|
||||
for (_timeline_id, pending_lsn) in tenant_lsn_state.timelines {
|
||||
pending_lsn.result_slot.store(pending_lsn.projected);
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -280,7 +280,6 @@ impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
|
||||
use crate::tenant::delete::DeleteTenantError::*;
|
||||
match value {
|
||||
Get(g) => ApiError::from(g),
|
||||
e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
|
||||
Timeline(t) => ApiError::from(t),
|
||||
NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()),
|
||||
SlotError(e) => e.into(),
|
||||
|
||||
@@ -138,14 +138,6 @@ pub struct GcResult {
|
||||
|
||||
#[serde(serialize_with = "serialize_duration_as_millis")]
|
||||
pub elapsed: Duration,
|
||||
|
||||
/// The layers which were garbage collected.
|
||||
///
|
||||
/// Used in `/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc` to wait for the layers to be
|
||||
/// dropped in tests.
|
||||
#[cfg(feature = "testing")]
|
||||
#[serde(skip)]
|
||||
pub(crate) doomed_layers: Vec<crate::tenant::storage_layer::Layer>,
|
||||
}
|
||||
|
||||
// helper function for `GcResult`, serializing a `Duration` as an integer number of milliseconds
|
||||
@@ -166,11 +158,5 @@ impl AddAssign for GcResult {
|
||||
self.layers_removed += other.layers_removed;
|
||||
|
||||
self.elapsed += other.elapsed;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
let mut other = other;
|
||||
self.doomed_layers.append(&mut other.doomed_layers);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,6 @@ use futures::FutureExt;
|
||||
use pageserver_api::models::TimelineState;
|
||||
use remote_storage::DownloadError;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use std::fmt;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::watch;
|
||||
@@ -32,6 +31,26 @@ use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::fs_ext;
|
||||
use utils::sync::gate::Gate;
|
||||
|
||||
use std::cmp::min;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::BTreeSet;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::Debug;
|
||||
use std::fmt::Display;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::ops::Bound::Included;
|
||||
use std::process::Command;
|
||||
use std::process::Stdio;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::sync::MutexGuard;
|
||||
use std::sync::{Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use self::config::AttachedLocationConfig;
|
||||
use self::config::AttachmentMode;
|
||||
use self::config::LocationConf;
|
||||
@@ -65,35 +84,14 @@ use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
|
||||
use crate::tenant::storage_layer::DeltaLayer;
|
||||
use crate::tenant::storage_layer::ImageLayer;
|
||||
use crate::InitializationOrder;
|
||||
use std::cmp::min;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::BTreeSet;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::Debug;
|
||||
use std::fmt::Display;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::ops::Bound::Included;
|
||||
use std::process::Stdio;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::sync::MutexGuard;
|
||||
use std::sync::{Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::tenant::timeline::delete::DeleteTimelineFlow;
|
||||
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::walredo::PostgresRedoManager;
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
use once_cell::sync::Lazy;
|
||||
pub use pageserver_api::models::TenantState;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
static INIT_DB_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(8));
|
||||
use toml_edit;
|
||||
use utils::{
|
||||
crashsafe,
|
||||
@@ -259,8 +257,6 @@ pub struct Tenant {
|
||||
|
||||
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
|
||||
|
||||
pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
|
||||
|
||||
// Cancellation token fires when we have entered shutdown(). This is a parent of
|
||||
// Timelines' cancellation token.
|
||||
pub(crate) cancel: CancellationToken,
|
||||
@@ -405,36 +401,6 @@ pub enum CreateTimelineError {
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum InitdbError {
|
||||
Other(anyhow::Error),
|
||||
Cancelled,
|
||||
Spawn(std::io::Result<()>),
|
||||
Failed(std::process::ExitStatus, Vec<u8>),
|
||||
}
|
||||
|
||||
impl fmt::Display for InitdbError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
InitdbError::Cancelled => write!(f, "Operation was cancelled"),
|
||||
InitdbError::Spawn(e) => write!(f, "Spawn error: {:?}", e),
|
||||
InitdbError::Failed(status, stderr) => write!(
|
||||
f,
|
||||
"Command failed with status {:?}: {}",
|
||||
status,
|
||||
String::from_utf8_lossy(stderr)
|
||||
),
|
||||
InitdbError::Other(e) => write!(f, "Error: {:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for InitdbError {
|
||||
fn from(error: std::io::Error) -> Self {
|
||||
InitdbError::Spawn(Err(error))
|
||||
}
|
||||
}
|
||||
|
||||
struct TenantDirectoryScan {
|
||||
sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
|
||||
timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
|
||||
@@ -666,9 +632,9 @@ impl Tenant {
|
||||
}
|
||||
};
|
||||
|
||||
info!("pending_deletion {}", pending_deletion.is_some());
|
||||
info!("pending_deletion {}", pending_deletion);
|
||||
|
||||
if let Some(deletion) = pending_deletion {
|
||||
if pending_deletion {
|
||||
// as we are no longer loading, signal completion by dropping
|
||||
// the completion while we resume deletion
|
||||
drop(_completion);
|
||||
@@ -685,7 +651,6 @@ impl Tenant {
|
||||
}
|
||||
|
||||
match DeleteTenantFlow::resume_from_attach(
|
||||
deletion,
|
||||
&tenant_clone,
|
||||
preload,
|
||||
tenants,
|
||||
@@ -2404,7 +2369,6 @@ impl Tenant {
|
||||
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
|
||||
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
|
||||
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
|
||||
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
|
||||
cancel: CancellationToken::default(),
|
||||
gate: Gate::new(format!("Tenant<{tenant_id}>")),
|
||||
}
|
||||
@@ -2638,12 +2602,14 @@ impl Tenant {
|
||||
|
||||
// Perform GC for each timeline.
|
||||
//
|
||||
// Note that we don't hold the `Tenant::gc_cs` lock here because we don't want to delay the
|
||||
// branch creation task, which requires the GC lock. A GC iteration can run concurrently
|
||||
// with branch creation.
|
||||
// Note that we don't hold the GC lock here because we don't want
|
||||
// to delay the branch creation task, which requires the GC lock.
|
||||
// A timeline GC iteration can be slow because it may need to wait for
|
||||
// compaction (both require `layer_removal_cs` lock),
|
||||
// but the GC iteration can run concurrently with branch creation.
|
||||
//
|
||||
// See comments in [`Tenant::branch_timeline`] for more information about why branch
|
||||
// creation task can run concurrently with timeline's GC iteration.
|
||||
// See comments in [`Tenant::branch_timeline`] for more information
|
||||
// about why branch creation task can run concurrently with timeline's GC iteration.
|
||||
for timeline in gc_timelines {
|
||||
if task_mgr::is_shutdown_requested() || cancel.is_cancelled() {
|
||||
// We were requested to shut down. Stop and return with the progress we
|
||||
@@ -2926,7 +2892,7 @@ impl Tenant {
|
||||
/// - after initialization completes, tar up the temp dir and upload it to S3.
|
||||
///
|
||||
/// The caller is responsible for activating the returned timeline.
|
||||
pub(crate) async fn bootstrap_timeline(
|
||||
async fn bootstrap_timeline(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
pg_version: u32,
|
||||
@@ -2938,7 +2904,7 @@ impl Tenant {
|
||||
};
|
||||
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
|
||||
// temporary directory for basebackup files for the given timeline.
|
||||
let pgdata_path = path_with_suffix_extension(
|
||||
let initdb_path = path_with_suffix_extension(
|
||||
self.conf
|
||||
.timelines_path(&self.tenant_id)
|
||||
.join(format!("basebackup-{timeline_id}")),
|
||||
@@ -2947,25 +2913,26 @@ impl Tenant {
|
||||
|
||||
// an uninit mark was placed before, nothing else can access this timeline files
|
||||
// current initdb was not run yet, so remove whatever was left from the previous runs
|
||||
if pgdata_path.exists() {
|
||||
fs::remove_dir_all(&pgdata_path).with_context(|| {
|
||||
format!("Failed to remove already existing initdb directory: {pgdata_path}")
|
||||
if initdb_path.exists() {
|
||||
fs::remove_dir_all(&initdb_path).with_context(|| {
|
||||
format!("Failed to remove already existing initdb directory: {initdb_path}")
|
||||
})?;
|
||||
}
|
||||
// Init temporarily repo to get bootstrap data, this creates a directory in the `initdb_path` path
|
||||
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
|
||||
run_initdb(self.conf, &initdb_path, pg_version)?;
|
||||
// this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
|
||||
scopeguard::defer! {
|
||||
if let Err(e) = fs::remove_dir_all(&pgdata_path) {
|
||||
if let Err(e) = fs::remove_dir_all(&initdb_path) {
|
||||
// this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
|
||||
error!("Failed to remove temporary initdb directory '{pgdata_path}': {e}");
|
||||
error!("Failed to remove temporary initdb directory '{initdb_path}': {e}");
|
||||
}
|
||||
}
|
||||
let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
|
||||
let pgdata_path = &initdb_path;
|
||||
let pgdata_lsn = import_datadir::get_lsn_from_controlfile(pgdata_path)?.align();
|
||||
|
||||
// Upload the created data dir to S3
|
||||
if let Some(storage) = &self.remote_storage {
|
||||
let pgdata_zstd = import_datadir::create_tar_zst(&pgdata_path).await?;
|
||||
let pgdata_zstd = import_datadir::create_tar_zst(pgdata_path).await?;
|
||||
let pgdata_zstd = Bytes::from(pgdata_zstd);
|
||||
backoff::retry(
|
||||
|| async {
|
||||
@@ -3015,7 +2982,7 @@ impl Tenant {
|
||||
|
||||
import_datadir::import_timeline_from_postgres_datadir(
|
||||
unfinished_timeline,
|
||||
&pgdata_path,
|
||||
pgdata_path,
|
||||
pgdata_lsn,
|
||||
ctx,
|
||||
)
|
||||
@@ -3417,54 +3384,42 @@ fn rebase_directory(
|
||||
|
||||
/// Create the cluster temporarily in 'initdbpath' directory inside the repository
|
||||
/// to get bootstrap data for timeline initialization.
|
||||
async fn run_initdb(
|
||||
fn run_initdb(
|
||||
conf: &'static PageServerConf,
|
||||
initdb_target_dir: &Utf8Path,
|
||||
pg_version: u32,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), InitdbError> {
|
||||
let initdb_bin_path = conf
|
||||
.pg_bin_dir(pg_version)
|
||||
.map_err(InitdbError::Other)?
|
||||
.join("initdb");
|
||||
let initdb_lib_dir = conf.pg_lib_dir(pg_version).map_err(InitdbError::Other)?;
|
||||
) -> anyhow::Result<()> {
|
||||
let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb");
|
||||
let initdb_lib_dir = conf.pg_lib_dir(pg_version)?;
|
||||
info!(
|
||||
"running {} in {}, libdir: {}",
|
||||
initdb_bin_path, initdb_target_dir, initdb_lib_dir,
|
||||
);
|
||||
|
||||
let _permit = INIT_DB_SEMAPHORE.acquire().await;
|
||||
|
||||
let initdb_command = tokio::process::Command::new(&initdb_bin_path)
|
||||
let initdb_output = Command::new(&initdb_bin_path)
|
||||
.args(["-D", initdb_target_dir.as_ref()])
|
||||
.args(["-U", &conf.superuser])
|
||||
.args(["-E", "utf8"])
|
||||
.arg("--no-instructions")
|
||||
// This is only used for a temporary installation that is deleted shortly after,
|
||||
// so no need to fsync it
|
||||
.arg("--no-sync")
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", &initdb_lib_dir)
|
||||
.env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
// If the `select!` below doesn't finish the `wait_with_output`,
|
||||
// let the task get `wait()`ed for asynchronously by tokio.
|
||||
// This means there is a slim chance we can go over the INIT_DB_SEMAPHORE.
|
||||
// TODO: fix for this is non-trivial, see
|
||||
// https://github.com/neondatabase/neon/pull/5921#pullrequestreview-1750858021
|
||||
//
|
||||
.kill_on_drop(true)
|
||||
.spawn()?;
|
||||
|
||||
tokio::select! {
|
||||
initdb_output = initdb_command.wait_with_output() => {
|
||||
let initdb_output = initdb_output?;
|
||||
if !initdb_output.status.success() {
|
||||
return Err(InitdbError::Failed(initdb_output.status, initdb_output.stderr));
|
||||
}
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
return Err(InitdbError::Cancelled);
|
||||
}
|
||||
.stdout(Stdio::null())
|
||||
.output()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to execute {} at target dir {}",
|
||||
initdb_bin_path, initdb_target_dir,
|
||||
)
|
||||
})?;
|
||||
if !initdb_output.status.success() {
|
||||
bail!(
|
||||
"initdb failed: '{}'",
|
||||
String::from_utf8_lossy(&initdb_output.stderr)
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -3510,7 +3465,6 @@ pub async fn dump_layerfile_from_path(
|
||||
pub(crate) mod harness {
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use std::fs;
|
||||
use std::sync::Arc;
|
||||
use utils::logging;
|
||||
@@ -3577,7 +3531,6 @@ pub(crate) mod harness {
|
||||
pub tenant_conf: TenantConf,
|
||||
pub tenant_id: TenantId,
|
||||
pub generation: Generation,
|
||||
pub shard: ShardIndex,
|
||||
pub remote_storage: GenericRemoteStorage,
|
||||
pub remote_fs_dir: Utf8PathBuf,
|
||||
pub deletion_queue: MockDeletionQueue,
|
||||
@@ -3637,7 +3590,6 @@ pub(crate) mod harness {
|
||||
tenant_conf,
|
||||
tenant_id,
|
||||
generation: Generation::new(0xdeadbeef),
|
||||
shard: ShardIndex::unsharded(),
|
||||
remote_storage,
|
||||
remote_fs_dir,
|
||||
deletion_queue,
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
//!
|
||||
use anyhow::Context;
|
||||
use pageserver_api::models;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::num::NonZeroU64;
|
||||
use std::time::Duration;
|
||||
@@ -89,14 +88,6 @@ pub(crate) struct LocationConf {
|
||||
/// The location-specific part of the configuration, describes the operating
|
||||
/// mode of this pageserver for this tenant.
|
||||
pub(crate) mode: LocationMode,
|
||||
|
||||
/// The detailed shard identity. This structure is already scoped within
|
||||
/// a TenantShardId, but we need the full ShardIdentity to enable calculating
|
||||
/// key->shard mappings.
|
||||
#[serde(default = "ShardIdentity::unsharded")]
|
||||
#[serde(skip_serializing_if = "ShardIdentity::is_unsharded")]
|
||||
pub(crate) shard: ShardIdentity,
|
||||
|
||||
/// The pan-cluster tenant configuration, the same on all locations
|
||||
pub(crate) tenant_conf: TenantConfOpt,
|
||||
}
|
||||
@@ -169,8 +160,6 @@ impl LocationConf {
|
||||
generation,
|
||||
attach_mode: AttachmentMode::Single,
|
||||
}),
|
||||
// Legacy configuration loads are always from tenants created before sharding existed.
|
||||
shard: ShardIdentity::unsharded(),
|
||||
tenant_conf,
|
||||
}
|
||||
}
|
||||
@@ -198,7 +187,6 @@ impl LocationConf {
|
||||
|
||||
fn get_generation(conf: &'_ models::LocationConfig) -> Result<Generation, anyhow::Error> {
|
||||
conf.generation
|
||||
.map(Generation::new)
|
||||
.ok_or_else(|| anyhow::anyhow!("Generation must be set when attaching"))
|
||||
}
|
||||
|
||||
@@ -238,21 +226,7 @@ impl LocationConf {
|
||||
}
|
||||
};
|
||||
|
||||
let shard = if conf.shard_count == 0 {
|
||||
ShardIdentity::unsharded()
|
||||
} else {
|
||||
ShardIdentity::new(
|
||||
ShardNumber(conf.shard_number),
|
||||
ShardCount(conf.shard_count),
|
||||
ShardStripeSize(conf.shard_stripe_size),
|
||||
)?
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
shard,
|
||||
mode,
|
||||
tenant_conf,
|
||||
})
|
||||
Ok(Self { mode, tenant_conf })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -267,7 +241,6 @@ impl Default for LocationConf {
|
||||
attach_mode: AttachmentMode::Single,
|
||||
}),
|
||||
tenant_conf: TenantConfOpt::default(),
|
||||
shard: ShardIdentity::unsharded(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ use anyhow::Context;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::models::TenantState;
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
use tokio::sync::OwnedMutexGuard;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, instrument, warn, Instrument, Span};
|
||||
|
||||
@@ -39,9 +38,6 @@ pub(crate) enum DeleteTenantError {
|
||||
#[error("Invalid state {0}. Expected Active or Broken")]
|
||||
InvalidState(TenantState),
|
||||
|
||||
#[error("Tenant deletion is already in progress")]
|
||||
AlreadyInProgress,
|
||||
|
||||
#[error("Tenant map slot error {0}")]
|
||||
SlotError(#[from] TenantSlotError),
|
||||
|
||||
@@ -55,8 +51,6 @@ pub(crate) enum DeleteTenantError {
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
type DeletionGuard = tokio::sync::OwnedMutexGuard<DeleteTenantFlow>;
|
||||
|
||||
fn remote_tenant_delete_mark_path(
|
||||
conf: &PageServerConf,
|
||||
tenant_id: &TenantId,
|
||||
@@ -287,14 +281,14 @@ impl DeleteTenantFlow {
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let mut guard = Self::prepare(&tenant).await?;
|
||||
Self::prepare(&tenant).await?;
|
||||
|
||||
if let Err(e) = Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant).await {
|
||||
if let Err(e) = Self::run_inner(conf, remote_storage.as_ref(), &tenant).await {
|
||||
tenant.set_broken(format!("{e:#}")).await;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
Self::schedule_background(guard, conf, remote_storage, tenants, tenant);
|
||||
Self::schedule_background(conf, remote_storage, tenants, tenant);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -304,13 +298,10 @@ impl DeleteTenantFlow {
|
||||
// will result in an error, but here we need to be able to retry shutdown when tenant deletion is retried.
|
||||
// So the solution is to set tenant state to broken.
|
||||
async fn run_inner(
|
||||
guard: &mut OwnedMutexGuard<Self>,
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<&GenericRemoteStorage>,
|
||||
tenant: &Tenant,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
guard.mark_in_progress()?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-create-remote-mark", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-create-remote-mark"
|
||||
@@ -345,46 +336,25 @@ impl DeleteTenantFlow {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn mark_in_progress(&mut self) -> anyhow::Result<()> {
|
||||
match self {
|
||||
Self::Finished => anyhow::bail!("Bug. Is in finished state"),
|
||||
Self::InProgress { .. } => { /* We're in a retry */ }
|
||||
Self::NotStarted => { /* Fresh start */ }
|
||||
}
|
||||
|
||||
*self = Self::InProgress;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn should_resume_deletion(
|
||||
conf: &'static PageServerConf,
|
||||
remote_mark_exists: bool,
|
||||
tenant: &Tenant,
|
||||
) -> Result<Option<DeletionGuard>, DeleteTenantError> {
|
||||
let acquire = |t: &Tenant| {
|
||||
Some(
|
||||
Arc::clone(&t.delete_progress)
|
||||
.try_lock_owned()
|
||||
.expect("we're the only owner during init"),
|
||||
)
|
||||
};
|
||||
|
||||
) -> Result<bool, DeleteTenantError> {
|
||||
if remote_mark_exists {
|
||||
return Ok(acquire(tenant));
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let tenant_id = tenant.tenant_id;
|
||||
// Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
|
||||
if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
|
||||
Ok(acquire(tenant))
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(None)
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn resume_from_attach(
|
||||
guard: DeletionGuard,
|
||||
tenant: &Arc<Tenant>,
|
||||
preload: Option<TenantPreload>,
|
||||
tenants: &'static std::sync::RwLock<TenantsMap>,
|
||||
@@ -403,19 +373,10 @@ impl DeleteTenantFlow {
|
||||
.await
|
||||
.context("attach")?;
|
||||
|
||||
Self::background(
|
||||
guard,
|
||||
tenant.conf,
|
||||
tenant.remote_storage.clone(),
|
||||
tenants,
|
||||
tenant,
|
||||
)
|
||||
.await
|
||||
Self::background(tenant.conf, tenant.remote_storage.clone(), tenants, tenant).await
|
||||
}
|
||||
|
||||
async fn prepare(
|
||||
tenant: &Arc<Tenant>,
|
||||
) -> Result<tokio::sync::OwnedMutexGuard<Self>, DeleteTenantError> {
|
||||
async fn prepare(tenant: &Arc<Tenant>) -> Result<(), DeleteTenantError> {
|
||||
// FIXME: unsure about active only. Our init jobs may not be cancellable properly,
|
||||
// so at least for now allow deletions only for active tenants. TODO recheck
|
||||
// Broken and Stopping is needed for retries.
|
||||
@@ -426,10 +387,6 @@ impl DeleteTenantFlow {
|
||||
return Err(DeleteTenantError::InvalidState(tenant.current_state()));
|
||||
}
|
||||
|
||||
let guard = Arc::clone(&tenant.delete_progress)
|
||||
.try_lock_owned()
|
||||
.map_err(|_| DeleteTenantError::AlreadyInProgress)?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-shutdown", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: tenant-delete-before-shutdown"))?
|
||||
});
|
||||
@@ -449,11 +406,10 @@ impl DeleteTenantFlow {
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(guard)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn schedule_background(
|
||||
guard: OwnedMutexGuard<Self>,
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
tenants: &'static std::sync::RwLock<TenantsMap>,
|
||||
@@ -469,9 +425,7 @@ impl DeleteTenantFlow {
|
||||
"tenant_delete",
|
||||
false,
|
||||
async move {
|
||||
if let Err(err) =
|
||||
Self::background(guard, conf, remote_storage, tenants, &tenant).await
|
||||
{
|
||||
if let Err(err) = Self::background(conf, remote_storage, tenants, &tenant).await {
|
||||
error!("Error: {err:#}");
|
||||
tenant.set_broken(format!("{err:#}")).await;
|
||||
};
|
||||
@@ -486,7 +440,6 @@ impl DeleteTenantFlow {
|
||||
}
|
||||
|
||||
async fn background(
|
||||
mut guard: OwnedMutexGuard<Self>,
|
||||
conf: &PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
tenants: &'static std::sync::RwLock<TenantsMap>,
|
||||
@@ -550,8 +503,6 @@ impl DeleteTenantFlow {
|
||||
.set(locked.len() as u64);
|
||||
}
|
||||
|
||||
*guard = Self::Finished;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -792,6 +792,8 @@ pub(crate) async fn set_new_tenant_config(
|
||||
impl TenantManager {
|
||||
/// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or is not fitting to the query.
|
||||
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
|
||||
///
|
||||
/// This method is cancel-safe.
|
||||
pub(crate) fn get_attached_tenant_shard(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -1959,7 +1961,6 @@ pub(crate) async fn immediate_gc(
|
||||
// Run in task_mgr to avoid race with tenant_detach operation
|
||||
let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||
let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
|
||||
// TODO: spawning is redundant now, need to hold the gate
|
||||
task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
TaskKind::GarbageCollector,
|
||||
@@ -1969,40 +1970,12 @@ pub(crate) async fn immediate_gc(
|
||||
false,
|
||||
async move {
|
||||
fail::fail_point!("immediate_gc_task_pre");
|
||||
|
||||
#[allow(unused_mut)]
|
||||
let mut result = tenant
|
||||
let result = tenant
|
||||
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
|
||||
.instrument(info_span!("manual_gc", %tenant_id, %timeline_id))
|
||||
.await;
|
||||
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
|
||||
// better once the types support it.
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
if let Ok(result) = result.as_mut() {
|
||||
// why not futures unordered? it seems it needs very much the same task structure
|
||||
// but would only run on single task.
|
||||
let mut js = tokio::task::JoinSet::new();
|
||||
for layer in std::mem::take(&mut result.doomed_layers) {
|
||||
js.spawn(layer.wait_drop());
|
||||
}
|
||||
tracing::info!(total = js.len(), "starting to wait for the gc'd layers to be dropped");
|
||||
while let Some(res) = js.join_next().await {
|
||||
res.expect("wait_drop should not panic");
|
||||
}
|
||||
}
|
||||
|
||||
let timeline = tenant.get_timeline(timeline_id, false).ok();
|
||||
let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
|
||||
|
||||
if let Some(rtc) = rtc {
|
||||
// layer drops schedule actions on remote timeline client to actually do the
|
||||
// deletions; don't care just exit fast about the shutdown error
|
||||
drop(rtc.wait_completion().await);
|
||||
}
|
||||
}
|
||||
|
||||
match task_done.send(result) {
|
||||
Ok(_) => (),
|
||||
Err(result) => error!("failed to send gc result: {result:?}"),
|
||||
|
||||
@@ -188,7 +188,6 @@ use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use scopeguard::ScopeGuard;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
pub(crate) use upload::upload_initdb_dir;
|
||||
@@ -403,11 +402,6 @@ impl RemoteTimelineClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn get_shard_index(&self) -> ShardIndex {
|
||||
// TODO: carry this on the struct
|
||||
ShardIndex::unsharded()
|
||||
}
|
||||
|
||||
pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
|
||||
match &mut *self.upload_queue.lock().unwrap() {
|
||||
UploadQueue::Uninitialized => None,
|
||||
@@ -471,7 +465,6 @@ impl RemoteTimelineClient {
|
||||
&self.storage_impl,
|
||||
&self.tenant_id,
|
||||
&self.timeline_id,
|
||||
self.get_shard_index(),
|
||||
self.generation,
|
||||
cancel,
|
||||
)
|
||||
@@ -664,10 +657,10 @@ impl RemoteTimelineClient {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
let with_metadata =
|
||||
let with_generations =
|
||||
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned());
|
||||
|
||||
self.schedule_deletion_of_unlinked0(upload_queue, with_metadata);
|
||||
self.schedule_deletion_of_unlinked0(upload_queue, with_generations);
|
||||
|
||||
// Launch the tasks immediately, if possible
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
@@ -702,7 +695,7 @@ impl RemoteTimelineClient {
|
||||
self: &Arc<Self>,
|
||||
upload_queue: &mut UploadQueueInitialized,
|
||||
names: I,
|
||||
) -> Vec<(LayerFileName, LayerFileMetadata)>
|
||||
) -> Vec<(LayerFileName, Generation)>
|
||||
where
|
||||
I: IntoIterator<Item = LayerFileName>,
|
||||
{
|
||||
@@ -710,17 +703,16 @@ impl RemoteTimelineClient {
|
||||
// so we don't need update it. Just serialize it.
|
||||
let metadata = upload_queue.latest_metadata.clone();
|
||||
|
||||
// Decorate our list of names with each name's metadata, dropping
|
||||
// names that are unexpectedly missing from our metadata. This metadata
|
||||
// is later used when physically deleting layers, to construct key paths.
|
||||
let with_metadata: Vec<_> = names
|
||||
// Decorate our list of names with each name's generation, dropping
|
||||
// names that are unexpectedly missing from our metadata.
|
||||
let with_generations: Vec<_> = names
|
||||
.into_iter()
|
||||
.filter_map(|name| {
|
||||
let meta = upload_queue.latest_files.remove(&name);
|
||||
|
||||
if let Some(meta) = meta {
|
||||
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
|
||||
Some((name, meta))
|
||||
Some((name, meta.generation))
|
||||
} else {
|
||||
// This can only happen if we forgot to to schedule the file upload
|
||||
// before scheduling the delete. Log it because it is a rare/strange
|
||||
@@ -733,10 +725,9 @@ impl RemoteTimelineClient {
|
||||
.collect();
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
for (name, metadata) in &with_metadata {
|
||||
let gen = metadata.generation;
|
||||
if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), gen) {
|
||||
if unexpected == gen {
|
||||
for (name, gen) in &with_generations {
|
||||
if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), *gen) {
|
||||
if &unexpected == gen {
|
||||
tracing::error!("{name} was unlinked twice with same generation");
|
||||
} else {
|
||||
tracing::error!("{name} was unlinked twice with different generations {gen:?} and {unexpected:?}");
|
||||
@@ -751,14 +742,14 @@ impl RemoteTimelineClient {
|
||||
self.schedule_index_upload(upload_queue, metadata);
|
||||
}
|
||||
|
||||
with_metadata
|
||||
with_generations
|
||||
}
|
||||
|
||||
/// Schedules deletion for layer files which have previously been unlinked from the
|
||||
/// `index_part.json` with [`Self::schedule_gc_update`] or [`Self::schedule_compaction_update`].
|
||||
pub(crate) fn schedule_deletion_of_unlinked(
|
||||
self: &Arc<Self>,
|
||||
layers: Vec<(LayerFileName, LayerFileMetadata)>,
|
||||
layers: Vec<(LayerFileName, Generation)>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
@@ -771,22 +762,16 @@ impl RemoteTimelineClient {
|
||||
fn schedule_deletion_of_unlinked0(
|
||||
self: &Arc<Self>,
|
||||
upload_queue: &mut UploadQueueInitialized,
|
||||
with_metadata: Vec<(LayerFileName, LayerFileMetadata)>,
|
||||
with_generations: Vec<(LayerFileName, Generation)>,
|
||||
) {
|
||||
for (name, meta) in &with_metadata {
|
||||
info!(
|
||||
"scheduling deletion of layer {}{} (shard {})",
|
||||
name,
|
||||
meta.generation.get_suffix(),
|
||||
meta.shard
|
||||
);
|
||||
for (name, gen) in &with_generations {
|
||||
info!("scheduling deletion of layer {}{}", name, gen.get_suffix());
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
for (name, meta) in &with_metadata {
|
||||
let gen = meta.generation;
|
||||
for (name, gen) in &with_generations {
|
||||
match upload_queue.dangling_files.remove(name) {
|
||||
Some(same) if same == gen => { /* expected */ }
|
||||
Some(same) if &same == gen => { /* expected */ }
|
||||
Some(other) => {
|
||||
tracing::error!("{name} was unlinked with {other:?} but deleted with {gen:?}");
|
||||
}
|
||||
@@ -798,7 +783,7 @@ impl RemoteTimelineClient {
|
||||
|
||||
// schedule the actual deletions
|
||||
let op = UploadOp::Delete(Delete {
|
||||
layers: with_metadata,
|
||||
layers: with_generations,
|
||||
});
|
||||
self.calls_unfinished_metric_begin(&op);
|
||||
upload_queue.queued_operations.push_back(op);
|
||||
@@ -827,8 +812,10 @@ impl RemoteTimelineClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Wait for all previously scheduled uploads/deletions to complete
|
||||
pub(crate) async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
|
||||
///
|
||||
pub async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
|
||||
let mut receiver = {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
@@ -838,7 +825,6 @@ impl RemoteTimelineClient {
|
||||
if receiver.changed().await.is_err() {
|
||||
anyhow::bail!("wait_completion aborted because upload queue was stopped");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -865,56 +851,6 @@ impl RemoteTimelineClient {
|
||||
receiver
|
||||
}
|
||||
|
||||
/// Wait for all previously scheduled operations to complete, and then stop.
|
||||
///
|
||||
/// Not cancellation safe
|
||||
pub(crate) async fn shutdown(self: &Arc<Self>) -> Result<(), StopError> {
|
||||
// On cancellation the queue is left in ackward state of refusing new operations but
|
||||
// proper stop is yet to be called. On cancel the original or some later task must call
|
||||
// `stop` or `shutdown`.
|
||||
let sg = scopeguard::guard((), |_| {
|
||||
tracing::error!("RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error")
|
||||
});
|
||||
|
||||
let fut = {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = match &mut *guard {
|
||||
UploadQueue::Stopped(_) => return Ok(()),
|
||||
UploadQueue::Uninitialized => return Err(StopError::QueueUninitialized),
|
||||
UploadQueue::Initialized(ref mut init) => init,
|
||||
};
|
||||
|
||||
// if the queue is already stuck due to a shutdown operation which was cancelled, then
|
||||
// just don't add more of these as they would never complete.
|
||||
//
|
||||
// TODO: if launch_queued_tasks were to be refactored to accept a &mut UploadQueue
|
||||
// in every place we would not have to jump through this hoop, and this method could be
|
||||
// made cancellable.
|
||||
if !upload_queue.shutting_down {
|
||||
upload_queue.shutting_down = true;
|
||||
upload_queue.queued_operations.push_back(UploadOp::Shutdown);
|
||||
// this operation is not counted similar to Barrier
|
||||
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
}
|
||||
|
||||
upload_queue.shutdown_ready.clone().acquire_owned()
|
||||
};
|
||||
|
||||
let res = fut.await;
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(sg);
|
||||
|
||||
match res {
|
||||
Ok(_permit) => unreachable!("shutdown_ready should not have been added permits"),
|
||||
Err(_closed) => {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
self.stop()
|
||||
}
|
||||
|
||||
/// Set the deleted_at field in the remote index file.
|
||||
///
|
||||
/// This fails if the upload queue has not been `stop()`ed.
|
||||
@@ -968,7 +904,6 @@ impl RemoteTimelineClient {
|
||||
&self.storage_impl,
|
||||
&self.tenant_id,
|
||||
&self.timeline_id,
|
||||
self.get_shard_index(),
|
||||
self.generation,
|
||||
&index_part_with_deleted_at,
|
||||
)
|
||||
@@ -1027,7 +962,6 @@ impl RemoteTimelineClient {
|
||||
remote_layer_path(
|
||||
&self.tenant_id,
|
||||
&self.timeline_id,
|
||||
meta.shard,
|
||||
&file_name,
|
||||
meta.generation,
|
||||
)
|
||||
@@ -1076,12 +1010,7 @@ impl RemoteTimelineClient {
|
||||
.unwrap_or(
|
||||
// No generation-suffixed indices, assume we are dealing with
|
||||
// a legacy index.
|
||||
remote_index_path(
|
||||
&self.tenant_id,
|
||||
&self.timeline_id,
|
||||
self.get_shard_index(),
|
||||
Generation::none(),
|
||||
),
|
||||
remote_index_path(&self.tenant_id, &self.timeline_id, Generation::none()),
|
||||
);
|
||||
|
||||
let remaining_layers: Vec<RemotePath> = remaining
|
||||
@@ -1152,9 +1081,7 @@ impl RemoteTimelineClient {
|
||||
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
|
||||
}
|
||||
|
||||
UploadOp::Barrier(_) | UploadOp::Shutdown => {
|
||||
upload_queue.inprogress_tasks.is_empty()
|
||||
}
|
||||
UploadOp::Barrier(_) => upload_queue.inprogress_tasks.is_empty(),
|
||||
};
|
||||
|
||||
// If we cannot launch this task, don't look any further.
|
||||
@@ -1167,13 +1094,6 @@ impl RemoteTimelineClient {
|
||||
break;
|
||||
}
|
||||
|
||||
if let UploadOp::Shutdown = next_op {
|
||||
// leave the op in the queue but do not start more tasks; it will be dropped when
|
||||
// the stop is called.
|
||||
upload_queue.shutdown_ready.close();
|
||||
break;
|
||||
}
|
||||
|
||||
// We can launch this task. Remove it from the queue first.
|
||||
let next_op = upload_queue.queued_operations.pop_front().unwrap();
|
||||
|
||||
@@ -1194,7 +1114,6 @@ impl RemoteTimelineClient {
|
||||
sender.send_replace(());
|
||||
continue;
|
||||
}
|
||||
UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
|
||||
};
|
||||
|
||||
// Assign unique ID to this task
|
||||
@@ -1300,7 +1219,6 @@ impl RemoteTimelineClient {
|
||||
&self.storage_impl,
|
||||
&self.tenant_id,
|
||||
&self.timeline_id,
|
||||
self.get_shard_index(),
|
||||
self.generation,
|
||||
index_part,
|
||||
)
|
||||
@@ -1333,10 +1251,10 @@ impl RemoteTimelineClient {
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e))
|
||||
}
|
||||
unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
|
||||
UploadOp::Barrier(_) => {
|
||||
// unreachable. Barrier operations are handled synchronously in
|
||||
// launch_queued_tasks
|
||||
warn!("unexpected {unexpected:?} operation in perform_upload_task");
|
||||
warn!("unexpected Barrier operation in perform_upload_task");
|
||||
break;
|
||||
}
|
||||
};
|
||||
@@ -1430,7 +1348,7 @@ impl RemoteTimelineClient {
|
||||
upload_queue.num_inprogress_deletions -= 1;
|
||||
None
|
||||
}
|
||||
UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
|
||||
UploadOp::Barrier(_) => unreachable!(),
|
||||
};
|
||||
|
||||
// Launch any queued tasks that were unblocked by this one.
|
||||
@@ -1485,7 +1403,7 @@ impl RemoteTimelineClient {
|
||||
reason: "should we track deletes? positive or negative sign?",
|
||||
},
|
||||
),
|
||||
UploadOp::Barrier(..) | UploadOp::Shutdown => {
|
||||
UploadOp::Barrier(_) => {
|
||||
// we do not account these
|
||||
return None;
|
||||
}
|
||||
@@ -1511,13 +1429,10 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
|
||||
/// Close the upload queue for new operations and cancel queued operations.
|
||||
///
|
||||
/// Use [`RemoteTimelineClient::shutdown`] for graceful stop.
|
||||
///
|
||||
/// In-progress operations will still be running after this function returns.
|
||||
/// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))`
|
||||
/// to wait for them to complete, after calling this function.
|
||||
pub(crate) fn stop(&self) -> Result<(), StopError> {
|
||||
pub fn stop(&self) -> Result<(), StopError> {
|
||||
// Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
|
||||
// into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet.
|
||||
// The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
|
||||
@@ -1555,8 +1470,6 @@ impl RemoteTimelineClient {
|
||||
queued_operations: VecDeque::default(),
|
||||
#[cfg(feature = "testing")]
|
||||
dangling_files: HashMap::default(),
|
||||
shutting_down: false,
|
||||
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
|
||||
};
|
||||
|
||||
let upload_queue = std::mem::replace(
|
||||
@@ -1614,14 +1527,12 @@ pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> R
|
||||
pub fn remote_layer_path(
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
shard: ShardIndex,
|
||||
layer_file_name: &LayerFileName,
|
||||
generation: Generation,
|
||||
) -> RemotePath {
|
||||
// Generation-aware key format
|
||||
let path = format!(
|
||||
"tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
|
||||
shard.get_suffix(),
|
||||
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
|
||||
layer_file_name.file_name(),
|
||||
generation.get_suffix()
|
||||
);
|
||||
@@ -1639,12 +1550,10 @@ pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId
|
||||
pub fn remote_index_path(
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
shard: ShardIndex,
|
||||
generation: Generation,
|
||||
) -> RemotePath {
|
||||
RemotePath::from_string(&format!(
|
||||
"tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
|
||||
shard.get_suffix(),
|
||||
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
|
||||
IndexPart::FILE_NAME,
|
||||
generation.get_suffix()
|
||||
))
|
||||
@@ -1869,7 +1778,6 @@ mod tests {
|
||||
println!("remote_timeline_dir: {remote_timeline_dir}");
|
||||
|
||||
let generation = harness.generation;
|
||||
let shard = harness.shard;
|
||||
|
||||
// Create a couple of dummy files, schedule upload for them
|
||||
|
||||
@@ -1886,7 +1794,7 @@ mod tests {
|
||||
harness.conf,
|
||||
&timeline,
|
||||
name,
|
||||
LayerFileMetadata::new(contents.len() as u64, generation, shard),
|
||||
LayerFileMetadata::new(contents.len() as u64, generation),
|
||||
)
|
||||
}).collect::<Vec<_>>();
|
||||
|
||||
@@ -2035,7 +1943,7 @@ mod tests {
|
||||
harness.conf,
|
||||
&timeline,
|
||||
layer_file_name_1.clone(),
|
||||
LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
|
||||
LayerFileMetadata::new(content_1.len() as u64, harness.generation),
|
||||
);
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Copy)]
|
||||
@@ -2100,11 +2008,7 @@ mod tests {
|
||||
assert_eq!(actual_c, expected_c);
|
||||
}
|
||||
|
||||
async fn inject_index_part(
|
||||
test_state: &TestSetup,
|
||||
generation: Generation,
|
||||
shard: ShardIndex,
|
||||
) -> IndexPart {
|
||||
async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
|
||||
// An empty IndexPart, just sufficient to ensure deserialization will succeed
|
||||
let example_metadata = TimelineMetadata::example();
|
||||
let example_index_part = IndexPart::new(
|
||||
@@ -2125,13 +2029,7 @@ mod tests {
|
||||
std::fs::create_dir_all(remote_timeline_dir).expect("creating test dir should work");
|
||||
|
||||
let index_path = test_state.harness.remote_fs_dir.join(
|
||||
remote_index_path(
|
||||
&test_state.harness.tenant_id,
|
||||
&TIMELINE_ID,
|
||||
shard,
|
||||
generation,
|
||||
)
|
||||
.get_path(),
|
||||
remote_index_path(&test_state.harness.tenant_id, &TIMELINE_ID, generation).get_path(),
|
||||
);
|
||||
eprintln!("Writing {index_path}");
|
||||
std::fs::write(&index_path, index_part_bytes).unwrap();
|
||||
@@ -2168,12 +2066,7 @@ mod tests {
|
||||
|
||||
// Simple case: we are in generation N, load the index from generation N - 1
|
||||
let generation_n = 5;
|
||||
let injected = inject_index_part(
|
||||
&test_state,
|
||||
Generation::new(generation_n - 1),
|
||||
ShardIndex::unsharded(),
|
||||
)
|
||||
.await;
|
||||
let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
|
||||
|
||||
assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
|
||||
|
||||
@@ -2191,34 +2084,22 @@ mod tests {
|
||||
|
||||
// A generation-less IndexPart exists in the bucket, we should find it
|
||||
let generation_n = 5;
|
||||
let injected_none =
|
||||
inject_index_part(&test_state, Generation::none(), ShardIndex::unsharded()).await;
|
||||
let injected_none = inject_index_part(&test_state, Generation::none()).await;
|
||||
assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
|
||||
|
||||
// If a more recent-than-none generation exists, we should prefer to load that
|
||||
let injected_1 =
|
||||
inject_index_part(&test_state, Generation::new(1), ShardIndex::unsharded()).await;
|
||||
let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
|
||||
assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
|
||||
|
||||
// If a more-recent-than-me generation exists, we should ignore it.
|
||||
let _injected_10 =
|
||||
inject_index_part(&test_state, Generation::new(10), ShardIndex::unsharded()).await;
|
||||
let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
|
||||
assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
|
||||
|
||||
// If a directly previous generation exists, _and_ an index exists in my own
|
||||
// generation, I should prefer my own generation.
|
||||
let _injected_prev = inject_index_part(
|
||||
&test_state,
|
||||
Generation::new(generation_n - 1),
|
||||
ShardIndex::unsharded(),
|
||||
)
|
||||
.await;
|
||||
let injected_current = inject_index_part(
|
||||
&test_state,
|
||||
Generation::new(generation_n),
|
||||
ShardIndex::unsharded(),
|
||||
)
|
||||
.await;
|
||||
let _injected_prev =
|
||||
inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
|
||||
let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
|
||||
assert_got_index_part(
|
||||
&test_state,
|
||||
Generation::new(generation_n),
|
||||
|
||||
@@ -9,7 +9,6 @@ use std::time::Duration;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use camino::Utf8Path;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use tokio::fs;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -54,7 +53,6 @@ pub async fn download_layer_file<'a>(
|
||||
let remote_path = remote_layer_path(
|
||||
&tenant_id,
|
||||
&timeline_id,
|
||||
layer_metadata.shard,
|
||||
layer_file_name,
|
||||
layer_metadata.generation,
|
||||
);
|
||||
@@ -215,11 +213,10 @@ async fn do_download_index_part(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
shard: ShardIndex,
|
||||
index_generation: Generation,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<IndexPart, DownloadError> {
|
||||
let remote_path = remote_index_path(tenant_id, timeline_id, shard, index_generation);
|
||||
let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);
|
||||
|
||||
let index_part_bytes = download_retry_forever(
|
||||
|| async {
|
||||
@@ -257,7 +254,6 @@ pub(super) async fn download_index_part(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
shard: ShardIndex,
|
||||
my_generation: Generation,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<IndexPart, DownloadError> {
|
||||
@@ -265,15 +261,8 @@ pub(super) async fn download_index_part(
|
||||
|
||||
if my_generation.is_none() {
|
||||
// Operating without generations: just fetch the generation-less path
|
||||
return do_download_index_part(
|
||||
storage,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
shard,
|
||||
my_generation,
|
||||
cancel,
|
||||
)
|
||||
.await;
|
||||
return do_download_index_part(storage, tenant_id, timeline_id, my_generation, cancel)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Stale case: If we were intentionally attached in a stale generation, there may already be a remote
|
||||
@@ -284,7 +273,6 @@ pub(super) async fn download_index_part(
|
||||
storage,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
shard,
|
||||
my_generation,
|
||||
cancel.clone(),
|
||||
)
|
||||
@@ -312,7 +300,6 @@ pub(super) async fn download_index_part(
|
||||
storage,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
shard,
|
||||
my_generation.previous(),
|
||||
cancel.clone(),
|
||||
)
|
||||
@@ -333,9 +320,8 @@ pub(super) async fn download_index_part(
|
||||
}
|
||||
|
||||
// General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
|
||||
// objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent
|
||||
// to constructing a full index path with no generation, because the generation is a suffix.
|
||||
let index_prefix = remote_index_path(tenant_id, timeline_id, shard, Generation::none());
|
||||
// objects, and select the highest one with a generation <= my_generation.
|
||||
let index_prefix = remote_index_path(tenant_id, timeline_id, Generation::none());
|
||||
let indices = backoff::retry(
|
||||
|| async { storage.list_files(Some(&index_prefix)).await },
|
||||
|_| false,
|
||||
@@ -361,21 +347,14 @@ pub(super) async fn download_index_part(
|
||||
match max_previous_generation {
|
||||
Some(g) => {
|
||||
tracing::debug!("Found index_part in generation {g:?}");
|
||||
do_download_index_part(storage, tenant_id, timeline_id, shard, g, cancel).await
|
||||
do_download_index_part(storage, tenant_id, timeline_id, g, cancel).await
|
||||
}
|
||||
None => {
|
||||
// Migration from legacy pre-generation state: we have a generation but no prior
|
||||
// attached pageservers did. Try to load from a no-generation path.
|
||||
tracing::info!("No index_part.json* found");
|
||||
do_download_index_part(
|
||||
storage,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
shard,
|
||||
Generation::none(),
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
do_download_index_part(storage, tenant_id, timeline_id, Generation::none(), cancel)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,6 @@ use crate::tenant::metadata::TimelineMetadata;
|
||||
use crate::tenant::storage_layer::LayerFileName;
|
||||
use crate::tenant::upload_queue::UploadQueueInitialized;
|
||||
use crate::tenant::Generation;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -26,8 +25,6 @@ pub struct LayerFileMetadata {
|
||||
file_size: u64,
|
||||
|
||||
pub(crate) generation: Generation,
|
||||
|
||||
pub(crate) shard: ShardIndex,
|
||||
}
|
||||
|
||||
impl From<&'_ IndexLayerMetadata> for LayerFileMetadata {
|
||||
@@ -35,17 +32,15 @@ impl From<&'_ IndexLayerMetadata> for LayerFileMetadata {
|
||||
LayerFileMetadata {
|
||||
file_size: other.file_size,
|
||||
generation: other.generation,
|
||||
shard: other.shard,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerFileMetadata {
|
||||
pub fn new(file_size: u64, generation: Generation, shard: ShardIndex) -> Self {
|
||||
pub fn new(file_size: u64, generation: Generation) -> Self {
|
||||
LayerFileMetadata {
|
||||
file_size,
|
||||
generation,
|
||||
shard,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -166,10 +161,6 @@ pub struct IndexLayerMetadata {
|
||||
#[serde(default = "Generation::none")]
|
||||
#[serde(skip_serializing_if = "Generation::is_none")]
|
||||
pub generation: Generation,
|
||||
|
||||
#[serde(default = "ShardIndex::unsharded")]
|
||||
#[serde(skip_serializing_if = "ShardIndex::is_unsharded")]
|
||||
pub shard: ShardIndex,
|
||||
}
|
||||
|
||||
impl From<LayerFileMetadata> for IndexLayerMetadata {
|
||||
@@ -177,7 +168,6 @@ impl From<LayerFileMetadata> for IndexLayerMetadata {
|
||||
IndexLayerMetadata {
|
||||
file_size: other.file_size,
|
||||
generation: other.generation,
|
||||
shard: other.shard,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -205,15 +195,13 @@ mod tests {
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
generation: Generation::none()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
generation: Generation::none()
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -245,15 +233,13 @@ mod tests {
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
generation: Generation::none()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
generation: Generation::none()
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -286,15 +272,13 @@ mod tests {
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
generation: Generation::none()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
generation: Generation::none()
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -370,21 +354,19 @@ mod tests {
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
generation: Generation::none()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
generation: Generation::none()
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
|
||||
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()),
|
||||
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
|
||||
};
|
||||
|
||||
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
|
||||
|
||||
@@ -4,7 +4,6 @@ use anyhow::{bail, Context};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8Path;
|
||||
use fail::fail_point;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use std::io::ErrorKind;
|
||||
use tokio::fs;
|
||||
|
||||
@@ -27,7 +26,6 @@ pub(super) async fn upload_index_part<'a>(
|
||||
storage: &'a GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
shard: ShardIndex,
|
||||
generation: Generation,
|
||||
index_part: &'a IndexPart,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -44,7 +42,7 @@ pub(super) async fn upload_index_part<'a>(
|
||||
let index_part_size = index_part_bytes.len();
|
||||
let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
|
||||
|
||||
let remote_path = remote_index_path(tenant_id, timeline_id, shard, generation);
|
||||
let remote_path = remote_index_path(tenant_id, timeline_id, generation);
|
||||
storage
|
||||
.upload_storage_object(Box::new(index_part_bytes), index_part_size, &remote_path)
|
||||
.await
|
||||
|
||||
@@ -3,7 +3,6 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::models::{
|
||||
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
|
||||
};
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
@@ -97,7 +96,6 @@ impl Layer {
|
||||
desc,
|
||||
None,
|
||||
metadata.generation,
|
||||
metadata.shard,
|
||||
)));
|
||||
|
||||
debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
|
||||
@@ -138,7 +136,6 @@ impl Layer {
|
||||
desc,
|
||||
Some(inner),
|
||||
metadata.generation,
|
||||
metadata.shard,
|
||||
)
|
||||
}));
|
||||
|
||||
@@ -182,7 +179,6 @@ impl Layer {
|
||||
desc,
|
||||
Some(inner),
|
||||
timeline.generation,
|
||||
timeline.get_shard_index(),
|
||||
)
|
||||
}));
|
||||
|
||||
@@ -326,24 +322,6 @@ impl Layer {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Waits until this layer has been dropped (and if needed, local garbage collection and remote
|
||||
/// deletion scheduling has completed).
|
||||
///
|
||||
/// Does not start garbage collection, use [`Self::garbage_collect_on_drop`] for that
|
||||
/// separatedly.
|
||||
#[cfg(feature = "testing")]
|
||||
pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
|
||||
let mut rx = self.0.status.subscribe();
|
||||
|
||||
async move {
|
||||
loop {
|
||||
if let Err(tokio::sync::broadcast::error::RecvError::Closed) = rx.recv().await {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The download-ness ([`DownloadedLayer`]) can be either resident or wanted evicted.
|
||||
@@ -448,15 +426,6 @@ struct LayerInner {
|
||||
/// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`],
|
||||
/// for created layers from [`Timeline::generation`].
|
||||
generation: Generation,
|
||||
|
||||
/// The shard of this Layer.
|
||||
///
|
||||
/// For layers created in this process, this will always be the [`ShardIndex`] of the
|
||||
/// current `ShardIdentity`` (TODO: add link once it's introduced).
|
||||
///
|
||||
/// For loaded layers, this may be some other value if the tenant has undergone
|
||||
/// a shard split since the layer was originally written.
|
||||
shard: ShardIndex,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for LayerInner {
|
||||
@@ -490,17 +459,13 @@ impl Drop for LayerInner {
|
||||
|
||||
let path = std::mem::take(&mut self.path);
|
||||
let file_name = self.layer_desc().filename();
|
||||
let gen = self.generation;
|
||||
let file_size = self.layer_desc().file_size;
|
||||
let timeline = self.timeline.clone();
|
||||
let meta = self.metadata();
|
||||
let status = self.status.clone();
|
||||
|
||||
crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || {
|
||||
let _g = span.entered();
|
||||
|
||||
// carry this until we are finished for [`Layer::wait_drop`] support
|
||||
let _status = status;
|
||||
|
||||
let removed = match std::fs::remove_file(path) {
|
||||
Ok(()) => true,
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
@@ -524,7 +489,7 @@ impl Drop for LayerInner {
|
||||
timeline.metrics.resident_physical_size_sub(file_size);
|
||||
}
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
|
||||
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, gen)]);
|
||||
|
||||
if let Err(e) = res {
|
||||
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
|
||||
@@ -558,7 +523,6 @@ impl LayerInner {
|
||||
desc: PersistentLayerDesc,
|
||||
downloaded: Option<Arc<DownloadedLayer>>,
|
||||
generation: Generation,
|
||||
shard: ShardIndex,
|
||||
) -> Self {
|
||||
let path = conf
|
||||
.timeline_path(&timeline.tenant_id, &timeline.timeline_id)
|
||||
@@ -586,7 +550,6 @@ impl LayerInner {
|
||||
status: tokio::sync::broadcast::channel(1).0,
|
||||
consecutive_failures: AtomicUsize::new(0),
|
||||
generation,
|
||||
shard,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1114,7 +1077,7 @@ impl LayerInner {
|
||||
}
|
||||
|
||||
fn metadata(&self) -> LayerFileMetadata {
|
||||
LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
|
||||
LayerFileMetadata::new(self.desc.file_size, self.generation)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1438,7 +1401,6 @@ impl Default for LayerImplMetrics {
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// reminder: this will be pageserver_layer_gcs_count_total with "_total" suffix
|
||||
let gcs = metrics::register_int_counter_vec!(
|
||||
"pageserver_layer_gcs_count",
|
||||
"Garbage collections started and completed in the Layer implementation",
|
||||
|
||||
@@ -62,7 +62,6 @@ use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
|
||||
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
|
||||
use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
|
||||
use pageserver_api::reltag::RelTag;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::to_pg_timestamp;
|
||||
@@ -251,6 +250,14 @@ pub struct Timeline {
|
||||
/// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
|
||||
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
|
||||
|
||||
/// Layer removal lock.
|
||||
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
|
||||
/// This lock is acquired in [`Timeline::gc`] and [`Timeline::compact`].
|
||||
/// This is an `Arc<Mutex>` lock because we need an owned
|
||||
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
|
||||
/// Note that [`DeleteTimelineFlow`] uses `delete_progress` field.
|
||||
pub(super) layer_removal_cs: Arc<tokio::sync::Mutex<()>>,
|
||||
|
||||
// Needed to ensure that we can't create a branch at a point that was already garbage collected
|
||||
pub latest_gc_cutoff_lsn: Rcu<Lsn>,
|
||||
|
||||
@@ -311,24 +318,6 @@ pub struct Timeline {
|
||||
/// Cancellation token scoped to this timeline: anything doing long-running work relating
|
||||
/// to the timeline should drop out when this token fires.
|
||||
pub(crate) cancel: CancellationToken,
|
||||
|
||||
/// Make sure we only have one running compaction at a time in tests.
|
||||
///
|
||||
/// Must only be taken in two places:
|
||||
/// - [`Timeline::compact`] (this file)
|
||||
/// - [`delete::delete_local_layer_files`]
|
||||
///
|
||||
/// Timeline deletion will acquire both compaction and gc locks in whatever order.
|
||||
compaction_lock: tokio::sync::Mutex<()>,
|
||||
|
||||
/// Make sure we only have one running gc at a time.
|
||||
///
|
||||
/// Must only be taken in two places:
|
||||
/// - [`Timeline::gc`] (this file)
|
||||
/// - [`delete::delete_local_layer_files`]
|
||||
///
|
||||
/// Timeline deletion will acquire both compaction and gc locks in whatever order.
|
||||
gc_lock: tokio::sync::Mutex<()>,
|
||||
}
|
||||
|
||||
pub struct WalReceiverInfo {
|
||||
@@ -714,8 +703,6 @@ impl Timeline {
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), CompactionError> {
|
||||
let _g = self.compaction_lock.lock().await;
|
||||
|
||||
// this wait probably never needs any "long time spent" logging, because we already nag if
|
||||
// compaction task goes over it's period (20s) which is quite often in production.
|
||||
let _permit = match super::tasks::concurrent_background_tasks_rate_limit(
|
||||
@@ -770,7 +757,7 @@ impl Timeline {
|
||||
// Below are functions compact_level0() and create_image_layers()
|
||||
// but they are a bit ad hoc and don't quite work like it's explained
|
||||
// above. Rewrite it.
|
||||
|
||||
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
|
||||
// Is the timeline being deleted?
|
||||
if self.is_stopping() {
|
||||
trace!("Dropping out of compaction on timeline shutdown");
|
||||
@@ -811,7 +798,8 @@ impl Timeline {
|
||||
|
||||
// 3. Compact
|
||||
let timer = self.metrics.compact_time_histo.start_timer();
|
||||
self.compact_level0(target_file_size, ctx).await?;
|
||||
self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx)
|
||||
.await?;
|
||||
timer.stop_and_record();
|
||||
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
@@ -957,7 +945,7 @@ impl Timeline {
|
||||
// what is problematic is the shutting down of RemoteTimelineClient, because
|
||||
// obviously it does not make sense to stop while we wait for it, but what
|
||||
// about corner cases like s3 suddenly hanging up?
|
||||
if let Err(e) = client.shutdown().await {
|
||||
if let Err(e) = client.wait_completion().await {
|
||||
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
|
||||
// we have some extra WAL replay to do next time the timeline starts.
|
||||
warn!("failed to flush to remote storage: {e:#}");
|
||||
@@ -1212,6 +1200,16 @@ impl Timeline {
|
||||
remote_client: &Arc<RemoteTimelineClient>,
|
||||
layers_to_evict: &[Layer],
|
||||
) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
|
||||
// ensure that the layers have finished uploading
|
||||
// (don't hold the layer_removal_cs while we do it, we're not removing anything yet)
|
||||
remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for layer upload ops to complete")?;
|
||||
|
||||
// now lock out layer removal (compaction, gc, timeline deletion)
|
||||
let _layer_removal_guard = self.layer_removal_cs.lock().await;
|
||||
|
||||
{
|
||||
// to avoid racing with detach and delete_timeline
|
||||
let state = self.current_state();
|
||||
@@ -1422,6 +1420,7 @@ impl Timeline {
|
||||
layer_flush_done_tx,
|
||||
|
||||
write_lock: tokio::sync::Mutex::new(()),
|
||||
layer_removal_cs: Default::default(),
|
||||
|
||||
gc_info: std::sync::RwLock::new(GcInfo {
|
||||
retain_lsns: Vec::new(),
|
||||
@@ -1460,9 +1459,6 @@ impl Timeline {
|
||||
initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt),
|
||||
cancel,
|
||||
gate: Gate::new(format!("Timeline<{tenant_id}/{timeline_id}>")),
|
||||
|
||||
compaction_lock: tokio::sync::Mutex::default(),
|
||||
gc_lock: tokio::sync::Mutex::default(),
|
||||
};
|
||||
result.repartition_threshold =
|
||||
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
|
||||
@@ -1601,7 +1597,6 @@ impl Timeline {
|
||||
|
||||
// Copy to move into the task we're about to spawn
|
||||
let generation = self.generation;
|
||||
let shard = self.get_shard_index();
|
||||
let this = self.myself.upgrade().expect("&self method holds the arc");
|
||||
|
||||
let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
|
||||
@@ -1650,7 +1645,6 @@ impl Timeline {
|
||||
index_part.as_ref(),
|
||||
disk_consistent_lsn,
|
||||
generation,
|
||||
shard,
|
||||
);
|
||||
|
||||
let mut loaded_layers = Vec::new();
|
||||
@@ -3153,8 +3147,13 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
|
||||
|
||||
impl Timeline {
|
||||
/// Level0 files first phase of compaction, explained in the [`Self::compact`] comment.
|
||||
///
|
||||
/// This method takes the `_layer_removal_cs` guard to highlight it required downloads are
|
||||
/// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the
|
||||
/// start of level0 files compaction, the on-demand download should be revisited as well.
|
||||
async fn compact_level0_phase1(
|
||||
self: &Arc<Self>,
|
||||
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
|
||||
mut stats: CompactLevel0Phase1StatsBuilder,
|
||||
target_file_size: u64,
|
||||
@@ -3241,6 +3240,8 @@ impl Timeline {
|
||||
let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
|
||||
let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
|
||||
|
||||
// FIXME: downloading while holding layer_removal_cs is not great, but we will remove that
|
||||
// soon
|
||||
deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
|
||||
for l in level0_deltas_iter {
|
||||
let lsn_range = &l.layer_desc().lsn_range;
|
||||
@@ -3590,6 +3591,7 @@ impl Timeline {
|
||||
///
|
||||
async fn compact_level0(
|
||||
self: &Arc<Self>,
|
||||
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), CompactionError> {
|
||||
@@ -3611,9 +3613,16 @@ impl Timeline {
|
||||
let now = tokio::time::Instant::now();
|
||||
stats.read_lock_acquisition_micros =
|
||||
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
|
||||
self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
|
||||
.instrument(phase1_span)
|
||||
.await?
|
||||
let layer_removal_cs = layer_removal_cs.clone();
|
||||
self.compact_level0_phase1(
|
||||
layer_removal_cs,
|
||||
phase1_layers_locked,
|
||||
stats,
|
||||
target_file_size,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(phase1_span)
|
||||
.await?
|
||||
};
|
||||
|
||||
if new_layers.is_empty() && deltas_to_compact.is_empty() {
|
||||
@@ -3621,6 +3630,17 @@ impl Timeline {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Before deleting any layers, we need to wait for their upload ops to finish.
|
||||
// See remote_timeline_client module level comment on consistency.
|
||||
// Do it here because we don't want to hold self.layers.write() while waiting.
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
debug!("waiting for upload ops to complete");
|
||||
remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for layer upload ops to complete")?;
|
||||
}
|
||||
|
||||
let mut guard = self.layers.write().await;
|
||||
|
||||
let mut duplicated_layers = HashSet::new();
|
||||
@@ -3652,7 +3672,12 @@ impl Timeline {
|
||||
};
|
||||
|
||||
// deletion will happen later, the layer file manager calls garbage_collect_on_drop
|
||||
guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
|
||||
guard.finish_compact_l0(
|
||||
&layer_removal_cs,
|
||||
&remove_layers,
|
||||
&insert_layers,
|
||||
&self.metrics,
|
||||
);
|
||||
|
||||
if let Some(remote_client) = self.remote_client.as_ref() {
|
||||
remote_client.schedule_compaction_update(&remove_layers, &new_layers)?;
|
||||
@@ -3763,17 +3788,19 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Garbage collect layer files on a timeline that are no longer needed.
|
||||
///
|
||||
/// Currently, we don't make any attempt at removing unneeded page versions
|
||||
/// within a layer file. We can only remove the whole file if it's fully
|
||||
/// obsolete.
|
||||
///
|
||||
pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
|
||||
let _g = self.gc_lock.lock().await;
|
||||
let timer = self.metrics.garbage_collect_histo.start_timer();
|
||||
|
||||
fail_point!("before-timeline-gc");
|
||||
|
||||
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
|
||||
// Is the timeline being deleted?
|
||||
if self.is_stopping() {
|
||||
anyhow::bail!("timeline is Stopping");
|
||||
@@ -3791,7 +3818,13 @@ impl Timeline {
|
||||
let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
|
||||
|
||||
let res = self
|
||||
.gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
|
||||
.gc_timeline(
|
||||
layer_removal_cs.clone(),
|
||||
horizon_cutoff,
|
||||
pitr_cutoff,
|
||||
retain_lsns,
|
||||
new_gc_cutoff,
|
||||
)
|
||||
.instrument(
|
||||
info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
|
||||
)
|
||||
@@ -3805,6 +3838,7 @@ impl Timeline {
|
||||
|
||||
async fn gc_timeline(
|
||||
&self,
|
||||
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
horizon_cutoff: Lsn,
|
||||
pitr_cutoff: Lsn,
|
||||
retain_lsns: Vec<Lsn>,
|
||||
@@ -3842,6 +3876,17 @@ impl Timeline {
|
||||
|
||||
debug!("retain_lsns: {:?}", retain_lsns);
|
||||
|
||||
// Before deleting any layers, we need to wait for their upload ops to finish.
|
||||
// See storage_sync module level comment on consistency.
|
||||
// Do it here because we don't want to hold self.layers.write() while waiting.
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
debug!("waiting for upload ops to complete");
|
||||
remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for layer upload ops to complete")?;
|
||||
}
|
||||
|
||||
let mut layers_to_remove = Vec::new();
|
||||
let mut wanted_image_layers = KeySpaceRandomAccum::default();
|
||||
|
||||
@@ -3957,11 +4002,6 @@ impl Timeline {
|
||||
//
|
||||
// This does not in fact have any effect as we no longer consider local metadata unless
|
||||
// running without remote storage.
|
||||
//
|
||||
// This unconditionally schedules also an index_part.json update, even though, we will
|
||||
// be doing one a bit later with the unlinked gc'd layers.
|
||||
//
|
||||
// TODO: remove when implementing <https://github.com/neondatabase/neon/issues/4099>.
|
||||
self.update_metadata_file(self.disk_consistent_lsn.load(), None)
|
||||
.await?;
|
||||
|
||||
@@ -3976,16 +4016,11 @@ impl Timeline {
|
||||
remote_client.schedule_gc_update(&gc_layers)?;
|
||||
}
|
||||
|
||||
guard.finish_gc_timeline(&gc_layers);
|
||||
guard.finish_gc_timeline(&layer_removal_cs, gc_layers);
|
||||
|
||||
if result.layers_removed != 0 {
|
||||
fail_point!("after-timeline-gc-removed-layers");
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
result.doomed_layers = gc_layers;
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
@@ -3997,7 +4032,9 @@ impl Timeline {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
///
|
||||
/// Reconstruct a value, using the given base image and WAL records in 'data'.
|
||||
///
|
||||
async fn reconstruct_value(
|
||||
&self,
|
||||
key: Key,
|
||||
@@ -4327,11 +4364,6 @@ impl Timeline {
|
||||
resident_layers,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_shard_index(&self) -> ShardIndex {
|
||||
// TODO: carry this on the struct
|
||||
ShardIndex::unsharded()
|
||||
}
|
||||
}
|
||||
|
||||
type TraversalPathItem = (
|
||||
|
||||
@@ -110,11 +110,11 @@ async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTi
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Grab the compaction and gc locks, and actually perform the deletion.
|
||||
/// Grab the layer_removal_cs lock, and actually perform the deletion.
|
||||
///
|
||||
/// The locks prevent GC or compaction from running at the same time. The background tasks do not
|
||||
/// register themselves with the timeline it's operating on, so it might still be running even
|
||||
/// though we called `shutdown_tasks`.
|
||||
/// This lock prevents prevents GC or compaction from running at the same time.
|
||||
/// The GC task doesn't register itself with the timeline it's operating on,
|
||||
/// so it might still be running even though we called `shutdown_tasks`.
|
||||
///
|
||||
/// Note that there are still other race conditions between
|
||||
/// GC, compaction and timeline deletion. See
|
||||
@@ -122,19 +122,14 @@ async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTi
|
||||
///
|
||||
/// No timeout here, GC & Compaction should be responsive to the
|
||||
/// `TimelineState::Stopping` change.
|
||||
// pub(super): documentation link
|
||||
pub(super) async fn delete_local_layer_files(
|
||||
async fn delete_local_layer_files(
|
||||
conf: &PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
timeline: &Timeline,
|
||||
) -> anyhow::Result<()> {
|
||||
let guards = async { tokio::join!(timeline.gc_lock.lock(), timeline.compaction_lock.lock()) };
|
||||
let guards = crate::timed(
|
||||
guards,
|
||||
"acquire gc and compaction locks",
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
info!("waiting for layer_removal_cs.lock()");
|
||||
let layer_removal_guard = timeline.layer_removal_cs.lock().await;
|
||||
info!("got layer_removal_cs.lock(), deleting layer files");
|
||||
|
||||
// NB: storage_sync upload tasks that reference these layers have been cancelled
|
||||
// by the caller.
|
||||
@@ -155,8 +150,8 @@ pub(super) async fn delete_local_layer_files(
|
||||
// because of a previous failure/cancellation at/after
|
||||
// failpoint timeline-delete-after-rm.
|
||||
//
|
||||
// ErrorKind::NotFound can also happen if we race with tenant detach, because,
|
||||
// no locks are shared.
|
||||
// It can also happen if we race with tenant detach, because,
|
||||
// it doesn't grab the layer_removal_cs lock.
|
||||
//
|
||||
// For now, log and continue.
|
||||
// warn! level is technically not appropriate for the
|
||||
@@ -224,8 +219,8 @@ pub(super) async fn delete_local_layer_files(
|
||||
.with_context(|| format!("Failed to remove: {}", entry.path().display()))?;
|
||||
}
|
||||
|
||||
info!("finished deleting layer files, releasing locks");
|
||||
drop(guards);
|
||||
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
|
||||
drop(layer_removal_guard);
|
||||
|
||||
fail::fail_point!("timeline-delete-after-rm", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
|
||||
|
||||
@@ -296,6 +296,7 @@ impl Timeline {
|
||||
stats.evicted += 1;
|
||||
}
|
||||
Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
|
||||
// compaction/gc removed the file while we were waiting on layer_removal_cs
|
||||
stats.not_evictable += 1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,6 @@ use crate::{
|
||||
};
|
||||
use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -108,7 +107,6 @@ pub(super) fn reconcile(
|
||||
index_part: Option<&IndexPart>,
|
||||
disk_consistent_lsn: Lsn,
|
||||
generation: Generation,
|
||||
shard: ShardIndex,
|
||||
) -> Vec<(LayerFileName, Result<Decision, DismissedLayer>)> {
|
||||
use Decision::*;
|
||||
|
||||
@@ -120,13 +118,10 @@ pub(super) fn reconcile(
|
||||
.map(|(name, file_size)| {
|
||||
(
|
||||
name,
|
||||
// The generation and shard here will be corrected to match IndexPart in the merge below, unless
|
||||
// The generation here will be corrected to match IndexPart in the merge below, unless
|
||||
// it is not in IndexPart, in which case using our current generation makes sense
|
||||
// because it will be uploaded in this generation.
|
||||
(
|
||||
Some(LayerFileMetadata::new(file_size, generation, shard)),
|
||||
None,
|
||||
),
|
||||
(Some(LayerFileMetadata::new(file_size, generation)), None),
|
||||
)
|
||||
})
|
||||
.collect::<Collected>();
|
||||
|
||||
@@ -190,6 +190,7 @@ impl LayerManager {
|
||||
/// Called when compaction is completed.
|
||||
pub(crate) fn finish_compact_l0(
|
||||
&mut self,
|
||||
layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
compact_from: &[Layer],
|
||||
compact_to: &[ResidentLayer],
|
||||
metrics: &TimelineMetrics,
|
||||
@@ -200,16 +201,25 @@ impl LayerManager {
|
||||
metrics.record_new_file_metrics(l.layer_desc().file_size);
|
||||
}
|
||||
for l in compact_from {
|
||||
Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
|
||||
Self::delete_historic_layer(layer_removal_cs, l, &mut updates, &mut self.layer_fmgr);
|
||||
}
|
||||
updates.flush();
|
||||
}
|
||||
|
||||
/// Called when garbage collect has selected the layers to be removed.
|
||||
pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
|
||||
/// Called when garbage collect the timeline. Returns a guard that will apply the updates to the layer map.
|
||||
pub(crate) fn finish_gc_timeline(
|
||||
&mut self,
|
||||
layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
gc_layers: Vec<Layer>,
|
||||
) {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
for doomed_layer in gc_layers {
|
||||
Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
|
||||
Self::delete_historic_layer(
|
||||
layer_removal_cs,
|
||||
&doomed_layer,
|
||||
&mut updates,
|
||||
&mut self.layer_fmgr,
|
||||
);
|
||||
}
|
||||
updates.flush()
|
||||
}
|
||||
@@ -228,6 +238,7 @@ impl LayerManager {
|
||||
/// Remote storage is not affected by this operation.
|
||||
fn delete_historic_layer(
|
||||
// we cannot remove layers otherwise, since gc and compaction will race
|
||||
_layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
layer: &Layer,
|
||||
updates: &mut BatchedUpdates<'_>,
|
||||
mapping: &mut LayerFileManager<Layer>,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use super::storage_layer::LayerFileName;
|
||||
use super::storage_layer::ResidentLayer;
|
||||
use super::Generation;
|
||||
use crate::tenant::metadata::TimelineMetadata;
|
||||
use crate::tenant::remote_timeline_client::index::IndexPart;
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
@@ -14,9 +15,6 @@ use utils::lsn::AtomicLsn;
|
||||
use std::sync::atomic::AtomicU32;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
use utils::generation::Generation;
|
||||
|
||||
// clippy warns that Uninitialized is much smaller than Initialized, which wastes
|
||||
// memory for Uninitialized variants. Doesn't matter in practice, there are not
|
||||
// that many upload queues in a running pageserver, and most of them are initialized
|
||||
@@ -90,14 +88,6 @@ pub(crate) struct UploadQueueInitialized {
|
||||
/// bug causing leaks, then it's better to not leave this enabled for production builds.
|
||||
#[cfg(feature = "testing")]
|
||||
pub(crate) dangling_files: HashMap<LayerFileName, Generation>,
|
||||
|
||||
/// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
|
||||
pub(crate) shutting_down: bool,
|
||||
|
||||
/// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
|
||||
/// wait on until one of them stops the queue. The semaphore is closed when
|
||||
/// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
|
||||
pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
|
||||
}
|
||||
|
||||
impl UploadQueueInitialized {
|
||||
@@ -156,8 +146,6 @@ impl UploadQueue {
|
||||
queued_operations: VecDeque::new(),
|
||||
#[cfg(feature = "testing")]
|
||||
dangling_files: HashMap::new(),
|
||||
shutting_down: false,
|
||||
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
|
||||
};
|
||||
|
||||
*self = UploadQueue::Initialized(state);
|
||||
@@ -205,8 +193,6 @@ impl UploadQueue {
|
||||
queued_operations: VecDeque::new(),
|
||||
#[cfg(feature = "testing")]
|
||||
dangling_files: HashMap::new(),
|
||||
shutting_down: false,
|
||||
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
|
||||
};
|
||||
|
||||
*self = UploadQueue::Initialized(state);
|
||||
@@ -218,13 +204,7 @@ impl UploadQueue {
|
||||
UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
|
||||
anyhow::bail!("queue is in state {}", self.as_str())
|
||||
}
|
||||
UploadQueue::Initialized(x) => {
|
||||
if !x.shutting_down {
|
||||
Ok(x)
|
||||
} else {
|
||||
anyhow::bail!("queue is shutting down")
|
||||
}
|
||||
}
|
||||
UploadQueue::Initialized(x) => Ok(x),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -252,7 +232,7 @@ pub(crate) struct UploadTask {
|
||||
/// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Delete {
|
||||
pub(crate) layers: Vec<(LayerFileName, LayerFileMetadata)>,
|
||||
pub(crate) layers: Vec<(LayerFileName, Generation)>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -268,10 +248,6 @@ pub(crate) enum UploadOp {
|
||||
|
||||
/// Barrier. When the barrier operation is reached,
|
||||
Barrier(tokio::sync::watch::Sender<()>),
|
||||
|
||||
/// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
|
||||
/// this is the same as a Barrier.
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for UploadOp {
|
||||
@@ -293,7 +269,6 @@ impl std::fmt::Display for UploadOp {
|
||||
write!(f, "Delete({} layers)", delete.layers.len())
|
||||
}
|
||||
UploadOp::Barrier(_) => write!(f, "Barrier"),
|
||||
UploadOp::Shutdown => write!(f, "Shutdown"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2079,88 +2079,4 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Replay a wal segment file taken directly from safekeepers.
|
||||
///
|
||||
/// This test is useful for benchmarking since it allows us to profile only
|
||||
/// the walingest code in a single-threaded executor, and iterate more quickly
|
||||
/// without waiting for unrelated steps.
|
||||
#[tokio::test]
|
||||
async fn test_ingest_real_wal() {
|
||||
use crate::tenant::harness::*;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
|
||||
// Define test data path and constants.
|
||||
//
|
||||
// Steps to reconstruct the data, if needed:
|
||||
// 1. Run the pgbench python test
|
||||
// 2. Take the first wal segment file from safekeeper
|
||||
// 3. Compress it using `zstd --long input_file`
|
||||
// 4. Copy initdb.tar.zst from local_fs_remote_storage
|
||||
// 5. Grep sk logs for "restart decoder" to get startpoint
|
||||
// 6. Run just the decoder from this test to get the endpoint.
|
||||
// It's the last LSN the decoder will output.
|
||||
let pg_version = 15; // The test data was generated by pg15
|
||||
let path = "test_data/sk_wal_segment_from_pgbench";
|
||||
let wal_segment_path = format!("{path}/000000010000000000000001.zst");
|
||||
let startpoint = Lsn::from_hex("14AEC08").unwrap();
|
||||
let endpoint = Lsn::from_hex("1FFFF98").unwrap();
|
||||
|
||||
// Bootstrap a real timeline. We can't use create_test_timeline because
|
||||
// it doesn't create a real checkpoint, and Walingest::new tries to parse
|
||||
// the garbage data.
|
||||
//
|
||||
// TODO use the initdb.tar.zst file stored with the test data to avoid
|
||||
// problems with inconsistent initdb results after pg minor version bumps.
|
||||
let (tenant, ctx) = TenantHarness::create("test_ingest_real_wal")
|
||||
.unwrap()
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant
|
||||
.bootstrap_timeline(TIMELINE_ID, pg_version, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// We fully read and decompress this into memory before decoding
|
||||
// to get a more accurate perf profile of the decoder.
|
||||
let bytes = {
|
||||
use async_compression::tokio::bufread::ZstdDecoder;
|
||||
let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
|
||||
let reader = tokio::io::BufReader::new(file);
|
||||
let decoder = ZstdDecoder::new(reader);
|
||||
let mut reader = tokio::io::BufReader::new(decoder);
|
||||
let mut buffer = Vec::new();
|
||||
tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
|
||||
buffer
|
||||
};
|
||||
|
||||
// TODO start a profiler too
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
// Initialize walingest
|
||||
let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
|
||||
let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
|
||||
let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut modification = tline.begin_modification(endpoint);
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
println!("decoding {} bytes", bytes.len() - xlogoff);
|
||||
|
||||
// Decode and ingest wal. We process the wal in chunks because
|
||||
// that's what happens when we get bytes from safekeepers.
|
||||
for chunk in bytes[xlogoff..].chunks(50) {
|
||||
decoder.feed_bytes(chunk);
|
||||
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
|
||||
walingest
|
||||
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
let duration = started_at.elapsed();
|
||||
println!("done in {:?}", duration);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1182,7 +1182,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn short_v14_redo() {
|
||||
let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
|
||||
let expected = std::fs::read("fixtures/short_v14_redo.page").unwrap();
|
||||
|
||||
let h = RedoHarness::new().unwrap();
|
||||
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -21,7 +21,6 @@
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/pg_shmem.h"
|
||||
#include "c.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
|
||||
@@ -88,12 +87,6 @@ bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) =
|
||||
static bool pageserver_flush(void);
|
||||
static void pageserver_disconnect(void);
|
||||
|
||||
static bool
|
||||
PagestoreShmemIsValid()
|
||||
{
|
||||
return pagestore_shared && UsedShmemSegAddr;
|
||||
}
|
||||
|
||||
static bool
|
||||
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
|
||||
{
|
||||
@@ -103,7 +96,7 @@ CheckPageserverConnstring(char **newval, void **extra, GucSource source)
|
||||
static void
|
||||
AssignPageserverConnstring(const char *newval, void *extra)
|
||||
{
|
||||
if(!PagestoreShmemIsValid())
|
||||
if(!pagestore_shared)
|
||||
return;
|
||||
LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE);
|
||||
strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
|
||||
@@ -114,7 +107,7 @@ AssignPageserverConnstring(const char *newval, void *extra)
|
||||
static bool
|
||||
CheckConnstringUpdated()
|
||||
{
|
||||
if(!PagestoreShmemIsValid())
|
||||
if(!pagestore_shared)
|
||||
return false;
|
||||
return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
|
||||
}
|
||||
@@ -122,7 +115,7 @@ CheckConnstringUpdated()
|
||||
static void
|
||||
ReloadConnstring()
|
||||
{
|
||||
if(!PagestoreShmemIsValid())
|
||||
if(!pagestore_shared)
|
||||
return;
|
||||
LWLockAcquire(pagestore_shared->lock, LW_SHARED);
|
||||
strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
|
||||
|
||||
@@ -2,4 +2,3 @@
|
||||
comment = 'cloud storage for PostgreSQL'
|
||||
default_version = '1.1'
|
||||
module_pathname = '$libdir/neon'
|
||||
relocatable = true
|
||||
|
||||
174
poetry.lock
generated
174
poetry.lock
generated
@@ -2,99 +2,111 @@
|
||||
|
||||
[[package]]
|
||||
name = "aiohttp"
|
||||
version = "3.9.0"
|
||||
version = "3.8.6"
|
||||
description = "Async http client/server framework (asyncio)"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
python-versions = ">=3.6"
|
||||
files = [
|
||||
{file = "aiohttp-3.9.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:6896b8416be9ada4d22cd359d7cb98955576ce863eadad5596b7cdfbf3e17c6c"},
|
||||
{file = "aiohttp-3.9.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:1736d87dad8ef46a8ec9cddd349fa9f7bd3a064c47dd6469c0d6763d3d49a4fc"},
|
||||
{file = "aiohttp-3.9.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8c9e5f4d7208cda1a2bb600e29069eecf857e6980d0ccc922ccf9d1372c16f4b"},
|
||||
{file = "aiohttp-3.9.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8488519aa05e636c5997719fe543c8daf19f538f4fa044f3ce94bee608817cff"},
|
||||
{file = "aiohttp-3.9.0-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:5ab16c254e2312efeb799bc3c06897f65a133b38b69682bf75d1f1ee1a9c43a9"},
|
||||
{file = "aiohttp-3.9.0-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:7a94bde005a8f926d0fa38b88092a03dea4b4875a61fbcd9ac6f4351df1b57cd"},
|
||||
{file = "aiohttp-3.9.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4b777c9286b6c6a94f50ddb3a6e730deec327e9e2256cb08b5530db0f7d40fd8"},
|
||||
{file = "aiohttp-3.9.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:571760ad7736b34d05597a1fd38cbc7d47f7b65deb722cb8e86fd827404d1f6b"},
|
||||
{file = "aiohttp-3.9.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:deac0a32aec29608eb25d730f4bc5a261a65b6c48ded1ed861d2a1852577c932"},
|
||||
{file = "aiohttp-3.9.0-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:4ee1b4152bc3190cc40ddd6a14715e3004944263ea208229ab4c297712aa3075"},
|
||||
{file = "aiohttp-3.9.0-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:3607375053df58ed6f23903aa10cf3112b1240e8c799d243bbad0f7be0666986"},
|
||||
{file = "aiohttp-3.9.0-cp310-cp310-musllinux_1_1_s390x.whl", hash = "sha256:65b0a70a25456d329a5e1426702dde67be0fb7a4ead718005ba2ca582d023a94"},
|
||||
{file = "aiohttp-3.9.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:5a2eb5311a37fe105aa35f62f75a078537e1a9e4e1d78c86ec9893a3c97d7a30"},
|
||||
{file = "aiohttp-3.9.0-cp310-cp310-win32.whl", hash = "sha256:2cbc14a13fb6b42d344e4f27746a4b03a2cb0c1c3c5b932b0d6ad8881aa390e3"},
|
||||
{file = "aiohttp-3.9.0-cp310-cp310-win_amd64.whl", hash = "sha256:ac9669990e2016d644ba8ae4758688534aabde8dbbc81f9af129c3f5f01ca9cd"},
|
||||
{file = "aiohttp-3.9.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:f8e05f5163528962ce1d1806fce763ab893b1c5b7ace0a3538cd81a90622f844"},
|
||||
{file = "aiohttp-3.9.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:4afa8f71dba3a5a2e1e1282a51cba7341ae76585345c43d8f0e624882b622218"},
|
||||
{file = "aiohttp-3.9.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:f929f4c9b9a00f3e6cc0587abb95ab9c05681f8b14e0fe1daecfa83ea90f8318"},
|
||||
{file = "aiohttp-3.9.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:28185e36a78d247c55e9fbea2332d16aefa14c5276a582ce7a896231c6b1c208"},
|
||||
{file = "aiohttp-3.9.0-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a486ddf57ab98b6d19ad36458b9f09e6022de0381674fe00228ca7b741aacb2f"},
|
||||
{file = "aiohttp-3.9.0-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:70e851f596c00f40a2f00a46126c95c2e04e146015af05a9da3e4867cfc55911"},
|
||||
{file = "aiohttp-3.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c5b7bf8fe4d39886adc34311a233a2e01bc10eb4e842220235ed1de57541a896"},
|
||||
{file = "aiohttp-3.9.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c67a51ea415192c2e53e4e048c78bab82d21955b4281d297f517707dc836bf3d"},
|
||||
{file = "aiohttp-3.9.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:694df243f394629bcae2d8ed94c589a181e8ba8604159e6e45e7b22e58291113"},
|
||||
{file = "aiohttp-3.9.0-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:3dd8119752dd30dd7bca7d4bc2a92a59be6a003e4e5c2cf7e248b89751b8f4b7"},
|
||||
{file = "aiohttp-3.9.0-cp311-cp311-musllinux_1_1_ppc64le.whl", hash = "sha256:eb6dfd52063186ac97b4caa25764cdbcdb4b10d97f5c5f66b0fa95052e744eb7"},
|
||||
{file = "aiohttp-3.9.0-cp311-cp311-musllinux_1_1_s390x.whl", hash = "sha256:d97c3e286d0ac9af6223bc132dc4bad6540b37c8d6c0a15fe1e70fb34f9ec411"},
|
||||
{file = "aiohttp-3.9.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:816f4db40555026e4cdda604a1088577c1fb957d02f3f1292e0221353403f192"},
|
||||
{file = "aiohttp-3.9.0-cp311-cp311-win32.whl", hash = "sha256:3abf0551874fecf95f93b58f25ef4fc9a250669a2257753f38f8f592db85ddea"},
|
||||
{file = "aiohttp-3.9.0-cp311-cp311-win_amd64.whl", hash = "sha256:e18d92c3e9e22553a73e33784fcb0ed484c9874e9a3e96c16a8d6a1e74a0217b"},
|
||||
{file = "aiohttp-3.9.0-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:99ae01fb13a618b9942376df77a1f50c20a281390dad3c56a6ec2942e266220d"},
|
||||
{file = "aiohttp-3.9.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:05857848da443c8c12110d99285d499b4e84d59918a21132e45c3f0804876994"},
|
||||
{file = "aiohttp-3.9.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:317719d7f824eba55857fe0729363af58e27c066c731bc62cd97bc9c3d9c7ea4"},
|
||||
{file = "aiohttp-3.9.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a1e3b3c107ccb0e537f309f719994a55621acd2c8fdf6d5ce5152aed788fb940"},
|
||||
{file = "aiohttp-3.9.0-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:45820ddbb276113ead8d4907a7802adb77548087ff5465d5c554f9aa3928ae7d"},
|
||||
{file = "aiohttp-3.9.0-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:05a183f1978802588711aed0dea31e697d760ce9055292db9dc1604daa9a8ded"},
|
||||
{file = "aiohttp-3.9.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:51a4cd44788ea0b5e6bb8fa704597af3a30be75503a7ed1098bc5b8ffdf6c982"},
|
||||
{file = "aiohttp-3.9.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:673343fbc0c1ac44d0d2640addc56e97a052504beacd7ade0dc5e76d3a4c16e8"},
|
||||
{file = "aiohttp-3.9.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:7e8a3b79b6d186a9c99761fd4a5e8dd575a48d96021f220ac5b5fa856e5dd029"},
|
||||
{file = "aiohttp-3.9.0-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:6777a390e41e78e7c45dab43a4a0196c55c3b8c30eebe017b152939372a83253"},
|
||||
{file = "aiohttp-3.9.0-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:7ae5f99a32c53731c93ac3075abd3e1e5cfbe72fc3eaac4c27c9dd64ba3b19fe"},
|
||||
{file = "aiohttp-3.9.0-cp312-cp312-musllinux_1_1_s390x.whl", hash = "sha256:f1e4f254e9c35d8965d377e065c4a8a55d396fe87c8e7e8429bcfdeeb229bfb3"},
|
||||
{file = "aiohttp-3.9.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:11ca808f9a6b63485059f5f6e164ef7ec826483c1212a44f268b3653c91237d8"},
|
||||
{file = "aiohttp-3.9.0-cp312-cp312-win32.whl", hash = "sha256:de3cc86f4ea8b4c34a6e43a7306c40c1275e52bfa9748d869c6b7d54aa6dad80"},
|
||||
{file = "aiohttp-3.9.0-cp312-cp312-win_amd64.whl", hash = "sha256:ca4fddf84ac7d8a7d0866664936f93318ff01ee33e32381a115b19fb5a4d1202"},
|
||||
{file = "aiohttp-3.9.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:f09960b5bb1017d16c0f9e9f7fc42160a5a49fa1e87a175fd4a2b1a1833ea0af"},
|
||||
{file = "aiohttp-3.9.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:8303531e2c17b1a494ffaeba48f2da655fe932c4e9a2626c8718403c83e5dd2b"},
|
||||
{file = "aiohttp-3.9.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:4790e44f46a4aa07b64504089def5744d3b6780468c4ec3a1a36eb7f2cae9814"},
|
||||
{file = "aiohttp-3.9.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a1d7edf74a36de0e5ca50787e83a77cf352f5504eb0ffa3f07000a911ba353fb"},
|
||||
{file = "aiohttp-3.9.0-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:94697c7293199c2a2551e3e3e18438b4cba293e79c6bc2319f5fd652fccb7456"},
|
||||
{file = "aiohttp-3.9.0-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:a1b66dbb8a7d5f50e9e2ea3804b01e766308331d0cac76eb30c563ac89c95985"},
|
||||
{file = "aiohttp-3.9.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9623cfd9e85b76b83ef88519d98326d4731f8d71869867e47a0b979ffec61c73"},
|
||||
{file = "aiohttp-3.9.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f32c86dc967ab8c719fd229ce71917caad13cc1e8356ee997bf02c5b368799bf"},
|
||||
{file = "aiohttp-3.9.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:f50b4663c3e0262c3a361faf440761fbef60ccdde5fe8545689a4b3a3c149fb4"},
|
||||
{file = "aiohttp-3.9.0-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:dcf71c55ec853826cd70eadb2b6ac62ec577416442ca1e0a97ad875a1b3a0305"},
|
||||
{file = "aiohttp-3.9.0-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:42fe4fd9f0dfcc7be4248c162d8056f1d51a04c60e53366b0098d1267c4c9da8"},
|
||||
{file = "aiohttp-3.9.0-cp38-cp38-musllinux_1_1_s390x.whl", hash = "sha256:76a86a9989ebf82ee61e06e2bab408aec4ea367dc6da35145c3352b60a112d11"},
|
||||
{file = "aiohttp-3.9.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:f9e09a1c83521d770d170b3801eea19b89f41ccaa61d53026ed111cb6f088887"},
|
||||
{file = "aiohttp-3.9.0-cp38-cp38-win32.whl", hash = "sha256:a00ce44c21612d185c5275c5cba4bab8d7c1590f248638b667ed8a782fa8cd6f"},
|
||||
{file = "aiohttp-3.9.0-cp38-cp38-win_amd64.whl", hash = "sha256:d5b9345ab92ebe6003ae11d8092ce822a0242146e6fa270889b9ba965457ca40"},
|
||||
{file = "aiohttp-3.9.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:98d21092bf2637c5fa724a428a69e8f5955f2182bff61f8036827cf6ce1157bf"},
|
||||
{file = "aiohttp-3.9.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:35a68cd63ca6aaef5707888f17a70c36efe62b099a4e853d33dc2e9872125be8"},
|
||||
{file = "aiohttp-3.9.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3d7f6235c7475658acfc1769d968e07ab585c79f6ca438ddfecaa9a08006aee2"},
|
||||
{file = "aiohttp-3.9.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:db04d1de548f7a62d1dd7e7cdf7c22893ee168e22701895067a28a8ed51b3735"},
|
||||
{file = "aiohttp-3.9.0-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:536b01513d67d10baf6f71c72decdf492fb7433c5f2f133e9a9087379d4b6f31"},
|
||||
{file = "aiohttp-3.9.0-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:87c8b0a6487e8109427ccf638580865b54e2e3db4a6e0e11c02639231b41fc0f"},
|
||||
{file = "aiohttp-3.9.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7276fe0017664414fdc3618fca411630405f1aaf0cc3be69def650eb50441787"},
|
||||
{file = "aiohttp-3.9.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:23170247ef89ffa842a02bbfdc425028574d9e010611659abeb24d890bc53bb8"},
|
||||
{file = "aiohttp-3.9.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:b1a2ea8252cacc7fd51df5a56d7a2bb1986ed39be9397b51a08015727dfb69bd"},
|
||||
{file = "aiohttp-3.9.0-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:2d71abc15ff7047412ef26bf812dfc8d0d1020d664617f4913df2df469f26b76"},
|
||||
{file = "aiohttp-3.9.0-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:2d820162c8c2bdbe97d328cd4f417c955ca370027dce593345e437b2e9ffdc4d"},
|
||||
{file = "aiohttp-3.9.0-cp39-cp39-musllinux_1_1_s390x.whl", hash = "sha256:2779f5e7c70f7b421915fd47db332c81de365678180a9f3ab404088f87ba5ff9"},
|
||||
{file = "aiohttp-3.9.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:366bc870d7ac61726f32a489fbe3d1d8876e87506870be66b01aeb84389e967e"},
|
||||
{file = "aiohttp-3.9.0-cp39-cp39-win32.whl", hash = "sha256:1df43596b826022b14998f0460926ce261544fedefe0d2f653e1b20f49e96454"},
|
||||
{file = "aiohttp-3.9.0-cp39-cp39-win_amd64.whl", hash = "sha256:9c196b30f1b1aa3363a69dd69079ae9bec96c2965c4707eaa6914ba099fb7d4f"},
|
||||
{file = "aiohttp-3.9.0.tar.gz", hash = "sha256:09f23292d29135025e19e8ff4f0a68df078fe4ee013bca0105b2e803989de92d"},
|
||||
{file = "aiohttp-3.8.6-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:41d55fc043954cddbbd82503d9cc3f4814a40bcef30b3569bc7b5e34130718c1"},
|
||||
{file = "aiohttp-3.8.6-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:1d84166673694841d8953f0a8d0c90e1087739d24632fe86b1a08819168b4566"},
|
||||
{file = "aiohttp-3.8.6-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:253bf92b744b3170eb4c4ca2fa58f9c4b87aeb1df42f71d4e78815e6e8b73c9e"},
|
||||
{file = "aiohttp-3.8.6-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3fd194939b1f764d6bb05490987bfe104287bbf51b8d862261ccf66f48fb4096"},
|
||||
{file = "aiohttp-3.8.6-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:6c5f938d199a6fdbdc10bbb9447496561c3a9a565b43be564648d81e1102ac22"},
|
||||
{file = "aiohttp-3.8.6-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:2817b2f66ca82ee699acd90e05c95e79bbf1dc986abb62b61ec8aaf851e81c93"},
|
||||
{file = "aiohttp-3.8.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0fa375b3d34e71ccccf172cab401cd94a72de7a8cc01847a7b3386204093bb47"},
|
||||
{file = "aiohttp-3.8.6-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9de50a199b7710fa2904be5a4a9b51af587ab24c8e540a7243ab737b45844543"},
|
||||
{file = "aiohttp-3.8.6-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:e1d8cb0b56b3587c5c01de3bf2f600f186da7e7b5f7353d1bf26a8ddca57f965"},
|
||||
{file = "aiohttp-3.8.6-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:8e31e9db1bee8b4f407b77fd2507337a0a80665ad7b6c749d08df595d88f1cf5"},
|
||||
{file = "aiohttp-3.8.6-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:7bc88fc494b1f0311d67f29fee6fd636606f4697e8cc793a2d912ac5b19aa38d"},
|
||||
{file = "aiohttp-3.8.6-cp310-cp310-musllinux_1_1_s390x.whl", hash = "sha256:ec00c3305788e04bf6d29d42e504560e159ccaf0be30c09203b468a6c1ccd3b2"},
|
||||
{file = "aiohttp-3.8.6-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:ad1407db8f2f49329729564f71685557157bfa42b48f4b93e53721a16eb813ed"},
|
||||
{file = "aiohttp-3.8.6-cp310-cp310-win32.whl", hash = "sha256:ccc360e87341ad47c777f5723f68adbb52b37ab450c8bc3ca9ca1f3e849e5fe2"},
|
||||
{file = "aiohttp-3.8.6-cp310-cp310-win_amd64.whl", hash = "sha256:93c15c8e48e5e7b89d5cb4613479d144fda8344e2d886cf694fd36db4cc86865"},
|
||||
{file = "aiohttp-3.8.6-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:6e2f9cc8e5328f829f6e1fb74a0a3a939b14e67e80832975e01929e320386b34"},
|
||||
{file = "aiohttp-3.8.6-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:e6a00ffcc173e765e200ceefb06399ba09c06db97f401f920513a10c803604ca"},
|
||||
{file = "aiohttp-3.8.6-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:41bdc2ba359032e36c0e9de5a3bd00d6fb7ea558a6ce6b70acedf0da86458321"},
|
||||
{file = "aiohttp-3.8.6-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:14cd52ccf40006c7a6cd34a0f8663734e5363fd981807173faf3a017e202fec9"},
|
||||
{file = "aiohttp-3.8.6-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2d5b785c792802e7b275c420d84f3397668e9d49ab1cb52bd916b3b3ffcf09ad"},
|
||||
{file = "aiohttp-3.8.6-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:1bed815f3dc3d915c5c1e556c397c8667826fbc1b935d95b0ad680787896a358"},
|
||||
{file = "aiohttp-3.8.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:96603a562b546632441926cd1293cfcb5b69f0b4159e6077f7c7dbdfb686af4d"},
|
||||
{file = "aiohttp-3.8.6-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d76e8b13161a202d14c9584590c4df4d068c9567c99506497bdd67eaedf36403"},
|
||||
{file = "aiohttp-3.8.6-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:e3f1e3f1a1751bb62b4a1b7f4e435afcdade6c17a4fd9b9d43607cebd242924a"},
|
||||
{file = "aiohttp-3.8.6-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:76b36b3124f0223903609944a3c8bf28a599b2cc0ce0be60b45211c8e9be97f8"},
|
||||
{file = "aiohttp-3.8.6-cp311-cp311-musllinux_1_1_ppc64le.whl", hash = "sha256:a2ece4af1f3c967a4390c284797ab595a9f1bc1130ef8b01828915a05a6ae684"},
|
||||
{file = "aiohttp-3.8.6-cp311-cp311-musllinux_1_1_s390x.whl", hash = "sha256:16d330b3b9db87c3883e565340d292638a878236418b23cc8b9b11a054aaa887"},
|
||||
{file = "aiohttp-3.8.6-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:42c89579f82e49db436b69c938ab3e1559e5a4409eb8639eb4143989bc390f2f"},
|
||||
{file = "aiohttp-3.8.6-cp311-cp311-win32.whl", hash = "sha256:efd2fcf7e7b9d7ab16e6b7d54205beded0a9c8566cb30f09c1abe42b4e22bdcb"},
|
||||
{file = "aiohttp-3.8.6-cp311-cp311-win_amd64.whl", hash = "sha256:3b2ab182fc28e7a81f6c70bfbd829045d9480063f5ab06f6e601a3eddbbd49a0"},
|
||||
{file = "aiohttp-3.8.6-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:fdee8405931b0615220e5ddf8cd7edd8592c606a8e4ca2a00704883c396e4479"},
|
||||
{file = "aiohttp-3.8.6-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d25036d161c4fe2225d1abff2bd52c34ed0b1099f02c208cd34d8c05729882f0"},
|
||||
{file = "aiohttp-3.8.6-cp36-cp36m-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:5d791245a894be071d5ab04bbb4850534261a7d4fd363b094a7b9963e8cdbd31"},
|
||||
{file = "aiohttp-3.8.6-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0cccd1de239afa866e4ce5c789b3032442f19c261c7d8a01183fd956b1935349"},
|
||||
{file = "aiohttp-3.8.6-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1f13f60d78224f0dace220d8ab4ef1dbc37115eeeab8c06804fec11bec2bbd07"},
|
||||
{file = "aiohttp-3.8.6-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8a9b5a0606faca4f6cc0d338359d6fa137104c337f489cd135bb7fbdbccb1e39"},
|
||||
{file = "aiohttp-3.8.6-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:13da35c9ceb847732bf5c6c5781dcf4780e14392e5d3b3c689f6d22f8e15ae31"},
|
||||
{file = "aiohttp-3.8.6-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:4d4cbe4ffa9d05f46a28252efc5941e0462792930caa370a6efaf491f412bc66"},
|
||||
{file = "aiohttp-3.8.6-cp36-cp36m-musllinux_1_1_ppc64le.whl", hash = "sha256:229852e147f44da0241954fc6cb910ba074e597f06789c867cb7fb0621e0ba7a"},
|
||||
{file = "aiohttp-3.8.6-cp36-cp36m-musllinux_1_1_s390x.whl", hash = "sha256:713103a8bdde61d13490adf47171a1039fd880113981e55401a0f7b42c37d071"},
|
||||
{file = "aiohttp-3.8.6-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:45ad816b2c8e3b60b510f30dbd37fe74fd4a772248a52bb021f6fd65dff809b6"},
|
||||
{file = "aiohttp-3.8.6-cp36-cp36m-win32.whl", hash = "sha256:2b8d4e166e600dcfbff51919c7a3789ff6ca8b3ecce16e1d9c96d95dd569eb4c"},
|
||||
{file = "aiohttp-3.8.6-cp36-cp36m-win_amd64.whl", hash = "sha256:0912ed87fee967940aacc5306d3aa8ba3a459fcd12add0b407081fbefc931e53"},
|
||||
{file = "aiohttp-3.8.6-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:e2a988a0c673c2e12084f5e6ba3392d76c75ddb8ebc6c7e9ead68248101cd446"},
|
||||
{file = "aiohttp-3.8.6-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ebf3fd9f141700b510d4b190094db0ce37ac6361a6806c153c161dc6c041ccda"},
|
||||
{file = "aiohttp-3.8.6-cp37-cp37m-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:3161ce82ab85acd267c8f4b14aa226047a6bee1e4e6adb74b798bd42c6ae1f80"},
|
||||
{file = "aiohttp-3.8.6-cp37-cp37m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d95fc1bf33a9a81469aa760617b5971331cdd74370d1214f0b3109272c0e1e3c"},
|
||||
{file = "aiohttp-3.8.6-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c43ecfef7deaf0617cee936836518e7424ee12cb709883f2c9a1adda63cc460"},
|
||||
{file = "aiohttp-3.8.6-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ca80e1b90a05a4f476547f904992ae81eda5c2c85c66ee4195bb8f9c5fb47f28"},
|
||||
{file = "aiohttp-3.8.6-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:90c72ebb7cb3a08a7f40061079817133f502a160561d0675b0a6adf231382c92"},
|
||||
{file = "aiohttp-3.8.6-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:bb54c54510e47a8c7c8e63454a6acc817519337b2b78606c4e840871a3e15349"},
|
||||
{file = "aiohttp-3.8.6-cp37-cp37m-musllinux_1_1_ppc64le.whl", hash = "sha256:de6a1c9f6803b90e20869e6b99c2c18cef5cc691363954c93cb9adeb26d9f3ae"},
|
||||
{file = "aiohttp-3.8.6-cp37-cp37m-musllinux_1_1_s390x.whl", hash = "sha256:a3628b6c7b880b181a3ae0a0683698513874df63783fd89de99b7b7539e3e8a8"},
|
||||
{file = "aiohttp-3.8.6-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:fc37e9aef10a696a5a4474802930079ccfc14d9f9c10b4662169671ff034b7df"},
|
||||
{file = "aiohttp-3.8.6-cp37-cp37m-win32.whl", hash = "sha256:f8ef51e459eb2ad8e7a66c1d6440c808485840ad55ecc3cafefadea47d1b1ba2"},
|
||||
{file = "aiohttp-3.8.6-cp37-cp37m-win_amd64.whl", hash = "sha256:b2fe42e523be344124c6c8ef32a011444e869dc5f883c591ed87f84339de5976"},
|
||||
{file = "aiohttp-3.8.6-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:9e2ee0ac5a1f5c7dd3197de309adfb99ac4617ff02b0603fd1e65b07dc772e4b"},
|
||||
{file = "aiohttp-3.8.6-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:01770d8c04bd8db568abb636c1fdd4f7140b284b8b3e0b4584f070180c1e5c62"},
|
||||
{file = "aiohttp-3.8.6-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:3c68330a59506254b556b99a91857428cab98b2f84061260a67865f7f52899f5"},
|
||||
{file = "aiohttp-3.8.6-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:89341b2c19fb5eac30c341133ae2cc3544d40d9b1892749cdd25892bbc6ac951"},
|
||||
{file = "aiohttp-3.8.6-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:71783b0b6455ac8f34b5ec99d83e686892c50498d5d00b8e56d47f41b38fbe04"},
|
||||
{file = "aiohttp-3.8.6-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f628dbf3c91e12f4d6c8b3f092069567d8eb17814aebba3d7d60c149391aee3a"},
|
||||
{file = "aiohttp-3.8.6-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b04691bc6601ef47c88f0255043df6f570ada1a9ebef99c34bd0b72866c217ae"},
|
||||
{file = "aiohttp-3.8.6-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7ee912f7e78287516df155f69da575a0ba33b02dd7c1d6614dbc9463f43066e3"},
|
||||
{file = "aiohttp-3.8.6-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:9c19b26acdd08dd239e0d3669a3dddafd600902e37881f13fbd8a53943079dbc"},
|
||||
{file = "aiohttp-3.8.6-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:99c5ac4ad492b4a19fc132306cd57075c28446ec2ed970973bbf036bcda1bcc6"},
|
||||
{file = "aiohttp-3.8.6-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:f0f03211fd14a6a0aed2997d4b1c013d49fb7b50eeb9ffdf5e51f23cfe2c77fa"},
|
||||
{file = "aiohttp-3.8.6-cp38-cp38-musllinux_1_1_s390x.whl", hash = "sha256:8d399dade330c53b4106160f75f55407e9ae7505263ea86f2ccca6bfcbdb4921"},
|
||||
{file = "aiohttp-3.8.6-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:ec4fd86658c6a8964d75426517dc01cbf840bbf32d055ce64a9e63a40fd7b771"},
|
||||
{file = "aiohttp-3.8.6-cp38-cp38-win32.whl", hash = "sha256:33164093be11fcef3ce2571a0dccd9041c9a93fa3bde86569d7b03120d276c6f"},
|
||||
{file = "aiohttp-3.8.6-cp38-cp38-win_amd64.whl", hash = "sha256:bdf70bfe5a1414ba9afb9d49f0c912dc524cf60141102f3a11143ba3d291870f"},
|
||||
{file = "aiohttp-3.8.6-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:d52d5dc7c6682b720280f9d9db41d36ebe4791622c842e258c9206232251ab2b"},
|
||||
{file = "aiohttp-3.8.6-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:4ac39027011414dbd3d87f7edb31680e1f430834c8cef029f11c66dad0670aa5"},
|
||||
{file = "aiohttp-3.8.6-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3f5c7ce535a1d2429a634310e308fb7d718905487257060e5d4598e29dc17f0b"},
|
||||
{file = "aiohttp-3.8.6-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b30e963f9e0d52c28f284d554a9469af073030030cef8693106d918b2ca92f54"},
|
||||
{file = "aiohttp-3.8.6-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:918810ef188f84152af6b938254911055a72e0f935b5fbc4c1a4ed0b0584aed1"},
|
||||
{file = "aiohttp-3.8.6-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:002f23e6ea8d3dd8d149e569fd580c999232b5fbc601c48d55398fbc2e582e8c"},
|
||||
{file = "aiohttp-3.8.6-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4fcf3eabd3fd1a5e6092d1242295fa37d0354b2eb2077e6eb670accad78e40e1"},
|
||||
{file = "aiohttp-3.8.6-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:255ba9d6d5ff1a382bb9a578cd563605aa69bec845680e21c44afc2670607a95"},
|
||||
{file = "aiohttp-3.8.6-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:d67f8baed00870aa390ea2590798766256f31dc5ed3ecc737debb6e97e2ede78"},
|
||||
{file = "aiohttp-3.8.6-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:86f20cee0f0a317c76573b627b954c412ea766d6ada1a9fcf1b805763ae7feeb"},
|
||||
{file = "aiohttp-3.8.6-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:39a312d0e991690ccc1a61f1e9e42daa519dcc34ad03eb6f826d94c1190190dd"},
|
||||
{file = "aiohttp-3.8.6-cp39-cp39-musllinux_1_1_s390x.whl", hash = "sha256:e827d48cf802de06d9c935088c2924e3c7e7533377d66b6f31ed175c1620e05e"},
|
||||
{file = "aiohttp-3.8.6-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:bd111d7fc5591ddf377a408ed9067045259ff2770f37e2d94e6478d0f3fc0c17"},
|
||||
{file = "aiohttp-3.8.6-cp39-cp39-win32.whl", hash = "sha256:caf486ac1e689dda3502567eb89ffe02876546599bbf915ec94b1fa424eeffd4"},
|
||||
{file = "aiohttp-3.8.6-cp39-cp39-win_amd64.whl", hash = "sha256:3f0e27e5b733803333bb2371249f41cf42bae8884863e8e8965ec69bebe53132"},
|
||||
{file = "aiohttp-3.8.6.tar.gz", hash = "sha256:b0cf2a4501bff9330a8a5248b4ce951851e415bdcce9dc158e76cfd55e15085c"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
aiosignal = ">=1.1.2"
|
||||
async-timeout = {version = ">=4.0,<5.0", markers = "python_version < \"3.11\""}
|
||||
async-timeout = ">=4.0.0a3,<5.0"
|
||||
attrs = ">=17.3.0"
|
||||
charset-normalizer = ">=2.0,<4.0"
|
||||
frozenlist = ">=1.1.1"
|
||||
multidict = ">=4.5,<7.0"
|
||||
yarl = ">=1.0,<2.0"
|
||||
|
||||
[package.extras]
|
||||
speedups = ["Brotli", "aiodns", "brotlicffi"]
|
||||
speedups = ["Brotli", "aiodns", "cchardet"]
|
||||
|
||||
[[package]]
|
||||
name = "aiopg"
|
||||
@@ -2707,4 +2719,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.9"
|
||||
content-hash = "25ffa9ed98d890a3b85e6036792296a60bb705e8f9eaa1f07336501116a58756"
|
||||
content-hash = "0834e5cb69e5457741d4f476c3e49a4dc83598b5730685c8755da651b96ad3ec"
|
||||
|
||||
@@ -76,4 +76,3 @@ tokio-util.workspace = true
|
||||
rcgen.workspace = true
|
||||
rstest.workspace = true
|
||||
tokio-postgres-rustls.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
|
||||
@@ -6,7 +6,6 @@ pub use link::LinkAuthError;
|
||||
use tokio_postgres::config::AuthKeys;
|
||||
|
||||
use crate::proxy::{handle_try_wake, retry_after, LatencyTimer};
|
||||
use crate::stream::Stream;
|
||||
use crate::{
|
||||
auth::{self, ClientCredentials},
|
||||
config::AuthenticationConfig,
|
||||
@@ -132,7 +131,7 @@ async fn auth_quirks_creds(
|
||||
api: &impl console::Api,
|
||||
extra: &ConsoleReqExtra<'_>,
|
||||
creds: &mut ClientCredentials<'_>,
|
||||
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
|
||||
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
allow_cleartext: bool,
|
||||
config: &'static AuthenticationConfig,
|
||||
latency_timer: &mut LatencyTimer,
|
||||
@@ -166,7 +165,7 @@ async fn auth_quirks(
|
||||
api: &impl console::Api,
|
||||
extra: &ConsoleReqExtra<'_>,
|
||||
creds: &mut ClientCredentials<'_>,
|
||||
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
|
||||
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
allow_cleartext: bool,
|
||||
config: &'static AuthenticationConfig,
|
||||
latency_timer: &mut LatencyTimer,
|
||||
@@ -242,7 +241,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
|
||||
pub async fn authenticate(
|
||||
&mut self,
|
||||
extra: &ConsoleReqExtra<'_>,
|
||||
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
|
||||
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
allow_cleartext: bool,
|
||||
config: &'static AuthenticationConfig,
|
||||
latency_timer: &mut LatencyTimer,
|
||||
|
||||
@@ -6,7 +6,7 @@ use crate::{
|
||||
console::{self, AuthInfo, ConsoleReqExtra},
|
||||
proxy::LatencyTimer,
|
||||
sasl, scram,
|
||||
stream::{PqStream, Stream},
|
||||
stream::PqStream,
|
||||
};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{info, warn};
|
||||
@@ -15,7 +15,7 @@ pub(super) async fn authenticate(
|
||||
api: &impl console::Api,
|
||||
extra: &ConsoleReqExtra<'_>,
|
||||
creds: &ClientCredentials<'_>,
|
||||
client: &mut PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
|
||||
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
config: &'static AuthenticationConfig,
|
||||
latency_timer: &mut LatencyTimer,
|
||||
) -> auth::Result<AuthSuccess<ComputeCredentials>> {
|
||||
|
||||
@@ -2,7 +2,7 @@ use super::{AuthSuccess, ComputeCredentials};
|
||||
use crate::{
|
||||
auth::{self, AuthFlow, ClientCredentials},
|
||||
proxy::LatencyTimer,
|
||||
stream::{self, Stream},
|
||||
stream,
|
||||
};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{info, warn};
|
||||
@@ -12,7 +12,7 @@ use tracing::{info, warn};
|
||||
/// These properties are benefical for serverless JS workers, so we
|
||||
/// use this mechanism for websocket connections.
|
||||
pub async fn cleartext_hack(
|
||||
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
|
||||
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
latency_timer: &mut LatencyTimer,
|
||||
) -> auth::Result<AuthSuccess<ComputeCredentials>> {
|
||||
warn!("cleartext auth flow override is enabled, proceeding");
|
||||
@@ -37,7 +37,7 @@ pub async fn cleartext_hack(
|
||||
/// Very similar to [`cleartext_hack`], but there's a specific password format.
|
||||
pub async fn password_hack(
|
||||
creds: &mut ClientCredentials<'_>,
|
||||
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
|
||||
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
latency_timer: &mut LatencyTimer,
|
||||
) -> auth::Result<AuthSuccess<ComputeCredentials>> {
|
||||
warn!("project not specified, resorting to the password hack auth flow");
|
||||
|
||||
@@ -1,21 +1,16 @@
|
||||
//! Main authentication flow.
|
||||
|
||||
use super::{AuthErrorImpl, PasswordHackPayload};
|
||||
use crate::{
|
||||
config::TlsServerEndPoint,
|
||||
sasl, scram,
|
||||
stream::{PqStream, Stream},
|
||||
};
|
||||
use crate::{sasl, scram, stream::PqStream};
|
||||
use pq_proto::{BeAuthenticationSaslMessage, BeMessage, BeMessage as Be};
|
||||
use std::io;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::info;
|
||||
|
||||
/// Every authentication selector is supposed to implement this trait.
|
||||
pub trait AuthMethod {
|
||||
/// Any authentication selector should provide initial backend message
|
||||
/// containing auth method name and parameters, e.g. md5 salt.
|
||||
fn first_message(&self, channel_binding: bool) -> BeMessage<'_>;
|
||||
fn first_message(&self) -> BeMessage<'_>;
|
||||
}
|
||||
|
||||
/// Initial state of [`AuthFlow`].
|
||||
@@ -26,14 +21,8 @@ pub struct Scram<'a>(pub &'a scram::ServerSecret);
|
||||
|
||||
impl AuthMethod for Scram<'_> {
|
||||
#[inline(always)]
|
||||
fn first_message(&self, channel_binding: bool) -> BeMessage<'_> {
|
||||
if channel_binding {
|
||||
Be::AuthenticationSasl(BeAuthenticationSaslMessage::Methods(scram::METHODS))
|
||||
} else {
|
||||
Be::AuthenticationSasl(BeAuthenticationSaslMessage::Methods(
|
||||
scram::METHODS_WITHOUT_PLUS,
|
||||
))
|
||||
}
|
||||
fn first_message(&self) -> BeMessage<'_> {
|
||||
Be::AuthenticationSasl(BeAuthenticationSaslMessage::Methods(scram::METHODS))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,7 +32,7 @@ pub struct PasswordHack;
|
||||
|
||||
impl AuthMethod for PasswordHack {
|
||||
#[inline(always)]
|
||||
fn first_message(&self, _channel_binding: bool) -> BeMessage<'_> {
|
||||
fn first_message(&self) -> BeMessage<'_> {
|
||||
Be::AuthenticationCleartextPassword
|
||||
}
|
||||
}
|
||||
@@ -54,44 +43,37 @@ pub struct CleartextPassword;
|
||||
|
||||
impl AuthMethod for CleartextPassword {
|
||||
#[inline(always)]
|
||||
fn first_message(&self, _channel_binding: bool) -> BeMessage<'_> {
|
||||
fn first_message(&self) -> BeMessage<'_> {
|
||||
Be::AuthenticationCleartextPassword
|
||||
}
|
||||
}
|
||||
|
||||
/// This wrapper for [`PqStream`] performs client authentication.
|
||||
#[must_use]
|
||||
pub struct AuthFlow<'a, S, State> {
|
||||
pub struct AuthFlow<'a, Stream, State> {
|
||||
/// The underlying stream which implements libpq's protocol.
|
||||
stream: &'a mut PqStream<Stream<S>>,
|
||||
stream: &'a mut PqStream<Stream>,
|
||||
/// State might contain ancillary data (see [`Self::begin`]).
|
||||
state: State,
|
||||
tls_server_end_point: TlsServerEndPoint,
|
||||
}
|
||||
|
||||
/// Initial state of the stream wrapper.
|
||||
impl<'a, S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'a, S, Begin> {
|
||||
impl<'a, S: AsyncWrite + Unpin> AuthFlow<'a, S, Begin> {
|
||||
/// Create a new wrapper for client authentication.
|
||||
pub fn new(stream: &'a mut PqStream<Stream<S>>) -> Self {
|
||||
let tls_server_end_point = stream.get_ref().tls_server_end_point();
|
||||
|
||||
pub fn new(stream: &'a mut PqStream<S>) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
state: Begin,
|
||||
tls_server_end_point,
|
||||
}
|
||||
}
|
||||
|
||||
/// Move to the next step by sending auth method's name & params to client.
|
||||
pub async fn begin<M: AuthMethod>(self, method: M) -> io::Result<AuthFlow<'a, S, M>> {
|
||||
self.stream
|
||||
.write_message(&method.first_message(self.tls_server_end_point.supported()))
|
||||
.await?;
|
||||
self.stream.write_message(&method.first_message()).await?;
|
||||
|
||||
Ok(AuthFlow {
|
||||
stream: self.stream,
|
||||
state: method,
|
||||
tls_server_end_point: self.tls_server_end_point,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -141,15 +123,9 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
|
||||
return Err(super::AuthError::bad_auth_method(sasl.method));
|
||||
}
|
||||
|
||||
info!("client chooses {}", sasl.method);
|
||||
|
||||
let secret = self.state.0;
|
||||
let outcome = sasl::SaslStream::new(self.stream, sasl.message)
|
||||
.authenticate(scram::Exchange::new(
|
||||
secret,
|
||||
rand::random,
|
||||
self.tls_server_end_point,
|
||||
))
|
||||
.authenticate(scram::Exchange::new(secret, rand::random, None))
|
||||
.await?;
|
||||
|
||||
Ok(outcome)
|
||||
|
||||
@@ -6,8 +6,6 @@
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
|
||||
use futures::future::Either;
|
||||
use itertools::Itertools;
|
||||
use proxy::config::TlsServerEndPoint;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context};
|
||||
@@ -67,7 +65,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
let destination: String = args.get_one::<String>("dest").unwrap().parse()?;
|
||||
|
||||
// Configure TLS
|
||||
let (tls_config, tls_server_end_point): (Arc<rustls::ServerConfig>, TlsServerEndPoint) = match (
|
||||
let tls_config: Arc<rustls::ServerConfig> = match (
|
||||
args.get_one::<String>("tls-key"),
|
||||
args.get_one::<String>("tls-cert"),
|
||||
) {
|
||||
@@ -91,22 +89,16 @@ async fn main() -> anyhow::Result<()> {
|
||||
))?
|
||||
.into_iter()
|
||||
.map(rustls::Certificate)
|
||||
.collect_vec()
|
||||
.collect()
|
||||
};
|
||||
|
||||
// needed for channel bindings
|
||||
let first_cert = cert_chain.first().context("missing certificate")?;
|
||||
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
|
||||
|
||||
let tls_config = rustls::ServerConfig::builder()
|
||||
rustls::ServerConfig::builder()
|
||||
.with_safe_default_cipher_suites()
|
||||
.with_safe_default_kx_groups()
|
||||
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(cert_chain, key)?
|
||||
.into();
|
||||
|
||||
(tls_config, tls_server_end_point)
|
||||
.into()
|
||||
}
|
||||
_ => bail!("tls-key and tls-cert must be specified"),
|
||||
};
|
||||
@@ -121,7 +113,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
let main = tokio::spawn(task_main(
|
||||
Arc::new(destination),
|
||||
tls_config,
|
||||
tls_server_end_point,
|
||||
proxy_listener,
|
||||
cancellation_token.clone(),
|
||||
));
|
||||
@@ -143,7 +134,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
async fn task_main(
|
||||
dest_suffix: Arc<String>,
|
||||
tls_config: Arc<rustls::ServerConfig>,
|
||||
tls_server_end_point: TlsServerEndPoint,
|
||||
listener: tokio::net::TcpListener,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -169,7 +159,7 @@ async fn task_main(
|
||||
.context("failed to set socket option")?;
|
||||
|
||||
info!(%peer_addr, "serving");
|
||||
handle_client(dest_suffix, tls_config, tls_server_end_point, socket).await
|
||||
handle_client(dest_suffix, tls_config, socket).await
|
||||
}
|
||||
.unwrap_or_else(|e| {
|
||||
// Acknowledge that the task has finished with an error.
|
||||
@@ -217,7 +207,6 @@ const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmod
|
||||
async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
raw_stream: S,
|
||||
tls_config: Arc<rustls::ServerConfig>,
|
||||
tls_server_end_point: TlsServerEndPoint,
|
||||
) -> anyhow::Result<Stream<S>> {
|
||||
let mut stream = PqStream::new(Stream::from_raw(raw_stream));
|
||||
|
||||
@@ -242,11 +231,7 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
if !read_buf.is_empty() {
|
||||
bail!("data is sent before server replied with EncryptionResponse");
|
||||
}
|
||||
|
||||
Ok(Stream::Tls {
|
||||
tls: Box::new(raw.upgrade(tls_config).await?),
|
||||
tls_server_end_point,
|
||||
})
|
||||
Ok(raw.upgrade(tls_config).await?)
|
||||
}
|
||||
unexpected => {
|
||||
info!(
|
||||
@@ -261,10 +246,9 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
async fn handle_client(
|
||||
dest_suffix: Arc<String>,
|
||||
tls_config: Arc<rustls::ServerConfig>,
|
||||
tls_server_end_point: TlsServerEndPoint,
|
||||
stream: impl AsyncRead + AsyncWrite + Unpin,
|
||||
) -> anyhow::Result<()> {
|
||||
let tls_stream = ssl_handshake(stream, tls_config, tls_server_end_point).await?;
|
||||
let tls_stream = ssl_handshake(stream, tls_config).await?;
|
||||
|
||||
// Cut off first part of the SNI domain
|
||||
// We receive required destination details in the format of
|
||||
|
||||
@@ -1,15 +1,12 @@
|
||||
use crate::auth;
|
||||
use anyhow::{bail, ensure, Context, Ok};
|
||||
use rustls::{sign, Certificate, PrivateKey};
|
||||
use sha2::{Digest, Sha256};
|
||||
use rustls::sign;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use tracing::{error, info};
|
||||
use x509_parser::oid_registry;
|
||||
|
||||
pub struct ProxyConfig {
|
||||
pub tls_config: Option<TlsConfig>,
|
||||
@@ -30,7 +27,6 @@ pub struct MetricCollectionConfig {
|
||||
pub struct TlsConfig {
|
||||
pub config: Arc<rustls::ServerConfig>,
|
||||
pub common_names: Option<HashSet<String>>,
|
||||
pub cert_resolver: Arc<CertResolver>,
|
||||
}
|
||||
|
||||
pub struct HttpConfig {
|
||||
@@ -56,7 +52,7 @@ pub fn configure_tls(
|
||||
let mut cert_resolver = CertResolver::new();
|
||||
|
||||
// add default certificate
|
||||
cert_resolver.add_cert_path(key_path, cert_path, true)?;
|
||||
cert_resolver.add_cert(key_path, cert_path, true)?;
|
||||
|
||||
// add extra certificates
|
||||
if let Some(certs_dir) = certs_dir {
|
||||
@@ -68,7 +64,7 @@ pub fn configure_tls(
|
||||
let key_path = path.join("tls.key");
|
||||
let cert_path = path.join("tls.crt");
|
||||
if key_path.exists() && cert_path.exists() {
|
||||
cert_resolver.add_cert_path(
|
||||
cert_resolver.add_cert(
|
||||
&key_path.to_string_lossy(),
|
||||
&cert_path.to_string_lossy(),
|
||||
false,
|
||||
@@ -80,97 +76,35 @@ pub fn configure_tls(
|
||||
|
||||
let common_names = cert_resolver.get_common_names();
|
||||
|
||||
let cert_resolver = Arc::new(cert_resolver);
|
||||
|
||||
let config = rustls::ServerConfig::builder()
|
||||
.with_safe_default_cipher_suites()
|
||||
.with_safe_default_kx_groups()
|
||||
// allow TLS 1.2 to be compatible with older client libraries
|
||||
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(cert_resolver.clone())
|
||||
.with_cert_resolver(Arc::new(cert_resolver))
|
||||
.into();
|
||||
|
||||
Ok(TlsConfig {
|
||||
config,
|
||||
common_names: Some(common_names),
|
||||
cert_resolver,
|
||||
})
|
||||
}
|
||||
|
||||
/// Channel binding parameter
|
||||
///
|
||||
/// <https://www.rfc-editor.org/rfc/rfc5929#section-4>
|
||||
/// Description: The hash of the TLS server's certificate as it
|
||||
/// appears, octet for octet, in the server's Certificate message. Note
|
||||
/// that the Certificate message contains a certificate_list, in which
|
||||
/// the first element is the server's certificate.
|
||||
///
|
||||
/// The hash function is to be selected as follows:
|
||||
///
|
||||
/// * if the certificate's signatureAlgorithm uses a single hash
|
||||
/// function, and that hash function is either MD5 or SHA-1, then use SHA-256;
|
||||
///
|
||||
/// * if the certificate's signatureAlgorithm uses a single hash
|
||||
/// function and that hash function neither MD5 nor SHA-1, then use
|
||||
/// the hash function associated with the certificate's
|
||||
/// signatureAlgorithm;
|
||||
///
|
||||
/// * if the certificate's signatureAlgorithm uses no hash functions or
|
||||
/// uses multiple hash functions, then this channel binding type's
|
||||
/// channel bindings are undefined at this time (updates to is channel
|
||||
/// binding type may occur to address this issue if it ever arises).
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum TlsServerEndPoint {
|
||||
Sha256([u8; 32]),
|
||||
Undefined,
|
||||
}
|
||||
|
||||
impl TlsServerEndPoint {
|
||||
pub fn new(cert: &Certificate) -> anyhow::Result<Self> {
|
||||
let sha256_oids = [
|
||||
// I'm explicitly not adding MD5 or SHA1 here... They're bad.
|
||||
oid_registry::OID_SIG_ECDSA_WITH_SHA256,
|
||||
oid_registry::OID_PKCS1_SHA256WITHRSA,
|
||||
];
|
||||
|
||||
let pem = x509_parser::parse_x509_certificate(&cert.0)
|
||||
.context("Failed to parse PEM object from cerficiate")?
|
||||
.1;
|
||||
|
||||
info!(subject = %pem.subject, "parsing TLS certificate");
|
||||
|
||||
let reg = oid_registry::OidRegistry::default().with_all_crypto();
|
||||
let oid = pem.signature_algorithm.oid();
|
||||
let alg = reg.get(oid);
|
||||
if sha256_oids.contains(oid) {
|
||||
let tls_server_end_point: [u8; 32] =
|
||||
Sha256::new().chain_update(&cert.0).finalize().into();
|
||||
info!(subject = %pem.subject, signature_algorithm = alg.map(|a| a.description()), tls_server_end_point = %base64::encode(tls_server_end_point), "determined channel binding");
|
||||
Ok(Self::Sha256(tls_server_end_point))
|
||||
} else {
|
||||
error!(subject = %pem.subject, signature_algorithm = alg.map(|a| a.description()), "unknown channel binding");
|
||||
Ok(Self::Undefined)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn supported(&self) -> bool {
|
||||
!matches!(self, TlsServerEndPoint::Undefined)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct CertResolver {
|
||||
certs: HashMap<String, (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
|
||||
default: Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
|
||||
struct CertResolver {
|
||||
certs: HashMap<String, Arc<rustls::sign::CertifiedKey>>,
|
||||
default: Option<Arc<rustls::sign::CertifiedKey>>,
|
||||
}
|
||||
|
||||
impl CertResolver {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
certs: HashMap::new(),
|
||||
default: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn add_cert_path(
|
||||
fn add_cert(
|
||||
&mut self,
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
@@ -186,65 +120,57 @@ impl CertResolver {
|
||||
keys.pop().map(rustls::PrivateKey).unwrap()
|
||||
};
|
||||
|
||||
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
|
||||
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
|
||||
|
||||
let cert_chain = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.with_context(|| {
|
||||
format!(
|
||||
.context(format!(
|
||||
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
|
||||
)
|
||||
})?
|
||||
))?
|
||||
.into_iter()
|
||||
.map(rustls::Certificate)
|
||||
.collect()
|
||||
};
|
||||
|
||||
self.add_cert(priv_key, cert_chain, is_default)
|
||||
}
|
||||
let common_name = {
|
||||
let pem = x509_parser::pem::parse_x509_pem(&cert_chain_bytes)
|
||||
.context(format!(
|
||||
"Failed to parse PEM object from bytes from file at '{cert_path}'."
|
||||
))?
|
||||
.1;
|
||||
let common_name = pem.parse_x509()?.subject().to_string();
|
||||
|
||||
pub fn add_cert(
|
||||
&mut self,
|
||||
priv_key: PrivateKey,
|
||||
cert_chain: Vec<Certificate>,
|
||||
is_default: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
|
||||
|
||||
let first_cert = &cert_chain[0];
|
||||
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
|
||||
let pem = x509_parser::parse_x509_certificate(&first_cert.0)
|
||||
.context("Failed to parse PEM object from cerficiate")?
|
||||
.1;
|
||||
|
||||
let common_name = pem.subject().to_string();
|
||||
|
||||
// We only use non-wildcard certificates in link proxy so it seems okay to treat them the same as
|
||||
// wildcard ones as we don't use SNI there. That treatment only affects certificate selection, so
|
||||
// verify-full will still check wildcard match. Old coding here just ignored non-wildcard common names
|
||||
// and passed None instead, which blows up number of cases downstream code should handle. Proper coding
|
||||
// here should better avoid Option for common_names, and do wildcard-based certificate selection instead
|
||||
// of cutting off '*.' parts.
|
||||
let common_name = if common_name.starts_with("CN=*.") {
|
||||
common_name.strip_prefix("CN=*.").map(|s| s.to_string())
|
||||
} else {
|
||||
common_name.strip_prefix("CN=").map(|s| s.to_string())
|
||||
// We only use non-wildcard certificates in link proxy so it seems okay to treat them the same as
|
||||
// wildcard ones as we don't use SNI there. That treatment only affects certificate selection, so
|
||||
// verify-full will still check wildcard match. Old coding here just ignored non-wildcard common names
|
||||
// and passed None instead, which blows up number of cases downstream code should handle. Proper coding
|
||||
// here should better avoid Option for common_names, and do wildcard-based certificate selection instead
|
||||
// of cutting off '*.' parts.
|
||||
if common_name.starts_with("CN=*.") {
|
||||
common_name.strip_prefix("CN=*.").map(|s| s.to_string())
|
||||
} else {
|
||||
common_name.strip_prefix("CN=").map(|s| s.to_string())
|
||||
}
|
||||
}
|
||||
.context("Failed to parse common name from certificate")?;
|
||||
.context(format!(
|
||||
"Failed to parse common name from certificate at '{cert_path}'."
|
||||
))?;
|
||||
|
||||
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
|
||||
|
||||
if is_default {
|
||||
self.default = Some((cert.clone(), tls_server_end_point));
|
||||
self.default = Some(cert.clone());
|
||||
}
|
||||
|
||||
self.certs.insert(common_name, (cert, tls_server_end_point));
|
||||
self.certs.insert(common_name, cert);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_common_names(&self) -> HashSet<String> {
|
||||
fn get_common_names(&self) -> HashSet<String> {
|
||||
self.certs.keys().map(|s| s.to_string()).collect()
|
||||
}
|
||||
}
|
||||
@@ -252,24 +178,15 @@ impl CertResolver {
|
||||
impl rustls::server::ResolvesServerCert for CertResolver {
|
||||
fn resolve(
|
||||
&self,
|
||||
client_hello: rustls::server::ClientHello,
|
||||
_client_hello: rustls::server::ClientHello,
|
||||
) -> Option<Arc<rustls::sign::CertifiedKey>> {
|
||||
self.resolve(client_hello.server_name()).map(|x| x.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl CertResolver {
|
||||
pub fn resolve(
|
||||
&self,
|
||||
server_name: Option<&str>,
|
||||
) -> Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)> {
|
||||
// loop here and cut off more and more subdomains until we find
|
||||
// a match to get a proper wildcard support. OTOH, we now do not
|
||||
// use nested domains, so keep this simple for now.
|
||||
//
|
||||
// With the current coding foo.com will match *.foo.com and that
|
||||
// repeats behavior of the old code.
|
||||
if let Some(mut sni_name) = server_name {
|
||||
if let Some(mut sni_name) = _client_hello.server_name() {
|
||||
loop {
|
||||
if let Some(cert) = self.certs.get(sni_name) {
|
||||
return Some(cert.clone());
|
||||
|
||||
@@ -470,17 +470,7 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
if !read_buf.is_empty() {
|
||||
bail!("data is sent before server replied with EncryptionResponse");
|
||||
}
|
||||
let tls_stream = raw.upgrade(tls.to_server_config()).await?;
|
||||
|
||||
let (_, tls_server_end_point) = tls
|
||||
.cert_resolver
|
||||
.resolve(tls_stream.get_ref().1.server_name())
|
||||
.context("missing certificate")?;
|
||||
|
||||
stream = PqStream::new(Stream::Tls {
|
||||
tls: Box::new(tls_stream),
|
||||
tls_server_end_point,
|
||||
});
|
||||
stream = PqStream::new(raw.upgrade(tls.to_server_config()).await?);
|
||||
}
|
||||
}
|
||||
_ => bail!(ERR_PROTO_VIOLATION),
|
||||
@@ -885,7 +875,7 @@ pub async fn proxy_pass(
|
||||
/// Thin connection context.
|
||||
struct Client<'a, S> {
|
||||
/// The underlying libpq protocol stream.
|
||||
stream: PqStream<Stream<S>>,
|
||||
stream: PqStream<S>,
|
||||
/// Client credentials that we care about.
|
||||
creds: auth::BackendType<'a, auth::ClientCredentials<'a>>,
|
||||
/// KV-dictionary with PostgreSQL connection params.
|
||||
@@ -899,7 +889,7 @@ struct Client<'a, S> {
|
||||
impl<'a, S> Client<'a, S> {
|
||||
/// Construct a new connection context.
|
||||
fn new(
|
||||
stream: PqStream<Stream<S>>,
|
||||
stream: PqStream<S>,
|
||||
creds: auth::BackendType<'a, auth::ClientCredentials<'a>>,
|
||||
params: &'a StartupMessageParams,
|
||||
session_id: uuid::Uuid,
|
||||
|
||||
@@ -1,23 +1,19 @@
|
||||
//! A group of high-level tests for connection establishing logic and auth.
|
||||
|
||||
mod mitm;
|
||||
|
||||
//!
|
||||
use super::*;
|
||||
use crate::auth::backend::TestBackend;
|
||||
use crate::auth::ClientCredentials;
|
||||
use crate::config::CertResolver;
|
||||
use crate::console::{CachedNodeInfo, NodeInfo};
|
||||
use crate::{auth, http, sasl, scram};
|
||||
use async_trait::async_trait;
|
||||
use rstest::rstest;
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tokio_postgres::tls::{MakeTlsConnect, NoTls};
|
||||
use tokio_postgres_rustls::{MakeRustlsConnect, RustlsStream};
|
||||
use tokio_postgres_rustls::MakeRustlsConnect;
|
||||
|
||||
/// Generate a set of TLS certificates: CA + server.
|
||||
fn generate_certs(
|
||||
hostname: &str,
|
||||
common_name: &str,
|
||||
) -> anyhow::Result<(rustls::Certificate, rustls::Certificate, rustls::PrivateKey)> {
|
||||
let ca = rcgen::Certificate::from_params({
|
||||
let mut params = rcgen::CertificateParams::default();
|
||||
@@ -25,15 +21,7 @@ fn generate_certs(
|
||||
params
|
||||
})?;
|
||||
|
||||
let cert = rcgen::Certificate::from_params({
|
||||
let mut params = rcgen::CertificateParams::new(vec![hostname.into()]);
|
||||
params.distinguished_name = rcgen::DistinguishedName::new();
|
||||
params
|
||||
.distinguished_name
|
||||
.push(rcgen::DnType::CommonName, common_name);
|
||||
params
|
||||
})?;
|
||||
|
||||
let cert = rcgen::generate_simple_self_signed(vec![hostname.into()])?;
|
||||
Ok((
|
||||
rustls::Certificate(ca.serialize_der()?),
|
||||
rustls::Certificate(cert.serialize_der_with_signer(&ca)?),
|
||||
@@ -49,14 +37,7 @@ struct ClientConfig<'a> {
|
||||
impl ClientConfig<'_> {
|
||||
fn make_tls_connect<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
|
||||
self,
|
||||
) -> anyhow::Result<
|
||||
impl tokio_postgres::tls::TlsConnect<
|
||||
S,
|
||||
Error = impl std::fmt::Debug,
|
||||
Future = impl Send,
|
||||
Stream = RustlsStream<S>,
|
||||
>,
|
||||
> {
|
||||
) -> anyhow::Result<impl tokio_postgres::tls::TlsConnect<S>> {
|
||||
let mut mk = MakeRustlsConnect::new(self.config);
|
||||
let tls = MakeTlsConnect::<S>::make_tls_connect(&mut mk, self.hostname)?;
|
||||
Ok(tls)
|
||||
@@ -68,24 +49,20 @@ fn generate_tls_config<'a>(
|
||||
hostname: &'a str,
|
||||
common_name: &'a str,
|
||||
) -> anyhow::Result<(ClientConfig<'a>, TlsConfig)> {
|
||||
let (ca, cert, key) = generate_certs(hostname, common_name)?;
|
||||
let (ca, cert, key) = generate_certs(hostname)?;
|
||||
|
||||
let tls_config = {
|
||||
let config = rustls::ServerConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(vec![cert.clone()], key.clone())?
|
||||
.with_single_cert(vec![cert], key)?
|
||||
.into();
|
||||
|
||||
let mut cert_resolver = CertResolver::new();
|
||||
cert_resolver.add_cert(key, vec![cert], true)?;
|
||||
|
||||
let common_names = Some(cert_resolver.get_common_names());
|
||||
let common_names = Some([common_name.to_owned()].iter().cloned().collect());
|
||||
|
||||
TlsConfig {
|
||||
config,
|
||||
common_names,
|
||||
cert_resolver: Arc::new(cert_resolver),
|
||||
}
|
||||
};
|
||||
|
||||
@@ -276,7 +253,6 @@ async fn scram_auth_good(#[case] password: &str) -> anyhow::Result<()> {
|
||||
));
|
||||
|
||||
let (_client, _conn) = tokio_postgres::Config::new()
|
||||
.channel_binding(tokio_postgres::config::ChannelBinding::Require)
|
||||
.user("user")
|
||||
.dbname("db")
|
||||
.password(password)
|
||||
@@ -287,30 +263,6 @@ async fn scram_auth_good(#[case] password: &str) -> anyhow::Result<()> {
|
||||
proxy.await?
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn scram_auth_disable_channel_binding() -> anyhow::Result<()> {
|
||||
let (client, server) = tokio::io::duplex(1024);
|
||||
|
||||
let (client_config, server_config) =
|
||||
generate_tls_config("generic-project-name.localhost", "localhost")?;
|
||||
let proxy = tokio::spawn(dummy_proxy(
|
||||
client,
|
||||
Some(server_config),
|
||||
Scram::new("password")?,
|
||||
));
|
||||
|
||||
let (_client, _conn) = tokio_postgres::Config::new()
|
||||
.channel_binding(tokio_postgres::config::ChannelBinding::Disable)
|
||||
.user("user")
|
||||
.dbname("db")
|
||||
.password("password")
|
||||
.ssl_mode(SslMode::Require)
|
||||
.connect_raw(server, client_config.make_tls_connect()?)
|
||||
.await?;
|
||||
|
||||
proxy.await?
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn scram_auth_mock() -> anyhow::Result<()> {
|
||||
let (client, server) = tokio::io::duplex(1024);
|
||||
|
||||
@@ -1,257 +0,0 @@
|
||||
//! Man-in-the-middle tests
|
||||
//!
|
||||
//! Channel binding should prevent a proxy server
|
||||
//! - that has access to create valid certificates -
|
||||
//! from controlling the TLS connection.
|
||||
|
||||
use std::fmt::Debug;
|
||||
|
||||
use super::*;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use postgres_protocol::message::frontend;
|
||||
use tokio::io::{AsyncReadExt, DuplexStream};
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tokio_postgres::tls::TlsConnect;
|
||||
use tokio_util::codec::{Decoder, Encoder};
|
||||
|
||||
enum Intercept {
|
||||
None,
|
||||
Methods,
|
||||
SASLResponse,
|
||||
}
|
||||
|
||||
async fn proxy_mitm(
|
||||
intercept: Intercept,
|
||||
) -> (DuplexStream, DuplexStream, ClientConfig<'static>, TlsConfig) {
|
||||
let (end_server1, client1) = tokio::io::duplex(1024);
|
||||
let (server2, end_client2) = tokio::io::duplex(1024);
|
||||
|
||||
let (client_config1, server_config1) =
|
||||
generate_tls_config("generic-project-name.localhost", "localhost").unwrap();
|
||||
let (client_config2, server_config2) =
|
||||
generate_tls_config("generic-project-name.localhost", "localhost").unwrap();
|
||||
|
||||
tokio::spawn(async move {
|
||||
// begin handshake with end_server
|
||||
let end_server = connect_tls(server2, client_config2.make_tls_connect().unwrap()).await;
|
||||
// process handshake with end_client
|
||||
let (end_client, startup) =
|
||||
handshake(client1, Some(&server_config1), &CancelMap::default())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let mut end_server = tokio_util::codec::Framed::new(end_server, PgFrame);
|
||||
let (end_client, buf) = end_client.framed.into_inner();
|
||||
assert!(buf.is_empty());
|
||||
let mut end_client = tokio_util::codec::Framed::new(end_client, PgFrame);
|
||||
|
||||
// give the end_server the startup parameters
|
||||
let mut buf = BytesMut::new();
|
||||
frontend::startup_message(startup.iter(), &mut buf).unwrap();
|
||||
end_server.send(buf.freeze()).await.unwrap();
|
||||
|
||||
// proxy messages between end_client and end_server
|
||||
loop {
|
||||
tokio::select! {
|
||||
message = end_server.next() => {
|
||||
match message {
|
||||
Some(Ok(message)) => {
|
||||
// intercept SASL and return only SCRAM-SHA-256 ;)
|
||||
if matches!(intercept, Intercept::Methods) && message.starts_with(b"R") && message[5..].starts_with(&[0,0,0,10]) {
|
||||
end_client.send(Bytes::from_static(b"R\0\0\0\x17\0\0\0\x0aSCRAM-SHA-256\0\0")).await.unwrap();
|
||||
continue;
|
||||
}
|
||||
end_client.send(message).await.unwrap()
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
message = end_client.next() => {
|
||||
match message {
|
||||
Some(Ok(message)) => {
|
||||
// intercept SASL response and return SCRAM-SHA-256 with no channel binding ;)
|
||||
if matches!(intercept, Intercept::SASLResponse) && message.starts_with(b"p") && message[5..].starts_with(b"SCRAM-SHA-256-PLUS\0") {
|
||||
let sasl_message = &message[1+4+19+4..];
|
||||
let mut new_message = b"n,,".to_vec();
|
||||
new_message.extend_from_slice(sasl_message.strip_prefix(b"p=tls-server-end-point,,").unwrap());
|
||||
|
||||
let mut buf = BytesMut::new();
|
||||
frontend::sasl_initial_response("SCRAM-SHA-256", &new_message, &mut buf).unwrap();
|
||||
|
||||
end_server.send(buf.freeze()).await.unwrap();
|
||||
continue;
|
||||
}
|
||||
end_server.send(message).await.unwrap()
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
else => { break }
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
(end_server1, end_client2, client_config1, server_config2)
|
||||
}
|
||||
|
||||
/// taken from tokio-postgres
|
||||
pub async fn connect_tls<S, T>(mut stream: S, tls: T) -> T::Stream
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
T: TlsConnect<S>,
|
||||
T::Error: Debug,
|
||||
{
|
||||
let mut buf = BytesMut::new();
|
||||
frontend::ssl_request(&mut buf);
|
||||
stream.write_all(&buf).await.unwrap();
|
||||
|
||||
let mut buf = [0];
|
||||
stream.read_exact(&mut buf).await.unwrap();
|
||||
|
||||
if buf[0] != b'S' {
|
||||
panic!("ssl not supported by server");
|
||||
}
|
||||
|
||||
tls.connect(stream).await.unwrap()
|
||||
}
|
||||
|
||||
struct PgFrame;
|
||||
impl Decoder for PgFrame {
|
||||
type Item = Bytes;
|
||||
type Error = io::Error;
|
||||
|
||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||
if src.len() < 5 {
|
||||
src.reserve(5 - src.len());
|
||||
return Ok(None);
|
||||
}
|
||||
let len = u32::from_be_bytes(src[1..5].try_into().unwrap()) as usize + 1;
|
||||
if src.len() < len {
|
||||
src.reserve(len - src.len());
|
||||
return Ok(None);
|
||||
}
|
||||
Ok(Some(src.split_to(len).freeze()))
|
||||
}
|
||||
}
|
||||
impl Encoder<Bytes> for PgFrame {
|
||||
type Error = io::Error;
|
||||
|
||||
fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||
dst.extend_from_slice(&item);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// If the client doesn't support channel bindings, it can be exploited.
|
||||
#[tokio::test]
|
||||
async fn scram_auth_disable_channel_binding() -> anyhow::Result<()> {
|
||||
let (server, client, client_config, server_config) = proxy_mitm(Intercept::None).await;
|
||||
let proxy = tokio::spawn(dummy_proxy(
|
||||
client,
|
||||
Some(server_config),
|
||||
Scram::new("password")?,
|
||||
));
|
||||
|
||||
let _client_err = tokio_postgres::Config::new()
|
||||
.channel_binding(tokio_postgres::config::ChannelBinding::Disable)
|
||||
.user("user")
|
||||
.dbname("db")
|
||||
.password("password")
|
||||
.ssl_mode(SslMode::Require)
|
||||
.connect_raw(server, client_config.make_tls_connect()?)
|
||||
.await?;
|
||||
|
||||
proxy.await?
|
||||
}
|
||||
|
||||
/// If the client chooses SCRAM-PLUS, it will fail
|
||||
#[tokio::test]
|
||||
async fn scram_auth_prefer_channel_binding() -> anyhow::Result<()> {
|
||||
connect_failure(
|
||||
Intercept::None,
|
||||
tokio_postgres::config::ChannelBinding::Prefer,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// If the MITM pretends like SCRAM-PLUS isn't available, but the client supports it, it will fail
|
||||
#[tokio::test]
|
||||
async fn scram_auth_prefer_channel_binding_intercept() -> anyhow::Result<()> {
|
||||
connect_failure(
|
||||
Intercept::Methods,
|
||||
tokio_postgres::config::ChannelBinding::Prefer,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// If the MITM pretends like the client doesn't support channel bindings, it will fail
|
||||
#[tokio::test]
|
||||
async fn scram_auth_prefer_channel_binding_intercept_response() -> anyhow::Result<()> {
|
||||
connect_failure(
|
||||
Intercept::SASLResponse,
|
||||
tokio_postgres::config::ChannelBinding::Prefer,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// If the client chooses SCRAM-PLUS, it will fail
|
||||
#[tokio::test]
|
||||
async fn scram_auth_require_channel_binding() -> anyhow::Result<()> {
|
||||
connect_failure(
|
||||
Intercept::None,
|
||||
tokio_postgres::config::ChannelBinding::Require,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// If the client requires SCRAM-PLUS, and it is spoofed to remove SCRAM-PLUS, it will fail
|
||||
#[tokio::test]
|
||||
async fn scram_auth_require_channel_binding_intercept() -> anyhow::Result<()> {
|
||||
connect_failure(
|
||||
Intercept::Methods,
|
||||
tokio_postgres::config::ChannelBinding::Require,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// If the client requires SCRAM-PLUS, and it is spoofed to remove SCRAM-PLUS, it will fail
|
||||
#[tokio::test]
|
||||
async fn scram_auth_require_channel_binding_intercept_response() -> anyhow::Result<()> {
|
||||
connect_failure(
|
||||
Intercept::SASLResponse,
|
||||
tokio_postgres::config::ChannelBinding::Require,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn connect_failure(
|
||||
intercept: Intercept,
|
||||
channel_binding: tokio_postgres::config::ChannelBinding,
|
||||
) -> anyhow::Result<()> {
|
||||
let (server, client, client_config, server_config) = proxy_mitm(intercept).await;
|
||||
let proxy = tokio::spawn(dummy_proxy(
|
||||
client,
|
||||
Some(server_config),
|
||||
Scram::new("password")?,
|
||||
));
|
||||
|
||||
let _client_err = tokio_postgres::Config::new()
|
||||
.channel_binding(channel_binding)
|
||||
.user("user")
|
||||
.dbname("db")
|
||||
.password("password")
|
||||
.ssl_mode(SslMode::Require)
|
||||
.connect_raw(server, client_config.make_tls_connect()?)
|
||||
.await
|
||||
.err()
|
||||
.context("client shouldn't be able to connect")?;
|
||||
|
||||
let _server_err = proxy
|
||||
.await?
|
||||
.err()
|
||||
.context("server shouldn't accept client")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -36,9 +36,9 @@ impl<'a> ChannelBinding<&'a str> {
|
||||
|
||||
impl<T: std::fmt::Display> ChannelBinding<T> {
|
||||
/// Encode channel binding data as base64 for subsequent checks.
|
||||
pub fn encode<'a, E>(
|
||||
pub fn encode<E>(
|
||||
&self,
|
||||
get_cbind_data: impl FnOnce(&T) -> Result<&'a [u8], E>,
|
||||
get_cbind_data: impl FnOnce(&T) -> Result<String, E>,
|
||||
) -> Result<std::borrow::Cow<'static, str>, E> {
|
||||
use ChannelBinding::*;
|
||||
Ok(match self {
|
||||
@@ -51,11 +51,12 @@ impl<T: std::fmt::Display> ChannelBinding<T> {
|
||||
"eSws".into()
|
||||
}
|
||||
Required(mode) => {
|
||||
use std::io::Write;
|
||||
let mut cbind_input = vec![];
|
||||
write!(&mut cbind_input, "p={mode},,",).unwrap();
|
||||
cbind_input.extend_from_slice(get_cbind_data(mode)?);
|
||||
base64::encode(&cbind_input).into()
|
||||
let msg = format!(
|
||||
"p={mode},,{data}",
|
||||
mode = mode,
|
||||
data = get_cbind_data(mode)?
|
||||
);
|
||||
base64::encode(msg).into()
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -76,7 +77,7 @@ mod tests {
|
||||
];
|
||||
|
||||
for (cb, input) in cases {
|
||||
assert_eq!(cb.encode(|_| anyhow::Ok(b"bar"))?, input);
|
||||
assert_eq!(cb.encode(|_| anyhow::Ok("bar".to_owned()))?, input);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -22,12 +22,9 @@ pub use secret::ServerSecret;
|
||||
use hmac::{Hmac, Mac};
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
const SCRAM_SHA_256: &str = "SCRAM-SHA-256";
|
||||
const SCRAM_SHA_256_PLUS: &str = "SCRAM-SHA-256-PLUS";
|
||||
|
||||
// TODO: add SCRAM-SHA-256-PLUS
|
||||
/// A list of supported SCRAM methods.
|
||||
pub const METHODS: &[&str] = &[SCRAM_SHA_256_PLUS, SCRAM_SHA_256];
|
||||
pub const METHODS_WITHOUT_PLUS: &[&str] = &[SCRAM_SHA_256];
|
||||
pub const METHODS: &[&str] = &["SCRAM-SHA-256"];
|
||||
|
||||
/// Decode base64 into array without any heap allocations
|
||||
fn base64_decode_array<const N: usize>(input: impl AsRef<[u8]>) -> Option<[u8; N]> {
|
||||
@@ -83,11 +80,7 @@ mod tests {
|
||||
const NONCE: [u8; 18] = [
|
||||
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
|
||||
];
|
||||
let mut exchange = Exchange::new(
|
||||
&secret,
|
||||
|| NONCE,
|
||||
crate::config::TlsServerEndPoint::Undefined,
|
||||
);
|
||||
let mut exchange = Exchange::new(&secret, || NONCE, None);
|
||||
|
||||
let client_first = "n,,n=user,r=rOprNGfwEbeRWgbNEkqO";
|
||||
let client_final = "c=biws,r=rOprNGfwEbeRWgbNEkqOAQIDBAUGBwgJCgsMDQ4PEBES,p=rw1r5Kph5ThxmaUBC2GAQ6MfXbPnNkFiTIvdb/Rear0=";
|
||||
|
||||
@@ -5,11 +5,9 @@ use super::messages::{
|
||||
};
|
||||
use super::secret::ServerSecret;
|
||||
use super::signature::SignatureBuilder;
|
||||
use crate::config;
|
||||
use crate::sasl::{self, ChannelBinding, Error as SaslError};
|
||||
|
||||
/// The only channel binding mode we currently support.
|
||||
#[derive(Debug)]
|
||||
struct TlsServerEndPoint;
|
||||
|
||||
impl std::fmt::Display for TlsServerEndPoint {
|
||||
@@ -45,20 +43,20 @@ pub struct Exchange<'a> {
|
||||
state: ExchangeState,
|
||||
secret: &'a ServerSecret,
|
||||
nonce: fn() -> [u8; SCRAM_RAW_NONCE_LEN],
|
||||
tls_server_end_point: config::TlsServerEndPoint,
|
||||
cert_digest: Option<&'a [u8]>,
|
||||
}
|
||||
|
||||
impl<'a> Exchange<'a> {
|
||||
pub fn new(
|
||||
secret: &'a ServerSecret,
|
||||
nonce: fn() -> [u8; SCRAM_RAW_NONCE_LEN],
|
||||
tls_server_end_point: config::TlsServerEndPoint,
|
||||
cert_digest: Option<&'a [u8]>,
|
||||
) -> Self {
|
||||
Self {
|
||||
state: ExchangeState::Initial,
|
||||
secret,
|
||||
nonce,
|
||||
tls_server_end_point,
|
||||
cert_digest,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -73,14 +71,6 @@ impl sasl::Mechanism for Exchange<'_> {
|
||||
let client_first_message = ClientFirstMessage::parse(input)
|
||||
.ok_or(SaslError::BadClientMessage("invalid client-first-message"))?;
|
||||
|
||||
// If the flag is set to "y" and the server supports channel
|
||||
// binding, the server MUST fail authentication
|
||||
if client_first_message.cbind_flag == ChannelBinding::NotSupportedServer
|
||||
&& self.tls_server_end_point.supported()
|
||||
{
|
||||
return Err(SaslError::ChannelBindingFailed("SCRAM-PLUS not used"));
|
||||
}
|
||||
|
||||
let server_first_message = client_first_message.build_server_first_message(
|
||||
&(self.nonce)(),
|
||||
&self.secret.salt_base64,
|
||||
@@ -104,11 +94,10 @@ impl sasl::Mechanism for Exchange<'_> {
|
||||
let client_final_message = ClientFinalMessage::parse(input)
|
||||
.ok_or(SaslError::BadClientMessage("invalid client-final-message"))?;
|
||||
|
||||
let channel_binding = cbind_flag.encode(|_| match &self.tls_server_end_point {
|
||||
config::TlsServerEndPoint::Sha256(x) => Ok(x),
|
||||
config::TlsServerEndPoint::Undefined => {
|
||||
Err(SaslError::ChannelBindingFailed("no cert digest provided"))
|
||||
}
|
||||
let channel_binding = cbind_flag.encode(|_| {
|
||||
self.cert_digest
|
||||
.map(base64::encode)
|
||||
.ok_or(SaslError::ChannelBindingFailed("no cert digest provided"))
|
||||
})?;
|
||||
|
||||
// This might've been caused by a MITM attack
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use crate::config::TlsServerEndPoint;
|
||||
use crate::error::UserFacingError;
|
||||
use anyhow::bail;
|
||||
use bytes::BytesMut;
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
use pq_proto::framed::{ConnectionError, Framed};
|
||||
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, ProtocolError};
|
||||
use rustls::ServerConfig;
|
||||
@@ -18,7 +17,7 @@ use tokio_rustls::server::TlsStream;
|
||||
/// or [`AsyncWrite`] to prevent subtle errors (e.g. trying
|
||||
/// to pass random malformed bytes through the connection).
|
||||
pub struct PqStream<S> {
|
||||
pub(crate) framed: Framed<S>,
|
||||
framed: Framed<S>,
|
||||
}
|
||||
|
||||
impl<S> PqStream<S> {
|
||||
@@ -119,21 +118,19 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper for upgrading raw streams into secure streams.
|
||||
pub enum Stream<S> {
|
||||
/// We always begin with a raw stream,
|
||||
/// which may then be upgraded into a secure stream.
|
||||
Raw { raw: S },
|
||||
Tls {
|
||||
pin_project! {
|
||||
/// Wrapper for upgrading raw streams into secure streams.
|
||||
/// NOTE: it should be possible to decompose this object as necessary.
|
||||
#[project = StreamProj]
|
||||
pub enum Stream<S> {
|
||||
/// We always begin with a raw stream,
|
||||
/// which may then be upgraded into a secure stream.
|
||||
Raw { #[pin] raw: S },
|
||||
/// We box [`TlsStream`] since it can be quite large.
|
||||
tls: Box<TlsStream<S>>,
|
||||
/// Channel binding parameter
|
||||
tls_server_end_point: TlsServerEndPoint,
|
||||
},
|
||||
Tls { #[pin] tls: Box<TlsStream<S>> },
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Unpin> Unpin for Stream<S> {}
|
||||
|
||||
impl<S> Stream<S> {
|
||||
/// Construct a new instance from a raw stream.
|
||||
pub fn from_raw(raw: S) -> Self {
|
||||
@@ -144,17 +141,7 @@ impl<S> Stream<S> {
|
||||
pub fn sni_hostname(&self) -> Option<&str> {
|
||||
match self {
|
||||
Stream::Raw { .. } => None,
|
||||
Stream::Tls { tls, .. } => tls.get_ref().1.server_name(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn tls_server_end_point(&self) -> TlsServerEndPoint {
|
||||
match self {
|
||||
Stream::Raw { .. } => TlsServerEndPoint::Undefined,
|
||||
Stream::Tls {
|
||||
tls_server_end_point,
|
||||
..
|
||||
} => *tls_server_end_point,
|
||||
Stream::Tls { tls } => tls.get_ref().1.server_name(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -171,9 +158,12 @@ pub enum StreamUpgradeError {
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
|
||||
/// If possible, upgrade raw stream into a secure TLS-based stream.
|
||||
pub async fn upgrade(self, cfg: Arc<ServerConfig>) -> Result<TlsStream<S>, StreamUpgradeError> {
|
||||
pub async fn upgrade(self, cfg: Arc<ServerConfig>) -> Result<Self, StreamUpgradeError> {
|
||||
match self {
|
||||
Stream::Raw { raw } => Ok(tokio_rustls::TlsAcceptor::from(cfg).accept(raw).await?),
|
||||
Stream::Raw { raw } => {
|
||||
let tls = Box::new(tokio_rustls::TlsAcceptor::from(cfg).accept(raw).await?);
|
||||
Ok(Stream::Tls { tls })
|
||||
}
|
||||
Stream::Tls { .. } => Err(StreamUpgradeError::AlreadyTls),
|
||||
}
|
||||
}
|
||||
@@ -181,46 +171,50 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for Stream<S> {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
self: Pin<&mut Self>,
|
||||
context: &mut task::Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> task::Poll<io::Result<()>> {
|
||||
match &mut *self {
|
||||
Self::Raw { raw } => Pin::new(raw).poll_read(context, buf),
|
||||
Self::Tls { tls, .. } => Pin::new(tls).poll_read(context, buf),
|
||||
use StreamProj::*;
|
||||
match self.project() {
|
||||
Raw { raw } => raw.poll_read(context, buf),
|
||||
Tls { tls } => tls.poll_read(context, buf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for Stream<S> {
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
self: Pin<&mut Self>,
|
||||
context: &mut task::Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> task::Poll<io::Result<usize>> {
|
||||
match &mut *self {
|
||||
Self::Raw { raw } => Pin::new(raw).poll_write(context, buf),
|
||||
Self::Tls { tls, .. } => Pin::new(tls).poll_write(context, buf),
|
||||
use StreamProj::*;
|
||||
match self.project() {
|
||||
Raw { raw } => raw.poll_write(context, buf),
|
||||
Tls { tls } => tls.poll_write(context, buf),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
mut self: Pin<&mut Self>,
|
||||
self: Pin<&mut Self>,
|
||||
context: &mut task::Context<'_>,
|
||||
) -> task::Poll<io::Result<()>> {
|
||||
match &mut *self {
|
||||
Self::Raw { raw } => Pin::new(raw).poll_flush(context),
|
||||
Self::Tls { tls, .. } => Pin::new(tls).poll_flush(context),
|
||||
use StreamProj::*;
|
||||
match self.project() {
|
||||
Raw { raw } => raw.poll_flush(context),
|
||||
Tls { tls } => tls.poll_flush(context),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
mut self: Pin<&mut Self>,
|
||||
self: Pin<&mut Self>,
|
||||
context: &mut task::Context<'_>,
|
||||
) -> task::Poll<io::Result<()>> {
|
||||
match &mut *self {
|
||||
Self::Raw { raw } => Pin::new(raw).poll_shutdown(context),
|
||||
Self::Tls { tls, .. } => Pin::new(tls).poll_shutdown(context),
|
||||
use StreamProj::*;
|
||||
match self.project() {
|
||||
Raw { raw } => raw.poll_shutdown(context),
|
||||
Tls { tls } => tls.poll_shutdown(context),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ psutil = "^5.9.4"
|
||||
types-psutil = "^5.9.5.12"
|
||||
types-toml = "^0.10.8.6"
|
||||
pytest-httpserver = "^1.0.8"
|
||||
aiohttp = "3.9.0"
|
||||
aiohttp = "3.8.6"
|
||||
pytest-rerunfailures = "^11.1.2"
|
||||
types-pytest-lazy-fixture = "^0.6.3.3"
|
||||
pytest-split = "^0.8.1"
|
||||
|
||||
@@ -94,10 +94,11 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
!= index_part.get_disk_consistent_lsn()
|
||||
{
|
||||
result.errors.push(format!(
|
||||
"Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
|
||||
index_part.metadata.disk_consistent_lsn(),
|
||||
index_part.get_disk_consistent_lsn(),
|
||||
))
|
||||
"Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
|
||||
index_part.metadata.disk_consistent_lsn(),
|
||||
index_part.get_disk_consistent_lsn(),
|
||||
|
||||
))
|
||||
}
|
||||
|
||||
if index_part.layer_metadata.is_empty() {
|
||||
@@ -108,8 +109,8 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
for (layer, metadata) in index_part.layer_metadata {
|
||||
if metadata.file_size == 0 {
|
||||
result.errors.push(format!(
|
||||
"index_part.json contains a layer {} that has 0 size in its layer metadata", layer.file_name(),
|
||||
))
|
||||
"index_part.json contains a layer {} that has 0 size in its layer metadata", layer.file_name(),
|
||||
))
|
||||
}
|
||||
|
||||
let layer_map_key = (layer, metadata.generation);
|
||||
@@ -135,7 +136,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
// a new generation that didn't upload an index yet.
|
||||
//
|
||||
// Even so, a layer that is not referenced by the index could just
|
||||
// be something enqueued for deletion, so while this check is valid
|
||||
// be something enqueued for deletion, so while this check is valid
|
||||
// for indicating that a layer is garbage, it is not an indicator
|
||||
// of a problem.
|
||||
gen < &index_part_generation)
|
||||
|
||||
@@ -1572,7 +1572,7 @@ class NeonAttachmentService:
|
||||
self.running = False
|
||||
return self
|
||||
|
||||
def attach_hook_issue(self, tenant_id: TenantId, pageserver_id: int) -> int:
|
||||
def attach_hook(self, tenant_id: TenantId, pageserver_id: int) -> int:
|
||||
response = requests.post(
|
||||
f"{self.env.control_plane_api}/attach-hook",
|
||||
json={"tenant_id": str(tenant_id), "node_id": pageserver_id},
|
||||
@@ -1582,13 +1582,6 @@ class NeonAttachmentService:
|
||||
assert isinstance(gen, int)
|
||||
return gen
|
||||
|
||||
def attach_hook_drop(self, tenant_id: TenantId):
|
||||
response = requests.post(
|
||||
f"{self.env.control_plane_api}/attach-hook",
|
||||
json={"tenant_id": str(tenant_id), "node_id": None},
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
def __enter__(self) -> "NeonAttachmentService":
|
||||
return self
|
||||
|
||||
@@ -1788,20 +1781,13 @@ class NeonPageserver(PgProtocol):
|
||||
to call into the pageserver HTTP client.
|
||||
"""
|
||||
if self.env.attachment_service is not None:
|
||||
generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
|
||||
generation = self.env.attachment_service.attach_hook(tenant_id, self.id)
|
||||
else:
|
||||
generation = None
|
||||
|
||||
client = self.http_client()
|
||||
return client.tenant_attach(tenant_id, config, config_null, generation=generation)
|
||||
|
||||
def tenant_detach(self, tenant_id: TenantId):
|
||||
if self.env.attachment_service is not None:
|
||||
self.env.attachment_service.attach_hook_drop(tenant_id)
|
||||
|
||||
client = self.http_client()
|
||||
return client.tenant_detach(tenant_id)
|
||||
|
||||
|
||||
def append_pageserver_param_overrides(
|
||||
params_to_update: List[str],
|
||||
|
||||
@@ -4,7 +4,7 @@ import json
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional, Set, Tuple
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
import requests
|
||||
from requests.adapters import HTTPAdapter
|
||||
@@ -100,15 +100,6 @@ class LayerMapInfo:
|
||||
counts[hist_layer.kind] += 1
|
||||
return counts
|
||||
|
||||
def delta_layers(self) -> List[HistoricLayerInfo]:
|
||||
return [x for x in self.historic_layers if x.kind == "Delta"]
|
||||
|
||||
def image_layers(self) -> List[HistoricLayerInfo]:
|
||||
return [x for x in self.historic_layers if x.kind == "Image"]
|
||||
|
||||
def historic_by_name(self) -> Set[str]:
|
||||
return set(x.layer_file_name for x in self.historic_layers)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TenantConfig:
|
||||
@@ -425,10 +416,6 @@ class PageserverHttpClient(requests.Session):
|
||||
def timeline_gc(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, gc_horizon: Optional[int]
|
||||
) -> dict[str, Any]:
|
||||
"""
|
||||
Unlike most handlers, this will wait for the layers to be actually
|
||||
complete registering themselves to the deletion queue.
|
||||
"""
|
||||
self.is_testing_enabled_or_skip()
|
||||
|
||||
log.info(
|
||||
|
||||
@@ -14,7 +14,7 @@ Some handy pytest flags for local development:
|
||||
- `-s` shows test output
|
||||
- `-k` selects a test to run
|
||||
- `--timeout=0` disables our default timeout of 300s (see `setup.cfg`)
|
||||
- `--preserve-database-files` to skip cleanup
|
||||
- `--cleanup-test-ouput` cleans up after each test
|
||||
|
||||
# What performance tests do we have and how we run them
|
||||
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import os
|
||||
import shutil
|
||||
from contextlib import closing
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
@@ -15,33 +14,62 @@ from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
|
||||
# use neon_env_builder_local fixture to override the default neon_env_builder fixture
|
||||
# and use a test-specific pg_install instead of shared one
|
||||
# Check that the extension is not already in the share_dir_path_ext
|
||||
# if it is, skip the test
|
||||
#
|
||||
# After the test is done, cleanup the control file and the extension directory
|
||||
@pytest.fixture(scope="function")
|
||||
def neon_env_builder_local(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
test_output_dir: Path,
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
) -> NeonEnvBuilder:
|
||||
test_local_pginstall = test_output_dir / "pg_install"
|
||||
log.info(f"copy {pg_distrib_dir} to {test_local_pginstall}")
|
||||
shutil.copytree(
|
||||
pg_distrib_dir / pg_version.v_prefixed, test_local_pginstall / pg_version.v_prefixed
|
||||
)
|
||||
def ext_file_cleanup(pg_bin):
|
||||
out = pg_bin.run_capture("pg_config --sharedir".split())
|
||||
share_dir_path = Path(f"{out}.stdout").read_text().strip()
|
||||
log.info(f"share_dir_path: {share_dir_path}")
|
||||
share_dir_path_ext = os.path.join(share_dir_path, "extension")
|
||||
|
||||
neon_env_builder.pg_distrib_dir = test_local_pginstall
|
||||
log.info(f"local neon_env_builder.pg_distrib_dir: {neon_env_builder.pg_distrib_dir}")
|
||||
log.info(f"share_dir_path_ext: {share_dir_path_ext}")
|
||||
|
||||
return neon_env_builder
|
||||
# if file is already in the share_dir_path_ext, skip the test
|
||||
if os.path.isfile(os.path.join(share_dir_path_ext, "anon.control")):
|
||||
log.info("anon.control is already in the share_dir_path_ext, skipping the test")
|
||||
yield False
|
||||
return
|
||||
else:
|
||||
yield True
|
||||
|
||||
# cleanup the control file
|
||||
if os.path.isfile(os.path.join(share_dir_path_ext, "anon.control")):
|
||||
os.unlink(os.path.join(share_dir_path_ext, "anon.control"))
|
||||
log.info("anon.control was removed from the share_dir_path_ext")
|
||||
|
||||
# remove the extension directory recursively
|
||||
if os.path.isdir(os.path.join(share_dir_path_ext, "anon")):
|
||||
directories_to_clean: List[Path] = []
|
||||
for f in Path(os.path.join(share_dir_path_ext, "anon")).iterdir():
|
||||
if f.is_file():
|
||||
log.info(f"Removing file {f}")
|
||||
f.unlink()
|
||||
elif f.is_dir():
|
||||
directories_to_clean.append(f)
|
||||
|
||||
for directory_to_clean in reversed(directories_to_clean):
|
||||
if not os.listdir(directory_to_clean):
|
||||
log.info(f"Removing empty directory {directory_to_clean}")
|
||||
directory_to_clean.rmdir()
|
||||
|
||||
os.rmdir(os.path.join(share_dir_path_ext, "anon"))
|
||||
log.info("anon directory was removed from the share_dir_path_ext")
|
||||
|
||||
|
||||
def test_remote_extensions(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder_local: NeonEnvBuilder,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
httpserver_listen_address,
|
||||
pg_version,
|
||||
ext_file_cleanup,
|
||||
):
|
||||
if ext_file_cleanup is False:
|
||||
log.info("test_remote_extensions skipped")
|
||||
return
|
||||
|
||||
if pg_version == PgVersion.V16:
|
||||
pytest.skip("TODO: PG16 extension building")
|
||||
|
||||
@@ -51,8 +79,7 @@ def test_remote_extensions(
|
||||
(host, port) = httpserver_listen_address
|
||||
extensions_endpoint = f"http://{host}:{port}/pg-ext-s3-gateway"
|
||||
|
||||
build_tag = os.environ.get("BUILD_TAG", "latest")
|
||||
archive_path = f"{build_tag}/v{pg_version}/extensions/anon.tar.zst"
|
||||
archive_path = f"latest/v{pg_version}/extensions/anon.tar.zst"
|
||||
|
||||
def endpoint_handler_build_tag(request: Request) -> Response:
|
||||
log.info(f"request: {request}")
|
||||
@@ -61,7 +88,6 @@ def test_remote_extensions(
|
||||
file_path = f"test_runner/regress/data/extension_test/5670669815/v{pg_version}/extensions/anon.tar.zst"
|
||||
file_size = os.path.getsize(file_path)
|
||||
fh = open(file_path, "rb")
|
||||
|
||||
return Response(
|
||||
fh,
|
||||
mimetype="application/octet-stream",
|
||||
@@ -78,10 +104,12 @@ def test_remote_extensions(
|
||||
|
||||
# Start a compute node with remote_extension spec
|
||||
# and check that it can download the extensions and use them to CREATE EXTENSION.
|
||||
env = neon_env_builder_local.init_start()
|
||||
env.neon_cli.create_branch("test_remote_extensions")
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)
|
||||
endpoint = env.endpoints.create(
|
||||
"test_remote_extensions",
|
||||
tenant_id=tenant_id,
|
||||
config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
|
||||
|
||||
@@ -282,7 +282,7 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Now advance the generation in the control plane: subsequent validations
|
||||
# from the running pageserver will fail. No more deletions should happen.
|
||||
env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)
|
||||
env.attachment_service.attach_hook(env.initial_tenant, some_other_pageserver)
|
||||
generate_uploads_and_deletions(env, init=False)
|
||||
|
||||
assert_deletion_queue(ps_http, lambda n: n > 0)
|
||||
@@ -397,7 +397,7 @@ def test_deletion_queue_recovery(
|
||||
if keep_attachment == KeepAttachment.LOSE:
|
||||
some_other_pageserver = 101010
|
||||
assert env.attachment_service is not None
|
||||
env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)
|
||||
env.attachment_service.attach_hook(env.initial_tenant, some_other_pageserver)
|
||||
|
||||
env.pageserver.start()
|
||||
|
||||
|
||||
@@ -763,7 +763,9 @@ def test_compaction_waits_for_upload(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
This test forces a race between upload and compaction.
|
||||
Compaction waits for outstanding uploads to complete, so that it avoids deleting layers
|
||||
files that have not yet been uploaded. This test forces a race between upload and
|
||||
compaction.
|
||||
"""
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
@@ -782,16 +784,6 @@ def test_compaction_waits_for_upload(
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
layers_at_creation = client.layer_map_info(tenant_id, timeline_id)
|
||||
deltas_at_creation = len(layers_at_creation.delta_layers())
|
||||
assert (
|
||||
deltas_at_creation == 1
|
||||
), "are you fixing #5863? make sure we end up with 2 deltas at the end of endpoint lifecycle"
|
||||
|
||||
# Make new layer uploads get stuck.
|
||||
# Note that timeline creation waits for the initial layers to reach remote storage.
|
||||
# So at this point, the `layers_at_creation` are in remote storage.
|
||||
client.configure_failpoints(("before-upload-layer-pausable", "pause"))
|
||||
|
||||
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
|
||||
# Build two tables with some data inside
|
||||
@@ -799,71 +791,85 @@ def test_compaction_waits_for_upload(
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
deltas_at_first = len(client.layer_map_info(tenant_id, timeline_id).delta_layers())
|
||||
assert (
|
||||
deltas_at_first == 2
|
||||
), "are you fixing #5863? just add one more checkpoint after 'CREATE TABLE bar ...' statement."
|
||||
|
||||
endpoint.safe_psql("CREATE TABLE bar AS SELECT x FROM generate_series(1, 10000) g(x)")
|
||||
endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1")
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
layers_before_last_checkpoint = client.layer_map_info(tenant_id, timeline_id).historic_by_name()
|
||||
upload_stuck_layers = layers_before_last_checkpoint - layers_at_creation.historic_by_name()
|
||||
# Now make the flushing hang and update one small piece of data
|
||||
client.configure_failpoints(("before-upload-layer-pausable", "pause"))
|
||||
|
||||
assert len(upload_stuck_layers) > 0
|
||||
endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1")
|
||||
|
||||
for name in upload_stuck_layers:
|
||||
path = env.pageserver.timeline_dir(tenant_id, timeline_id) / name
|
||||
assert path.exists(), "while uploads are stuck the layers should be present on disk"
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
# now this will do the L0 => L1 compaction and want to remove
|
||||
# upload_stuck_layers and the original initdb L0
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
checkpoint_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
|
||||
compact_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
|
||||
compact_barrier = threading.Barrier(2)
|
||||
|
||||
# as uploads are paused, the the upload_stuck_layers should still be with us
|
||||
for name in upload_stuck_layers:
|
||||
path = env.pageserver.timeline_dir(tenant_id, timeline_id) / name
|
||||
assert path.exists(), "uploads are stuck still over compaction"
|
||||
def checkpoint_in_background():
|
||||
try:
|
||||
log.info("Checkpoint starting")
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
log.info("Checkpoint complete")
|
||||
checkpoint_result.put(None)
|
||||
except PageserverApiException as e:
|
||||
log.info("Checkpoint errored: {e}")
|
||||
checkpoint_result.put(e)
|
||||
|
||||
compacted_layers = client.layer_map_info(tenant_id, timeline_id).historic_by_name()
|
||||
overlap = compacted_layers.intersection(upload_stuck_layers)
|
||||
assert len(overlap) == 0, "none of the L0's should remain after L0 => L1 compaction"
|
||||
assert (
|
||||
len(compacted_layers) == 1
|
||||
), "there should be one L1 after L0 => L1 compaction (without #5863 being fixed)"
|
||||
def compact_in_background():
|
||||
compact_barrier.wait()
|
||||
try:
|
||||
log.info("Compaction starting")
|
||||
client.timeline_compact(tenant_id, timeline_id)
|
||||
log.info("Compaction complete")
|
||||
compact_result.put(None)
|
||||
except PageserverApiException as e:
|
||||
log.info("Compaction errored: {e}")
|
||||
compact_result.put(e)
|
||||
|
||||
def layer_deletes_completed():
|
||||
m = client.get_metric_value("pageserver_layer_gcs_count_total", {"state": "completed"})
|
||||
if m is None:
|
||||
return 0
|
||||
return int(m)
|
||||
checkpoint_thread = threading.Thread(target=checkpoint_in_background)
|
||||
checkpoint_thread.start()
|
||||
|
||||
# if initdb created an initial delta layer, it might already be gc'd
|
||||
# because it was uploaded before the failpoint was enabled. however, the
|
||||
# deletion is not guaranteed to be complete.
|
||||
assert layer_deletes_completed() <= 1
|
||||
compact_thread = threading.Thread(target=compact_in_background)
|
||||
compact_thread.start()
|
||||
|
||||
client.configure_failpoints(("before-upload-layer-pausable", "off"))
|
||||
try:
|
||||
# Start the checkpoint, see that it blocks
|
||||
log.info("Waiting to see checkpoint hang...")
|
||||
time.sleep(5)
|
||||
assert checkpoint_result.empty()
|
||||
|
||||
# Start the compaction, see that it finds work to do but blocks
|
||||
compact_barrier.wait()
|
||||
log.info("Waiting to see compaction hang...")
|
||||
time.sleep(5)
|
||||
assert compact_result.empty()
|
||||
|
||||
# This is logged once compaction is started, but before we wait for operations to complete
|
||||
assert env.pageserver.log_contains("compact_level0_phase1 stats available.")
|
||||
|
||||
# Once we unblock uploads the compaction should complete successfully
|
||||
log.info("Disabling failpoint")
|
||||
client.configure_failpoints(("before-upload-layer-pausable", "off"))
|
||||
log.info("Awaiting compaction result")
|
||||
assert compact_result.get(timeout=10) is None
|
||||
log.info("Awaiting checkpoint result")
|
||||
assert checkpoint_result.get(timeout=10) is None
|
||||
|
||||
except Exception:
|
||||
# Log the actual failure's backtrace here, before we proceed to join threads
|
||||
log.exception("Failure, cleaning up...")
|
||||
raise
|
||||
finally:
|
||||
compact_barrier.abort()
|
||||
|
||||
checkpoint_thread.join()
|
||||
compact_thread.join()
|
||||
|
||||
# Ensure that this actually terminates
|
||||
wait_upload_queue_empty(client, tenant_id, timeline_id)
|
||||
|
||||
def until_layer_deletes_completed():
|
||||
deletes = layer_deletes_completed()
|
||||
log.info(f"layer_deletes: {deletes}")
|
||||
# ensure that initdb delta layer AND the previously stuck are now deleted
|
||||
assert deletes >= len(upload_stuck_layers) + 1
|
||||
|
||||
wait_until(10, 1, until_layer_deletes_completed)
|
||||
|
||||
for name in upload_stuck_layers:
|
||||
path = env.pageserver.timeline_dir(tenant_id, timeline_id) / name
|
||||
assert (
|
||||
not path.exists()
|
||||
), "l0 should now be removed because of L0 => L1 compaction and completed uploads"
|
||||
|
||||
# We should not have hit the error handling path in uploads where a uploaded file is gone
|
||||
# We should not have hit the error handling path in uploads where the remote file is gone
|
||||
assert not env.pageserver.log_contains(
|
||||
"File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user