Compare commits

15 Commits

Author         SHA1        Message                 Date
Cihan Demirci  dcd9a80c51  login                   2024-07-25 23:00:10 +03:00
Cihan Demirci  74e7e319c3  try this                2024-07-25 22:27:49 +03:00
Cihan Demirci  20b12ff169  help                    2024-07-25 22:23:28 +03:00
Cihan Demirci  3c3694bcce  obey                    2024-07-25 22:18:21 +03:00
Cihan Demirci  6e92eb83b0  login                   2024-07-25 22:16:38 +03:00
Cihan Demirci  21c577f619  easier                  2024-07-25 22:00:55 +03:00
Cihan Demirci  020e45a004  push                    2024-07-25 21:55:26 +03:00
Cihan Demirci  25ee14d304  change                  2024-07-25 21:51:51 +03:00
Cihan Demirci  2a450a4947  perms                   2024-07-25 18:39:13 +03:00
Cihan Demirci  3c27f5559d  apt install             2024-07-25 18:35:06 +03:00
Cihan Demirci  93243a56f2  upd                     2024-07-25 16:57:41 +03:00
Cihan Demirci  dd90f0c340  moar install            2024-07-25 16:56:05 +03:00
Cihan Demirci  b5c9e56484  install                 2024-07-25 16:51:01 +03:00
Cihan Demirci  d044ca51fa  shorter feedback cycle  2024-07-25 16:33:15 +03:00
Cihan Demirci  ad60f73122  dnm: test azure/login   2024-07-25 16:25:17 +03:00
38 changed files with 164 additions and 356 deletions

View File

@@ -72,12 +72,6 @@ jobs:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- uses: docker/login-action@v3
with:
registry: cache.neon.build
username: ${{ secrets.NEON_CI_DOCKERCACHE_USERNAME }}
password: ${{ secrets.NEON_CI_DOCKERCACHE_PASSWORD }}
- uses: docker/build-push-action@v6
with:
context: .
@@ -85,8 +79,8 @@ jobs:
push: true
pull: true
file: Dockerfile.build-tools
cache-from: type=registry,ref=cache.neon.build/build-tools:cache-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/build-tools:cache-{0},mode=max', matrix.arch) || '' }}
cache-from: type=registry,ref=neondatabase/build-tools:cache-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=neondatabase/build-tools:cache-{0},mode=max', matrix.arch) || '' }}
tags: neondatabase/build-tools:${{ inputs.image-tag }}-${{ matrix.arch }}
- name: Remove custom docker config directory

View File

@@ -48,12 +48,30 @@ jobs:
tag:
needs: [ check-permissions ]
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
runs-on: ubuntu-22.04
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}
permissions:
id-token: write
contents: read
steps:
# - name: Install az cli
# run: |
# curl -sL https://aka.ms/InstallAzureCLIDeb --output-dir /tmp -OJ
# bash /tmp/InstallAzureCLIDeb
#
- uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # v2.1.1
with:
client-id: ${{ secrets.AZURE_DEV_RUNNER_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_DEV_SUBSCRIPTION_ID }}
- name: push
run: |
az acr login --name neoneastus2
docker buildx imagetools create -t neoneastus2.azurecr.io/neondatabase/neon:5718 neondatabase/neon:5718
- name: Checkout
uses: actions/checkout@v4
with:
@@ -499,12 +517,6 @@ jobs:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- uses: docker/login-action@v3
with:
registry: cache.neon.build
username: ${{ secrets.NEON_CI_DOCKERCACHE_USERNAME }}
password: ${{ secrets.NEON_CI_DOCKERCACHE_PASSWORD }}
- uses: docker/build-push-action@v6
with:
context: .
@@ -516,8 +528,9 @@ jobs:
push: true
pull: true
file: Dockerfile
cache-from: type=registry,ref=cache.neon.build/neon:cache-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon:cache-{0},mode=max', matrix.arch) || '' }}
cache-from: type=registry,ref=neondatabase/neon:cache-${{ matrix.arch }}
# 23.07.2024 temporarily disable cache saving in the registry as it is very slow
# cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=neondatabase/neon:cache-{0},mode=max', matrix.arch) || '' }}
tags: |
neondatabase/neon:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}
@@ -596,12 +609,6 @@ jobs:
username: ${{ secrets.AWS_ACCESS_KEY_DEV }}
password: ${{ secrets.AWS_SECRET_KEY_DEV }}
- uses: docker/login-action@v3
with:
registry: cache.neon.build
username: ${{ secrets.NEON_CI_DOCKERCACHE_USERNAME }}
password: ${{ secrets.NEON_CI_DOCKERCACHE_PASSWORD }}
- name: Build compute-node image
uses: docker/build-push-action@v6
with:
@@ -615,8 +622,9 @@ jobs:
push: true
pull: true
file: Dockerfile.compute-node
cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version }}:cache-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-node-{0}:cache-{1},mode=max', matrix.version, matrix.arch) || '' }}
cache-from: type=registry,ref=neondatabase/compute-node-${{ matrix.version }}:cache-${{ matrix.arch }}
# 23.07.2024 temporarily disable cache saving in the registry as it is very slow
# cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=neondatabase/compute-node-{0}:cache-{1},mode=max', matrix.version, matrix.arch) || '' }}
tags: |
neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}
@@ -635,8 +643,9 @@ jobs:
pull: true
file: Dockerfile.compute-node
target: neon-pg-ext-test
cache-from: type=registry,ref=cache.neon.build/neon-test-extensions-${{ matrix.version }}:cache-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon-test-extensions-{0}:cache-{1},mode=max', matrix.version, matrix.arch) || '' }}
cache-from: type=registry,ref=neondatabase/neon-test-extensions-${{ matrix.version }}:cache-${{ matrix.arch }}
# 23.07.2024 temporarily disable cache saving in the registry as it is very slow
# cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=neondatabase/neon-test-extensions-{0}:cache-{1},mode=max', matrix.version, matrix.arch) || '' }}
tags: |
neondatabase/neon-test-extensions-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}-${{ matrix.arch }}
@@ -843,6 +852,12 @@ jobs:
VERSIONS: v14 v15 v16
steps:
- uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # v2.1.1
with:
client-id: ${{ secrets.AZURE_RUNNER_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_DEV_SUBSCRIPTION_ID }}
- uses: docker/login-action@v3
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}

View File

@@ -192,7 +192,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.80.0
ENV RUSTC_VERSION=1.79.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \

View File

@@ -657,7 +657,7 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
chmod +x rustup-init && \
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
rm rustup-init && \
cargo install --locked --version 0.11.3 cargo-pgrx && \
cargo install --locked --version 0.10.2 cargo-pgrx && \
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
USER root
@@ -672,15 +672,10 @@ USER root
FROM rust-extensions-build AS pg-jsonschema-pg-build
ARG PG_VERSION
RUN wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v0.3.1.tar.gz -O pg_jsonschema.tar.gz && \
echo "61df3db1ed83cf24f6aa39c826f8818bfa4f0bd33b587fd6b2b1747985642297 pg_jsonschema.tar.gz" | sha256sum --check && \
RUN wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v0.2.0.tar.gz -O pg_jsonschema.tar.gz && \
echo "9118fc508a6e231e7a39acaa6f066fcd79af17a5db757b47d2eefbe14f7794f0 pg_jsonschema.tar.gz" | sha256sum --check && \
mkdir pg_jsonschema-src && cd pg_jsonschema-src && tar xzf ../pg_jsonschema.tar.gz --strip-components=1 -C . && \
# see commit 252b3685a27a0f4c31a0f91e983c6314838e89e8
# `unsafe-postgres` feature allows to build pgx extensions
# against postgres forks that decided to change their ABI name (like us).
# With that we can build extensions without forking them and using stock
# pgx. As this feature is new few manual version bumps were required.
sed -i 's/pgrx = "0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
sed -i 's/pgrx = "0.10.2"/pgrx = { version = "0.10.2", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control
@@ -694,10 +689,10 @@ RUN wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v0.3.1.tar.
FROM rust-extensions-build AS pg-graphql-pg-build
ARG PG_VERSION
RUN wget https://github.com/supabase/pg_graphql/archive/refs/tags/v1.5.7.tar.gz -O pg_graphql.tar.gz && \
echo "2b3e567a5b31019cb97ae0e33263c1bcc28580be5a444ac4c8ece5c4be2aea41 pg_graphql.tar.gz" | sha256sum --check && \
RUN wget https://github.com/supabase/pg_graphql/archive/refs/tags/v1.4.0.tar.gz -O pg_graphql.tar.gz && \
echo "bd8dc7230282b3efa9ae5baf053a54151ed0e66881c7c53750e2d0c765776edc pg_graphql.tar.gz" | sha256sum --check && \
mkdir pg_graphql-src && cd pg_graphql-src && tar xzf ../pg_graphql.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "=0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
sed -i 's/pgrx = "=0.10.2"/pgrx = { version = "0.10.2", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release && \
# it's needed to enable extension because it uses untrusted C language
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_graphql.control && \
@@ -717,9 +712,6 @@ ARG PG_VERSION
RUN wget https://github.com/kelvich/pg_tiktoken/archive/26806147b17b60763039c6a6878884c41a262318.tar.gz -O pg_tiktoken.tar.gz && \
echo "e64e55aaa38c259512d3e27c572da22c4637418cf124caba904cd50944e5004e pg_tiktoken.tar.gz" | sha256sum --check && \
mkdir pg_tiktoken-src && cd pg_tiktoken-src && tar xzf ../pg_tiktoken.tar.gz --strip-components=1 -C . && \
# TODO update pgrx version in the pg_tiktoken repo and remove this line
sed -i 's/pgrx = { version = "=0.10.2",/pgrx = { version = "0.11.3",/g' Cargo.toml && \
sed -i 's/pgrx-tests = "=0.10.2"/pgrx-tests = "0.11.3"/g' Cargo.toml && \
cargo pgrx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_tiktoken.control
@@ -733,10 +725,14 @@ RUN wget https://github.com/kelvich/pg_tiktoken/archive/26806147b17b60763039c6a6
FROM rust-extensions-build AS pg-pgx-ulid-build
ARG PG_VERSION
RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.5.tar.gz -O pgx_ulid.tar.gz && \
echo "9d1659a2da65af0133d5451c454de31b37364e3502087dadf579f790bc8bef17 pgx_ulid.tar.gz" | sha256sum --check && \
RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.3.tar.gz -O pgx_ulid.tar.gz && \
echo "ee5db82945d2d9f2d15597a80cf32de9dca67b897f605beb830561705f12683c pgx_ulid.tar.gz" | sha256sum --check && \
mkdir pgx_ulid-src && cd pgx_ulid-src && tar xzf ../pgx_ulid.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "^0.11.2"/pgrx = { version = "=0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
echo "******************* Apply a patch for Postgres 16 support; delete in the next release ******************" && \
wget https://github.com/pksunkara/pgx_ulid/commit/f84954cf63fc8c80d964ac970d9eceed3c791196.patch && \
patch -p1 < f84954cf63fc8c80d964ac970d9eceed3c791196.patch && \
echo "********************************************************************************************************" && \
sed -i 's/pgrx = "=0.10.2"/pgrx = { version = "=0.10.2", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/ulid.control

View File

@@ -21,9 +21,9 @@ implementation where we keep more data than we would need to, do not
change the synthetic size or incur any costs to the user.
The synthetic size is calculated for the whole project. It is not
straightforward to attribute size to individual branches. See [What is
the size of an individual branch?](#what-is-the-size-of-an-individual-branch)
for a discussion of those difficulties.
straightforward to attribute size to individual branches. See "What is
the size of an individual branch?" for discussion on those
difficulties.
The synthetic size is designed to:
@@ -40,9 +40,8 @@ The synthetic size is designed to:
- logical size is the size of a branch *at a given point in
time*. It's the total size of all tables in all databases, as you
see with "\l+" in psql for example, plus the Postgres SLRUs and some
small amount of metadata. Note that currently, Neon does not include
the SLRUs and metadata in the logical size. Refer to the comment in
[`get_current_logical_size_non_incremental()`](/pageserver/src/pgdatadir_mapping.rs#L813-L814).
small amount of metadata. NOTE that currently, Neon does not include
the SLRUs and metadata in the logical size. See comment to `get_current_logical_size_non_incremental()`.
- a "point in time" is defined as an LSN value. You can convert a
timestamp to an LSN, but the storage internally works with LSNs.

View File

@@ -29,7 +29,7 @@ use anyhow::{bail, Result};
use bytes::{Bytes, BytesMut};
/// Equivalent to sizeof(ControlFileData) in C
const SIZEOF_CONTROLDATA: usize = size_of::<ControlFileData>();
const SIZEOF_CONTROLDATA: usize = std::mem::size_of::<ControlFileData>();
impl ControlFileData {
/// Compute the offset of the `crc` field within the `ControlFileData` struct.
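
Aside (not part of the diff): one side of this comparison spells the call as a bare size_of::<T>() while the other uses the full std::mem::size_of::<T>() path, which lines up with the rust-toolchain.toml hunk further down switching between 1.80.0 and 1.79.0 — Rust 1.80 added size_of to the prelude, so the unqualified form only compiles on 1.80 and newer. A minimal sketch of the difference, using a made-up PageHeader type rather than anything from the repository:

// On Rust 1.80+ `size_of` is available via the prelude; on 1.79 and older an
// explicit import (or the full `std::mem::size_of` path) is required. The
// explicit import compiles on every version, so it is the portable spelling.
use std::mem::size_of;

#[repr(C)]
struct PageHeader {
    lsn: u64,      // hypothetical fields, for illustration only
    checksum: u16,
    flags: u16,
}

const SIZEOF_PAGE_HEADER: usize = size_of::<PageHeader>();

fn main() {
    // 8 + 2 + 2 bytes of fields, padded to 16 by the u64 alignment
    println!("PageHeader is {SIZEOF_PAGE_HEADER} bytes");
}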

View File

@@ -31,7 +31,7 @@ pub const SMGR_TRUNCATE_FSM: u32 = 0x0004;
//
// Assumes 8 byte alignment
const SIZEOF_PAGE_HEADER_DATA: usize = size_of::<PageHeaderData>();
const SIZEOF_PAGE_HEADER_DATA: usize = std::mem::size_of::<PageHeaderData>();
pub const MAXALIGN_SIZE_OF_PAGE_HEADER_DATA: usize = (SIZEOF_PAGE_HEADER_DATA + 7) & !7;
//
@@ -191,7 +191,7 @@ pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;
pub const XLOG_TBLSPC_CREATE: u8 = 0x00;
pub const XLOG_TBLSPC_DROP: u8 = 0x10;
pub const SIZEOF_XLOGRECORD: u32 = size_of::<XLogRecord>() as u32;
pub const SIZEOF_XLOGRECORD: u32 = std::mem::size_of::<XLogRecord>() as u32;
//
// from xlogrecord.h

View File

@@ -42,9 +42,9 @@ pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = size_of::<XLogPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = size_of::<XLogLongPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = size_of::<XLogRecord>();
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::<XLogPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::<XLogLongPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
#[allow(clippy::identity_op)]
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
@@ -311,7 +311,7 @@ impl XLogLongPageHeaderData {
}
}
pub const SIZEOF_CHECKPOINT: usize = size_of::<CheckPoint>();
pub const SIZEOF_CHECKPOINT: usize = std::mem::size_of::<CheckPoint>();
impl CheckPoint {
pub fn encode(&self) -> Result<Bytes, SerializeError> {

View File

@@ -178,7 +178,7 @@ pub fn test_find_end_of_wal_last_crossing_segment() {
/// currently 1024.
#[test]
pub fn test_update_next_xid() {
let checkpoint_buf = [0u8; size_of::<CheckPoint>()];
let checkpoint_buf = [0u8; std::mem::size_of::<CheckPoint>()];
let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
checkpoint.nextXid = FullTransactionId { value: 10 };
@@ -204,7 +204,7 @@ pub fn test_update_next_xid() {
#[test]
pub fn test_update_next_multixid() {
let checkpoint_buf = [0u8; size_of::<CheckPoint>()];
let checkpoint_buf = [0u8; std::mem::size_of::<CheckPoint>()];
let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
// simple case

View File

@@ -33,7 +33,6 @@ use tracing::debug;
use utils::backoff;
use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind};
use crate::ListingObject;
use crate::{
config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError, Listing,
ListingMode, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel,
@@ -353,11 +352,7 @@ impl RemoteStorage for AzureBlobStorage {
let blob_iter = entry
.blobs
.blobs()
.map(|k| ListingObject{
key: self.name_to_relative_path(&k.name),
last_modified: k.properties.last_modified.into()
}
);
.map(|k| self.name_to_relative_path(&k.name));
for key in blob_iter {
res.keys.push(key);

View File

@@ -149,16 +149,10 @@ pub enum ListingMode {
NoDelimiter,
}
#[derive(PartialEq, Eq, Debug)]
pub struct ListingObject {
pub key: RemotePath,
pub last_modified: SystemTime,
}
#[derive(Default)]
pub struct Listing {
pub prefixes: Vec<RemotePath>,
pub keys: Vec<ListingObject>,
pub keys: Vec<RemotePath>,
}
/// Storage (potentially remote) API to manage its state.
@@ -207,7 +201,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
let mut combined = stream.next().await.expect("At least one item required")?;
while let Some(list) = stream.next().await {
let list = list?;
combined.keys.extend(list.keys.into_iter());
combined.keys.extend_from_slice(&list.keys);
combined.prefixes.extend_from_slice(&list.prefixes);
}
Ok(combined)
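
Aside (not part of the diff): the .map(|o| o.key) edits that recur in later hunks all follow from the shape of this Listing type. A minimal, self-contained sketch of the richer shape is below; RemotePath is stubbed as a plain String and the fields are only those visible in the hunk above, so treat it as an illustration rather than the crate's real API:

use std::time::SystemTime;

type RemotePath = String; // stand-in for the real path type

// Listing entry that carries metadata alongside the key.
struct ListingObject {
    key: RemotePath,
    last_modified: SystemTime,
}

struct Listing {
    prefixes: Vec<RemotePath>,
    keys: Vec<ListingObject>,
}

// Callers that only need paths strip the metadata off each entry.
fn key_paths(listing: Listing) -> Vec<RemotePath> {
    listing.keys.into_iter().map(|o| o.key).collect()
}

fn main() {
    let listing = Listing {
        prefixes: vec![],
        keys: vec![ListingObject {
            key: "some/remote/object".to_string(), // illustrative path only
            last_modified: SystemTime::now(),
        }],
    };
    println!("{:?}", key_paths(listing));
}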

View File

@@ -23,8 +23,8 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken};
use utils::crashsafe::path_with_suffix_extension;
use crate::{
Download, DownloadError, Listing, ListingMode, ListingObject, RemotePath, TimeTravelError,
TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR,
Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel,
REMOTE_STORAGE_PREFIX_SEPARATOR,
};
use super::{RemoteStorage, StorageMetadata};
@@ -357,28 +357,19 @@ impl RemoteStorage for LocalFs {
.list_recursive(prefix)
.await
.map_err(DownloadError::Other)?;
let objects = keys
let keys = keys
.into_iter()
.filter_map(|k| {
.filter(|k| {
let path = k.with_base(&self.storage_root);
if path.is_dir() {
None
} else {
Some(ListingObject {
key: k.clone(),
// LocalFs is just for testing, so just specify a dummy time
last_modified: SystemTime::now(),
})
}
!path.is_dir()
})
.collect();
if let ListingMode::NoDelimiter = mode {
result.keys = objects;
result.keys = keys;
} else {
let mut prefixes = HashSet::new();
for object in objects {
let key = object.key;
for key in keys {
// If the part after the prefix includes a "/", take only the first part and put it in `prefixes`.
let relative_key = if let Some(prefix) = prefix {
let mut prefix = prefix.clone();
@@ -407,11 +398,9 @@ impl RemoteStorage for LocalFs {
.to_owned();
prefixes.insert(first_part);
} else {
result.keys.push(ListingObject {
key: RemotePath::from_string(&relative_key).unwrap(),
// LocalFs is just for testing
last_modified: SystemTime::now(),
});
result
.keys
.push(RemotePath::from_string(&relative_key).unwrap());
}
}
result.prefixes = prefixes
@@ -961,11 +950,7 @@ mod fs_tests {
.await?;
assert!(listing.prefixes.is_empty());
assert_eq!(
listing
.keys
.into_iter()
.map(|o| o.key)
.collect::<HashSet<_>>(),
listing.keys.into_iter().collect::<HashSet<_>>(),
HashSet::from([uncle.clone(), child.clone(), child_sibling.clone()])
);
@@ -990,7 +975,7 @@ mod fs_tests {
)
.await?;
assert_eq!(
listing.keys.into_iter().map(|o| o.key).collect::<Vec<_>>(),
listing.keys,
[RemotePath::from_string("uncle").unwrap()].to_vec()
);
assert_eq!(
@@ -1007,7 +992,7 @@ mod fs_tests {
&cancel,
)
.await?;
assert_eq!(listing.keys, vec![]);
assert_eq!(listing.keys, [].to_vec());
assert_eq!(
listing.prefixes,
[RemotePath::from_string("grandparent").unwrap()].to_vec()
@@ -1022,7 +1007,7 @@ mod fs_tests {
&cancel,
)
.await?;
assert_eq!(listing.keys, vec![]);
assert_eq!(listing.keys, [].to_vec());
assert_eq!(
listing.prefixes,
[RemotePath::from_string("grandparent").unwrap()].to_vec()
@@ -1055,7 +1040,7 @@ mod fs_tests {
&cancel,
)
.await?;
assert_eq!(listing.keys, vec![]);
assert_eq!(listing.keys, [].to_vec());
let mut found_prefixes = listing.prefixes.clone();
found_prefixes.sort();

View File

@@ -44,9 +44,8 @@ use crate::{
error::Cancelled,
metrics::{start_counting_cancelled_wait, start_measuring_requests},
support::PermitCarrying,
ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, ListingObject, RemotePath,
RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
REMOTE_STORAGE_PREFIX_SEPARATOR,
ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
};
use crate::metrics::AttemptOutcome;
@@ -549,26 +548,9 @@ impl RemoteStorage for S3Bucket {
let mut result = Listing::default();
for object in keys {
let key = object.key().expect("response does not contain a key");
let key = self.s3_object_to_relative_path(key);
let last_modified = match object.last_modified.map(SystemTime::try_from) {
Some(Ok(t)) => t,
Some(Err(_)) => {
tracing::warn!("Remote storage last_modified {:?} for {} is out of bounds",
object.last_modified, key
);
SystemTime::now()
},
None => {
SystemTime::now()
}
};
result.keys.push(ListingObject{
key,
last_modified
});
let object_path = object.key().expect("response does not contain a key");
let remote_path = self.s3_object_to_relative_path(object_path);
result.keys.push(remote_path);
if let Some(mut mk) = max_keys {
assert!(mk > 0);
mk -= 1;

View File

@@ -156,7 +156,6 @@ async fn list_no_delimiter_works(
.context("client list root files failure")?
.keys
.into_iter()
.map(|o| o.key)
.collect::<HashSet<_>>();
assert_eq!(
root_files,
@@ -183,7 +182,6 @@ async fn list_no_delimiter_works(
.context("client list nested files failure")?
.keys
.into_iter()
.map(|o| o.key)
.collect::<HashSet<_>>();
let trim_remote_blobs: HashSet<_> = ctx
.remote_blobs

View File

@@ -81,7 +81,6 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
.context("list root files failure")?
.keys
.into_iter()
.map(|o| o.key)
.collect::<HashSet<_>>(),
)
}

View File

@@ -7309,9 +7309,7 @@ mod tests {
(
key,
Lsn(0x80),
Value::Image(Bytes::copy_from_slice(
b"0x10;0x20;0x30;0x40;0x50;0x60;0x70;0x80",
)),
Value::WalRecord(NeonWalRecord::wal_append(";0x80")),
),
(
key,
@@ -7373,9 +7371,7 @@ mod tests {
),
(
Lsn(0x80),
Value::Image(Bytes::copy_from_slice(
b"0x10;0x20;0x30;0x40;0x50;0x60;0x70;0x80",
)),
Value::WalRecord(NeonWalRecord::wal_append(";0x80")),
),
(
Lsn(0x90),
@@ -7384,118 +7380,7 @@ mod tests {
]),
};
assert_eq!(res, expected_res);
// We expect GC-compaction to run with the original GC. This would create a situation that
// the original GC algorithm removes some delta layers b/c there are full image coverage,
// therefore causing some keys to have an incomplete history below the lowest retain LSN.
// For example, we have
// ```plain
// init delta @ 0x10, image @ 0x20, delta @ 0x30 (gc_horizon), image @ 0x40.
// ```
// Now the GC horizon moves up, and we have
// ```plain
// init delta @ 0x10, image @ 0x20, delta @ 0x30, image @ 0x40 (gc_horizon)
// ```
// The original GC algorithm kicks in, and removes delta @ 0x10, image @ 0x20.
// We will end up with
// ```plain
// delta @ 0x30, image @ 0x40 (gc_horizon)
// ```
// Now we run the GC-compaction, and this key does not have a full history.
// We should be able to handle this partial history and drop everything before the
// gc_horizon image.
let history = vec![
(
key,
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(";0x20")),
),
(
key,
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append(";0x30")),
),
(
key,
Lsn(0x40),
Value::Image(Bytes::copy_from_slice(b"0x10;0x20;0x30;0x40")),
),
(
key,
Lsn(0x50),
Value::WalRecord(NeonWalRecord::wal_append(";0x50")),
),
(
key,
Lsn(0x60),
Value::WalRecord(NeonWalRecord::wal_append(";0x60")),
),
(
key,
Lsn(0x70),
Value::WalRecord(NeonWalRecord::wal_append(";0x70")),
),
(
key,
Lsn(0x80),
Value::Image(Bytes::copy_from_slice(
b"0x10;0x20;0x30;0x40;0x50;0x60;0x70;0x80",
)),
),
(
key,
Lsn(0x90),
Value::WalRecord(NeonWalRecord::wal_append(";0x90")),
),
];
let res = tline
.generate_key_retention(key, &history, Lsn(0x60), &[Lsn(0x40), Lsn(0x50)], 3)
.await
.unwrap();
let expected_res = KeyHistoryRetention {
below_horizon: vec![
(
Lsn(0x40),
KeyLogAtLsn(vec![(
Lsn(0x40),
Value::Image(Bytes::copy_from_slice(b"0x10;0x20;0x30;0x40")),
)]),
),
(
Lsn(0x50),
KeyLogAtLsn(vec![(
Lsn(0x50),
Value::WalRecord(NeonWalRecord::wal_append(";0x50")),
)]),
),
(
Lsn(0x60),
KeyLogAtLsn(vec![(
Lsn(0x60),
Value::WalRecord(NeonWalRecord::wal_append(";0x60")),
)]),
),
],
above_horizon: KeyLogAtLsn(vec![
(
Lsn(0x70),
Value::WalRecord(NeonWalRecord::wal_append(";0x70")),
),
(
Lsn(0x80),
Value::Image(Bytes::copy_from_slice(
b"0x10;0x20;0x30;0x40;0x50;0x60;0x70;0x80",
)),
),
(
Lsn(0x90),
Value::WalRecord(NeonWalRecord::wal_append(";0x90")),
),
]),
};
assert_eq!(res, expected_res);
// TODO: more tests with mixed image + delta, adding with k-merge test cases; e2e compaction test
Ok(())
}
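
Aside (not part of the diff): the comment block in this test describes reconstructing a key from the most recent image plus the deltas that follow it, even when older history below the GC horizon has been removed. The idea can be shown with a toy model that uses none of the pageserver's types; Lsn, Image, and Delta below are simplified stand-ins, and string concatenation plays the role of WAL redo:

type Lsn = u64;

#[derive(Clone, Debug)]
enum Record {
    Image(String), // full value at this LSN
    Delta(String), // suffix appended on top of the previous value
}

// Reconstruct the value at `at`: take the latest image at or below `at` and
// replay every delta between that image and `at`. History older than the
// image is never consulted, which is why GC may drop it.
fn reconstruct(history: &[(Lsn, Record)], at: Lsn) -> Option<String> {
    let mut value: Option<String> = None;
    for (_, rec) in history.iter().filter(|(lsn, _)| *lsn <= at) {
        match rec {
            Record::Image(img) => value = Some(img.clone()),
            // A delta with no base image means the history is incomplete.
            Record::Delta(d) => value.as_mut()?.push_str(d),
        }
    }
    value
}

fn main() {
    // Mirrors the shape of the test data above: image at 0x40, deltas at
    // 0x50..0x70, another image at 0x80, delta at 0x90.
    let history = vec![
        (0x40, Record::Image("0x10;0x20;0x30;0x40".to_string())),
        (0x50, Record::Delta(";0x50".to_string())),
        (0x60, Record::Delta(";0x60".to_string())),
        (0x70, Record::Delta(";0x70".to_string())),
        (0x80, Record::Image("0x10;0x20;0x30;0x40;0x50;0x60;0x70;0x80".to_string())),
        (0x90, Record::Delta(";0x90".to_string())),
    ];
    assert_eq!(
        reconstruct(&history, 0x60).as_deref(),
        Some("0x10;0x20;0x30;0x40;0x50;0x60")
    );
    // Even if everything below 0x80 is dropped, values at or above 0x80 are
    // still recoverable from the 0x80 image alone.
    assert_eq!(
        reconstruct(&history[4..], 0x90).as_deref(),
        Some("0x10;0x20;0x30;0x40;0x50;0x60;0x70;0x80;0x90")
    );
    println!("ok");
}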

View File

@@ -111,7 +111,7 @@ impl TryFrom<&TimelineMetadataBodyV2> for TimelineMetadataHeader {
#[error("re-serializing for crc32 failed")]
struct Crc32CalculationFailed(#[source] utils::bin_ser::SerializeError);
const METADATA_HDR_SIZE: usize = size_of::<TimelineMetadataHeader>();
const METADATA_HDR_SIZE: usize = std::mem::size_of::<TimelineMetadataHeader>();
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TimelineMetadataBodyV2 {

View File

@@ -1407,7 +1407,6 @@ impl TenantManager {
tracing::info!("Remote storage already deleted");
} else {
tracing::info!("Deleting {} keys from remote storage", keys.len());
let keys = keys.into_iter().map(|o| o.key).collect::<Vec<_>>();
self.resources
.remote_storage
.delete_objects(&keys, &self.cancel)

View File

@@ -1380,13 +1380,12 @@ impl RemoteTimelineClient {
// marker via its deleted_at attribute
let latest_index = remaining
.iter()
.filter(|o| {
o.key
.object_name()
.filter(|p| {
p.object_name()
.map(|n| n.starts_with(IndexPart::FILE_NAME))
.unwrap_or(false)
})
.filter_map(|o| parse_remote_index_path(o.key.clone()).map(|gen| (o.key.clone(), gen)))
.filter_map(|path| parse_remote_index_path(path.clone()).map(|gen| (path, gen)))
.max_by_key(|i| i.1)
.map(|i| i.0.clone())
.unwrap_or(
@@ -1397,12 +1396,14 @@ impl RemoteTimelineClient {
let remaining_layers: Vec<RemotePath> = remaining
.into_iter()
.filter_map(|o| {
if o.key == latest_index || o.key.object_name() == Some(INITDB_PRESERVED_PATH) {
None
} else {
Some(o.key)
.filter(|p| {
if p == &latest_index {
return false;
}
if p.object_name() == Some(INITDB_PRESERVED_PATH) {
return false;
}
true
})
.inspect(|path| {
if let Some(name) = path.object_name() {

View File

@@ -295,11 +295,10 @@ where
};
}
for object in listing.keys {
let object_name = object
.key
for key in listing.keys {
let object_name = key
.object_name()
.ok_or_else(|| anyhow::anyhow!("object name for key {}", object.key))?;
.ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?;
other_prefixes.insert(object_name.to_string());
}
@@ -460,7 +459,7 @@ pub(crate) async fn download_index_part(
// is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
let max_previous_generation = indices
.into_iter()
.filter_map(|o| parse_remote_index_path(o.key))
.filter_map(parse_remote_index_path)
.filter(|g| g <= &my_generation)
.max();

View File

@@ -828,9 +828,9 @@ async fn eviction_cancellation_on_drop() {
#[test]
#[cfg(target_arch = "x86_64")]
fn layer_size() {
assert_eq!(size_of::<LayerAccessStats>(), 8);
assert_eq!(size_of::<PersistentLayerDesc>(), 104);
assert_eq!(size_of::<LayerInner>(), 312);
assert_eq!(std::mem::size_of::<LayerAccessStats>(), 8);
assert_eq!(std::mem::size_of::<PersistentLayerDesc>(), 104);
assert_eq!(std::mem::size_of::<LayerInner>(), 312);
// it also has the utf8 path
}

View File

@@ -204,11 +204,9 @@ impl<'a> IteratorWrapper<'a> {
/// A merge iterator over delta/image layer iterators. When duplicated records are
/// found, the iterator will not perform any deduplication, and the caller should handle
/// these situation. By saying duplicated records, there are many possibilities:
///
/// * Two same delta at the same LSN.
/// * Two same image at the same LSN.
/// * Delta/image at the same LSN where the image has already applied the delta.
///
/// The iterator will always put the image before the delta.
pub struct MergeIterator<'a> {
heap: BinaryHeap<IteratorWrapper<'a>>,
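
Aside (not part of the diff): the doc comment above promises a specific tie-break — at the same key and LSN, an image is yielded before a delta. One way to get that from a heap-based k-way merge is an ordering in which, for equal positions, the image variant compares lower. The sketch below illustrates only that ordering contract; it is not the crate's IteratorWrapper:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum Kind {
    Image, // declared first, so it sorts before Delta at the same position
    Delta,
}

// (key, lsn, kind): derived tuple ordering compares fields left to right,
// giving key order, then LSN order, then image-before-delta.
type Entry = (u8, u64, Kind);

fn main() {
    // BinaryHeap is a max-heap; Reverse turns it into a min-heap so the
    // smallest entry (lowest key/LSN, image before delta) is popped first.
    let mut heap: BinaryHeap<Reverse<Entry>> = BinaryHeap::new();
    heap.push(Reverse((1, 0x40, Kind::Delta)));
    heap.push(Reverse((1, 0x40, Kind::Image)));
    heap.push(Reverse((1, 0x30, Kind::Delta)));

    let order: Vec<Entry> = std::iter::from_fn(|| heap.pop().map(|Reverse(e)| e)).collect();
    assert_eq!(
        order,
        vec![(1, 0x30, Kind::Delta), (1, 0x40, Kind::Image), (1, 0x40, Kind::Delta)]
    );
    println!("{order:?}");
}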

View File

@@ -137,7 +137,7 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::{config::TenantConf, upload_queue::NotInitialized};
use super::config::TenantConf;
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};
@@ -642,13 +642,7 @@ impl FlushLayerError {
// When crossing from generic anyhow errors to this error type, we explicitly check
// for timeline cancellation to avoid logging inoffensive shutdown errors as warn/err.
fn from_anyhow(timeline: &Timeline, err: anyhow::Error) -> Self {
let cancelled = timeline.cancel.is_cancelled()
// The upload queue might have been shut down before the official cancellation of the timeline.
|| err
.downcast_ref::<NotInitialized>()
.map(NotInitialized::is_stopping)
.unwrap_or_default();
if cancelled {
if timeline.cancel.is_cancelled() {
Self::Cancelled
} else {
Self::Other(Arc::new(err))
@@ -3421,6 +3415,7 @@ impl Timeline {
}
}
#[allow(unknown_lints)] // doc_lazy_continuation is still a new lint
#[allow(clippy::doc_lazy_continuation)]
/// Get the data needed to reconstruct all keys in the provided keyspace
///

View File

@@ -1122,10 +1122,9 @@ impl Timeline {
);
}
}
// There was an assertion for no base image that checks if the first
// record in the history is `will_init` before, but it was removed.
// This is explained in the test cases for generate_key_retention.
// Search "incomplete history" for more information.
if let Value::WalRecord(rec) = &history[0].2 {
assert!(rec.will_init(), "no base image");
}
for lsn in retain_lsn_below_horizon {
assert!(lsn < &horizon, "retain lsn must be below horizon")
}
@@ -1201,6 +1200,9 @@ impl Timeline {
false
};
replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
if let Some((_, _, val)) = replay_history.first() {
assert!(val.will_init(), "invalid history, no base image");
}
// Only retain the items after the last image record
for idx in (0..replay_history.len()).rev() {
if replay_history[idx].2.will_init() {
@@ -1208,9 +1210,6 @@ impl Timeline {
break;
}
}
if let Some((_, _, val)) = replay_history.first() {
assert!(val.will_init(), "invalid history, no base image");
}
if generate_image && records_since_last_image > 0 {
records_since_last_image = 0;
let history = std::mem::take(&mut replay_history);

View File

@@ -618,7 +618,7 @@ impl WalIngest {
// the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
0
} else {
size_of::<u16>() * xlrec.ntuples as usize
std::mem::size_of::<u16>() * xlrec.ntuples as usize
};
assert_eq!(offset_array_len, buf.remaining());
@@ -685,7 +685,7 @@ impl WalIngest {
// the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
0
} else {
size_of::<u16>() * xlrec.ntuples as usize
std::mem::size_of::<u16>() * xlrec.ntuples as usize
};
assert_eq!(offset_array_len, buf.remaining());
@@ -752,7 +752,7 @@ impl WalIngest {
// the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
0
} else {
size_of::<u16>() * xlrec.ntuples as usize
std::mem::size_of::<u16>() * xlrec.ntuples as usize
};
assert_eq!(offset_array_len, buf.remaining());
@@ -920,7 +920,7 @@ impl WalIngest {
// the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
0
} else {
size_of::<u16>() * xlrec.ntuples as usize
std::mem::size_of::<u16>() * xlrec.ntuples as usize
};
assert_eq!(offset_array_len, buf.remaining());

View File

@@ -1,7 +1,7 @@
//! Man-in-the-middle tests
//!
//! Channel binding should prevent a proxy server
//! *that has access to create valid certificates*
//! - that has access to create valid certificates -
//! from controlling the TLS connection.
use std::fmt::Debug;

View File

@@ -158,7 +158,7 @@ mod tests {
let N = 1021 * 4096;
let sketch = CountMinSketch::with_params(p / N as f64, 1.0 - q);
let memory = size_of::<u32>() * sketch.buckets.len();
let memory = std::mem::size_of::<u32>() * sketch.buckets.len();
let time = sketch.depth;
(memory, time)
}

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.80.0"
channel = "1.79.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -27,7 +27,7 @@ pub const SK_FORMAT_VERSION: u32 = 9;
pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
// needed to atomically update the state using `rename`
const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
pub const CHECKSUM_SIZE: usize = size_of::<u32>();
pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
/// Storage should keep actual state inside of it. It should implement Deref
/// trait to access state fields and have persist method for updating that state.

View File

@@ -545,10 +545,7 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
&cancel,
)
.await?
.keys
.into_iter()
.map(|o| o.key)
.collect::<Vec<_>>();
.keys;
if files.is_empty() {
return Ok(()); // done
}
@@ -616,7 +613,7 @@ pub async fn copy_s3_segments(
let uploaded_segments = &files
.iter()
.filter_map(|o| o.key.object_name().map(ToOwned::to_owned))
.filter_map(|file| file.object_name().map(ToOwned::to_owned))
.collect::<HashSet<_>>();
debug!(

View File

@@ -172,7 +172,7 @@ fn write_walrecord_to_disk(
let mut freespace = insert_freespace(curr_ptr);
let mut written: usize = 0;
assert!(freespace >= size_of::<u32>());
assert!(freespace >= std::mem::size_of::<u32>());
for mut rdata in rdatas {
while rdata.len() >= freespace {

View File

@@ -278,7 +278,7 @@ pub struct Service {
config: Config,
persistence: Arc<Persistence>,
compute_hook: Arc<ComputeHook>,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
heartbeater: Heartbeater,
@@ -308,15 +308,9 @@ pub struct Service {
// Process shutdown will fire this token
cancel: CancellationToken,
// Child token of [`Service::cancel`] used by reconcilers
reconcilers_cancel: CancellationToken,
// Background tasks will hold this gate
gate: Gate,
// Reconcilers background tasks will hold this gate
reconcilers_gate: Gate,
/// This waits for initial reconciliation with pageservers to complete. Until this barrier
/// passes, it isn't safe to do any actions that mutate tenants.
pub(crate) startup_complete: Barrier,
@@ -403,11 +397,6 @@ struct ShardUpdate {
generation: Option<Generation>,
}
pub(crate) enum ReconcileResultRequest {
ReconcileResult(ReconcileResult),
Stop,
}
impl Service {
pub fn get_config(&self) -> &Config {
&self.config
@@ -764,7 +753,7 @@ impl Service {
const BACKGROUND_RECONCILE_PERIOD: Duration = Duration::from_secs(20);
let mut interval = tokio::time::interval(BACKGROUND_RECONCILE_PERIOD);
while !self.reconcilers_cancel.is_cancelled() {
while !self.cancel.is_cancelled() {
tokio::select! {
_ = interval.tick() => {
let reconciles_spawned = self.reconcile_all();
@@ -777,7 +766,7 @@ impl Service {
}
}
}
_ = self.reconcilers_cancel.cancelled() => return
_ = self.cancel.cancelled() => return
}
}
}
@@ -948,7 +937,7 @@ impl Service {
async fn process_results(
&self,
mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResultRequest>,
mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResult>,
mut bg_compute_hook_result_rx: tokio::sync::mpsc::Receiver<
Result<(), (TenantShardId, NotifyError)>,
>,
@@ -958,8 +947,8 @@ impl Service {
tokio::select! {
r = result_rx.recv() => {
match r {
Some(ReconcileResultRequest::ReconcileResult(result)) => {self.process_result(result);},
None | Some(ReconcileResultRequest::Stop) => {break;}
Some(result) => {self.process_result(result);},
None => {break;}
}
}
_ = async{
@@ -985,6 +974,9 @@ impl Service {
}
};
}
// We should only fall through on shutdown
assert!(self.cancel.is_cancelled());
}
async fn process_aborts(
@@ -1161,8 +1153,6 @@ impl Service {
tokio::sync::mpsc::channel(MAX_DELAYED_RECONCILES);
let cancel = CancellationToken::new();
let reconcilers_cancel = cancel.child_token();
let heartbeater = Heartbeater::new(
config.jwt_token.clone(),
config.max_offline_interval,
@@ -1188,9 +1178,7 @@ impl Service {
abort_tx,
startup_complete: startup_complete.clone(),
cancel,
reconcilers_cancel,
gate: Gate::default(),
reconcilers_gate: Gate::default(),
tenant_op_locks: Default::default(),
node_op_locks: Default::default(),
});
@@ -5144,7 +5132,7 @@ impl Service {
}
};
let Ok(gate_guard) = self.reconcilers_gate.enter() else {
let Ok(gate_guard) = self.gate.enter() else {
// Gate closed: we're shutting down, drop out.
return None;
};
@@ -5157,7 +5145,7 @@ impl Service {
&self.persistence,
units,
gate_guard,
&self.reconcilers_cancel,
&self.cancel,
)
}
@@ -5604,21 +5592,17 @@ impl Service {
}
pub async fn shutdown(&self) {
// Cancel all on-going reconciles and wait for them to exit the gate.
tracing::info!("Shutting down: cancelling and waiting for in-flight reconciles");
self.reconcilers_cancel.cancel();
self.reconcilers_gate.close().await;
// Signal the background loop in [`Service::process_results`] to exit once
// it has proccessed the results from all the reconciles we cancelled earlier.
tracing::info!("Shutting down: processing results from previously in-flight reconciles");
self.result_tx.send(ReconcileResultRequest::Stop).ok();
self.result_tx.closed().await;
// Background tasks hold gate guards: this notifies them of the cancellation and
// waits for them all to complete.
tracing::info!("Shutting down: cancelling and waiting for background tasks to exit");
// Note that this already stops processing any results from reconciles: so
// we do not expect that our [`TenantShard`] objects will reach a neat
// final state.
self.cancel.cancel();
// The cancellation tokens in [`crate::reconciler::Reconciler`] are children
// of our cancellation token, so we do not need to explicitly cancel each of
// them.
// Background tasks and reconcilers hold gate guards: this waits for them all
// to complete.
self.gate.close().await;
}
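
Aside (not part of the diff): the shutdown comments in this hunk describe a parent cancellation token whose child tokens are cancelled implicitly, plus a gate that waits for background tasks to drain. A stripped-down sketch of the token half of that pattern, using tokio and tokio-util directly (awaiting a JoinHandle stands in for the repository's Gate type):

// deps (assumed): tokio = { features = ["macros", "rt"] }, tokio-util
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    // Child tokens fire automatically when the parent is cancelled, so
    // per-subsystem tokens never need to be cancelled one by one.
    let reconcilers_cancel = cancel.child_token();

    let worker = tokio::spawn(async move {
        // A background loop that exits when its (child) token fires.
        reconcilers_cancel.cancelled().await;
        "worker drained"
    });

    // Shutdown: cancel the parent, then wait for the task to finish
    // (playing the role of closing a gate the task holds open).
    cancel.cancel();
    let msg = worker.await.expect("worker panicked");
    assert!(cancel.is_cancelled());
    println!("{msg}");
}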

View File

@@ -9,7 +9,6 @@ use crate::{
persistence::TenantShardPersistence,
reconciler::ReconcileUnits,
scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
service::ReconcileResultRequest,
};
use pageserver_api::controller_api::{
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
@@ -1060,7 +1059,7 @@ impl TenantShard {
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn spawn_reconciler(
&mut self,
result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
pageservers: &Arc<HashMap<NodeId, Node>>,
compute_hook: &Arc<ComputeHook>,
service_config: &service::Config,
@@ -1184,9 +1183,7 @@ impl TenantShard {
pending_compute_notification: reconciler.compute_notify_failure,
};
result_tx
.send(ReconcileResultRequest::ReconcileResult(result))
.ok();
result_tx.send(result).ok();
}
.instrument(reconciler_span),
);

View File

@@ -10,7 +10,7 @@ use std::{
use anyhow::Context;
use futures_util::TryStreamExt;
use pageserver_api::shard::TenantShardId;
use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath};
use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath};
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
@@ -324,7 +324,7 @@ impl std::fmt::Display for PurgeMode {
pub async fn get_tenant_objects(
s3_client: &GenericRemoteStorage,
tenant_shard_id: TenantShardId,
) -> anyhow::Result<Vec<ListingObject>> {
) -> anyhow::Result<Vec<RemotePath>> {
tracing::debug!("Listing objects in tenant {tenant_shard_id}");
let tenant_root = super::remote_tenant_path(&tenant_shard_id);
@@ -345,7 +345,7 @@ pub async fn get_tenant_objects(
pub async fn get_timeline_objects(
s3_client: &GenericRemoteStorage,
ttid: TenantShardTimelineId,
) -> anyhow::Result<Vec<ListingObject>> {
) -> anyhow::Result<Vec<RemotePath>> {
tracing::debug!("Listing objects in timeline {ttid}");
let timeline_root = super::remote_timeline_path_id(&ttid);
@@ -372,7 +372,7 @@ const MAX_KEYS_PER_DELETE: usize = 1000;
/// `num_deleted` returns number of deleted keys.
async fn do_delete(
remote_client: &GenericRemoteStorage,
keys: &mut Vec<ListingObject>,
keys: &mut Vec<RemotePath>,
dry_run: bool,
drain: bool,
progress_tracker: &mut DeletionProgressTracker,
@@ -382,8 +382,6 @@ async fn do_delete(
let request_keys =
keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
let request_keys: Vec<RemotePath> = request_keys.into_iter().map(|o| o.key).collect();
let num_deleted = request_keys.len();
if dry_run {
tracing::info!("Dry-run deletion of objects: ");

View File

@@ -17,6 +17,7 @@ from fixtures.pg_version import PgVersion
# 3. Disk space used
# 4. Peak memory usage
#
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/7124")
def test_bulk_insert(neon_with_baseline: PgCompare):
env = neon_with_baseline

View File

@@ -591,7 +591,7 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder):
abs_diff = abs(ratio - expected_ratio)
assert original_count > count_now
expectation = 0.065
expectation = 0.06
log.info(
f"tenant {tenant_id} layer count {original_count} -> {count_now}, ratio: {ratio}, expecting {abs_diff} < {expectation}"
)

View File

@@ -198,8 +198,8 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# Disable automatic creation of image layers, as we will create them explicitly when we want them
"image_creation_threshold": 9999,
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": 1,
"image_layer_creation_check_threshold": 0,
}
@@ -225,7 +225,7 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
# Do a full image layer generation before splitting, so that when we compact after splitting
# we should only see sizes decrease (from post-split drops/rewrites), not increase (from image layer generation)
env.get_tenant_pageserver(tenant_id).http_client().timeline_checkpoint(
env.get_tenant_pageserver(tenant_id).http_client().timeline_compact(
tenant_id, timeline_id, force_image_layer_creation=True, wait_until_uploaded=True
)

View File

@@ -191,9 +191,7 @@ def test_scrubber_physical_gc_ancestors(
"checkpoint_distance": f"{1024 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{1024 * 1024}",
# Disable automatic creation of image layers, as future image layers can result in layers in S3 that
# aren't referenced by children, earlier than the test expects such layers to exist
"image_creation_threshold": "9999",
"image_creation_threshold": "2",
"image_layer_creation_check_threshold": "0",
# Disable background compaction, we will do it explicitly
"compaction_period": "0s",
@@ -243,7 +241,7 @@ def test_scrubber_physical_gc_ancestors(
workload.churn_rows(100)
for shard in shards:
ps = env.get_tenant_pageserver(shard)
ps.http_client().timeline_compact(shard, timeline_id, force_image_layer_creation=True)
ps.http_client().timeline_compact(shard, timeline_id)
ps.http_client().timeline_gc(shard, timeline_id, 0)
# We will use a min_age_secs=1 threshold for deletion, let it pass