Compare commits

..

8 Commits

Author SHA1 Message Date
Vlad Lazar
053235cf57 wip
wip

clippy

wip

test

wip
2024-06-11 12:09:37 +01:00
a-masterov
e27ce38619 Add testing for extensions (#7818)
## Problem

We need automated tests of extensions shipped with Neon to detect
possible problems.

## Summary of changes

A new image neon-test-extensions is added. Workflow changes to test the
shipped extensions are added as well.
Currently, the regression tests, shipped with extensions are in use.
Some extensions, i.e. rum, timescaledb, rdkit, postgis, pgx_ulid, pgtap,
pg_tiktoken, pg_jsonschema, pg_graphql, kq_imcx, wal2json_2_5 are
excluded due to problems or absence of internal tests.

---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2024-06-11 13:07:51 +02:00
Joonas Koivunen
e46692788e refactor: Timeline layer flushing (#7993)
The new features have deteriorated layer flushing, most recently with
#7927. Changes:

- inline `Timeline::freeze_inmem_layer` to the only caller
- carry the TimelineWriterState guard to the actual point of freezing
the layer
- this allows us to `#[cfg(feature = "testing")]` the assertion added in
#7927
- remove duplicate `flush_frozen_layer` in favor of splitting the
`flush_frozen_layers_and_wait`
- this requires starting the flush loop earlier for `checkpoint_distance
< initdb size` tests
2024-06-10 19:34:34 +03:00
Alex Chi Z
a8ca7a1a1d docs: highlight neon env comes with an initial timeline (#7995)
Quite a few existing test cases create their own timelines instead of
using the default one. This pull request highlights that and hopefully
people can write simpler tests in the future.

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Yuchen Liang <70461588+yliang412@users.noreply.github.com>
2024-06-10 12:08:16 -04:00
Joonas Koivunen
b52e31c1a4 fix: allow layer flushes more often (#7927)
As seen with the pgvector 0.7.0 index builds, we can receive large
batches of images, leading to very large L0 layers in the range of 1GB.
These large layers are produced because we are only able to roll the
layer after we have witnessed two different Lsns in a single
`DataDirModification::commit`. As the single Lsn batches of images can
span over multiple `DataDirModification` lifespans, we will rarely get
to write two different Lsns in a single `put_batch` currently.

The solution is to remember the TimelineWriterState instead of eagerly
forgetting it until we really open the next layer or someone else
flushes (while holding the write_guard).

Additional changes are test fixes to avoid "initdb image layer
optimization" or ignoring initdb layers for assertion.

Cc: #7197 because small `checkpoint_distance` will now trigger the
"initdb image layer optimization"
2024-06-10 13:50:17 +00:00
Heikki Linnakangas
5a7e285c2c Simplify scanning compute logs in tests (#7997)
Implement LogUtils in the Endpoint fixture class, so that the
"log_contains" function can be used on compute logs too.

Per discussion at:
https://github.com/neondatabase/neon/pull/7288#discussion_r1623633803
2024-06-10 12:52:49 +00:00
Christian Schwarz
ae5badd375 Revert "Include openssl and ICU statically linked" (#8003)
Reverts neondatabase/neon#7956

Rationale: compute incompatibilties

Slack thread:
https://neondb.slack.com/archives/C033RQ5SPDH/p1718011276665839?thread_ts=1718008160.431869&cid=C033RQ5SPDH

Relevant quotes from @hlinnaka 

> If we go through with the current release candidate, but the compute
is pinned, people who create new projects will get that warning, which
is silly. To them, it looks like the ICU version was downgraded, because
initdb was run with newer version.

> We should upgrade the ICU version eventually. And when we do that,
users with old projects that use ICU will start to see that warning. I
think that's acceptable, as long as we do homework, notify users, and
communicate that properly.
> When do that, we should to try to upgrade the storage and compute
versions at roughly the same time.
2024-06-10 13:20:20 +02:00
Alex Chi Z
3e63d0f9e0 test(pageserver): quantify compaction outcome (#7867)
A simple API to collect some statistics after compaction to easily
understand the result.

The tool reads the layer map, and analyze range by range instead of
doing single-key operations, which is more efficient than doing a
benchmark to collect the result. It currently computes two key metrics:

* Latest data access efficiency, which finds how many delta layers /
image layers the system needs to iterate before returning any key in a
key range.
* (Approximate) PiTR efficiency, as in
https://github.com/neondatabase/neon/issues/7770, which is simply the
number of delta files in the range. The reason behind that is, assume no
image layer is created, PiTR efficiency is simply the cost of collect
records from the delta layers, and the replay time. Number of delta
files (or in the future, estimated size of reads) is a simple yet
efficient way of estimating how much effort the page server needs to
reconstruct a page.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-06-10 10:42:13 +02:00
42 changed files with 1947 additions and 207 deletions

View File

@@ -8,6 +8,7 @@
!scripts/combine_control_files.py
!scripts/ninstall.sh
!vm-cgconfig.conf
!docker-compose/run-tests.sh
# Directories
!.cargo/

View File

@@ -858,6 +858,26 @@ jobs:
cache-to: type=registry,ref=neondatabase/compute-node-${{ matrix.version }}:cache-${{ matrix.arch }},mode=max
tags: |
neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}
- name: Build neon extensions test image
if: matrix.version == 'v16'
uses: docker/build-push-action@v5
with:
context: .
build-args: |
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
PG_VERSION=${{ matrix.version }}
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
provenance: false
push: true
pull: true
file: Dockerfile.compute-node
target: neon-pg-ext-test
cache-from: type=registry,ref=neondatabase/neon-test-extensions-${{ matrix.version }}:cache-${{ matrix.arch }}
cache-to: type=registry,ref=neondatabase/neon-test-extensions-${{ matrix.version }}:cache-${{ matrix.arch }},mode=max
tags: |
neondatabase/neon-test-extensions-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}-${{ matrix.arch }}
- name: Build compute-tools image
# compute-tools are Postgres independent, so build it only once
@@ -902,6 +922,13 @@ jobs:
neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-x64 \
neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-arm64
- name: Create multi-arch neon-test-extensions image
if: matrix.version == 'v16'
run: |
docker buildx imagetools create -t neondatabase/neon-test-extensions-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }} \
neondatabase/neon-test-extensions-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-x64 \
neondatabase/neon-test-extensions-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-arm64
- name: Create multi-arch compute-tools image
if: matrix.version == 'v16'
run: |
@@ -1020,7 +1047,7 @@ jobs:
exit 1
fi
- name: Verify docker-compose example
- name: Verify docker-compose example and test extensions
timeout-minutes: 20
run: env TAG=${{needs.tag.outputs.build-tag}} ./docker-compose/docker_compose_test.sh

1
Cargo.lock generated
View File

@@ -5801,6 +5801,7 @@ dependencies = [
"r2d2",
"reqwest 0.12.4",
"routerify",
"scopeguard",
"serde",
"serde_json",
"strum",

View File

@@ -928,6 +928,69 @@ RUN rm -r /usr/local/pgsql/include
# if they were to be used by other libraries.
RUN rm /usr/local/pgsql/lib/lib*.a
#########################################################################################
#
# Layer neon-pg-ext-test
#
#########################################################################################
FROM neon-pg-ext-build AS neon-pg-ext-test
ARG PG_VERSION
RUN mkdir /ext-src
#COPY --from=postgis-build /postgis.tar.gz /ext-src/
#COPY --from=postgis-build /sfcgal/* /usr
COPY --from=plv8-build /plv8.tar.gz /ext-src/
COPY --from=h3-pg-build /h3-pg.tar.gz /ext-src/
COPY --from=unit-pg-build /postgresql-unit.tar.gz /ext-src/
COPY --from=vector-pg-build /pgvector.tar.gz /ext-src/
COPY --from=vector-pg-build /pgvector.patch /ext-src/
COPY --from=pgjwt-pg-build /pgjwt.tar.gz /ext-src
#COPY --from=pg-jsonschema-pg-build /home/nonroot/pg_jsonschema.tar.gz /ext-src
#COPY --from=pg-graphql-pg-build /home/nonroot/pg_graphql.tar.gz /ext-src
#COPY --from=pg-tiktoken-pg-build /home/nonroot/pg_tiktoken.tar.gz /ext-src
COPY --from=hypopg-pg-build /hypopg.tar.gz /ext-src
COPY --from=pg-hashids-pg-build /pg_hashids.tar.gz /ext-src
#COPY --from=rum-pg-build /rum.tar.gz /ext-src
#COPY --from=pgtap-pg-build /pgtap.tar.gz /ext-src
COPY --from=ip4r-pg-build /ip4r.tar.gz /ext-src
COPY --from=prefix-pg-build /prefix.tar.gz /ext-src
COPY --from=hll-pg-build /hll.tar.gz /ext-src
COPY --from=plpgsql-check-pg-build /plpgsql_check.tar.gz /ext-src
#COPY --from=timescaledb-pg-build /timescaledb.tar.gz /ext-src
COPY --from=pg-hint-plan-pg-build /pg_hint_plan.tar.gz /ext-src
COPY patches/pg_hintplan.patch /ext-src
#COPY --from=kq-imcx-pg-build /kq_imcx.tar.gz /ext-src
COPY --from=pg-cron-pg-build /pg_cron.tar.gz /ext-src
COPY patches/pg_cron.patch /ext-src
#COPY --from=pg-pgx-ulid-build /home/nonroot/pgx_ulid.tar.gz /ext-src
COPY --from=rdkit-pg-build /rdkit.tar.gz /ext-src
COPY --from=pg-uuidv7-pg-build /pg_uuidv7.tar.gz /ext-src
COPY --from=pg-roaringbitmap-pg-build /pg_roaringbitmap.tar.gz /ext-src
COPY --from=pg-semver-pg-build /pg_semver.tar.gz /ext-src
#COPY --from=pg-embedding-pg-build /home/nonroot/pg_embedding-src/ /ext-src
#COPY --from=wal2json-pg-build /wal2json_2_5.tar.gz /ext-src
COPY --from=pg-anon-pg-build /pg_anon.tar.gz /ext-src
COPY patches/pg_anon.patch /ext-src
COPY --from=pg-ivm-build /pg_ivm.tar.gz /ext-src
COPY --from=pg-partman-build /pg_partman.tar.gz /ext-src
RUN cd /ext-src/ && for f in *.tar.gz; \
do echo $f; dname=$(echo $f | sed 's/\.tar.*//')-src; \
rm -rf $dname; mkdir $dname; tar xzf $f --strip-components=1 -C $dname \
|| exit 1; rm -f $f; done
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
# cmake is required for the h3 test
RUN apt-get update && apt-get install -y cmake
RUN patch -p1 < /ext-src/pg_hintplan.patch
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
RUN patch -p1 </ext-src/pg_anon.patch
RUN patch -p1 </ext-src/pg_cron.patch
ENV PATH=/usr/local/pgsql/bin:$PATH
ENV PGHOST=compute
ENV PGPORT=55433
ENV PGUSER=cloud_admin
ENV PGDATABASE=postgres
#########################################################################################
#
# Final layer

View File

@@ -8,6 +8,11 @@ USER root
RUN apt-get update && \
apt-get install -y curl \
jq \
python3-pip \
netcat
#Faker is required for the pg_anon test
RUN pip3 install Faker
#This is required for the pg_hintplan test
RUN mkdir -p /ext-src/pg_hint_plan-src && chown postgres /ext-src/pg_hint_plan-src
USER postgres
USER postgres

View File

@@ -95,7 +95,7 @@
},
{
"name": "shared_preload_libraries",
"value": "neon",
"value": "neon,pg_cron,timescaledb,pg_stat_statements",
"vartype": "string"
},
{
@@ -127,6 +127,16 @@
"name": "max_replication_flush_lag",
"value": "10GB",
"vartype": "string"
},
{
"name": "cron.database",
"value": "postgres",
"vartype": "string"
},
{
"name": "session_preload_libraries",
"value": "anon",
"vartype": "string"
}
]
},

View File

@@ -1,5 +1,3 @@
version: '3'
services:
minio:
restart: always
@@ -194,3 +192,13 @@ services:
done"
depends_on:
- compute
neon-test-extensions:
image: ${REPOSITORY:-neondatabase}/neon-test-extensions-v${PG_TEST_VERSION:-16}:${TAG:-latest}
entrypoint:
- "/bin/bash"
- "-c"
command:
- sleep 1800
depends_on:
- compute

View File

@@ -7,15 +7,21 @@
# Implicitly accepts `REPOSITORY` and `TAG` env vars that are passed into the compose file
# Their defaults point at DockerHub `neondatabase/neon:latest` image.`,
# to verify custom image builds (e.g pre-published ones).
#
# A test script for postgres extensions
# Currently supports only v16
#
set -eux -o pipefail
SCRIPT_DIR="$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
COMPOSE_FILE=$SCRIPT_DIR/docker-compose.yml
COMPOSE_FILE='docker-compose.yml'
cd $(dirname $0)
docker compose -f $COMPOSE_FILE
COMPUTE_CONTAINER_NAME=docker-compose-compute-1
SQL="CREATE TABLE t(key int primary key, value text); insert into t values(1,1); select * from t;"
PSQL_OPTION="-h localhost -U cloud_admin -p 55433 -c '$SQL' postgres"
TEST_CONTAINER_NAME=docker-compose-neon-test-extensions-1
PSQL_OPTION="-h localhost -U cloud_admin -p 55433 -d postgres"
: ${http_proxy:=}
: ${https_proxy:=}
export http_proxy https_proxy
cleanup() {
echo "show container information"
@@ -25,34 +31,71 @@ cleanup() {
docker compose -f $COMPOSE_FILE down
}
echo "clean up containers if exists"
cleanup
for pg_version in 14 15 16; do
echo "start containers (pg_version=$pg_version)."
PG_VERSION=$pg_version docker compose -f $COMPOSE_FILE up --build -d
echo "clean up containers if exists"
cleanup
PG_TEST_VERSION=$(($pg_version < 16 ? 16 : $pg_version))
PG_VERSION=$pg_version PG_TEST_VERSION=$PG_TEST_VERSION docker compose -f $COMPOSE_FILE up --build -d
echo "wait until the compute is ready. timeout after 60s. "
cnt=0
while sleep 1; do
while sleep 3; do
# check timeout
cnt=`expr $cnt + 1`
cnt=`expr $cnt + 3`
if [ $cnt -gt 60 ]; then
echo "timeout before the compute is ready."
cleanup
exit 1
fi
# check if the compute is ready
set +o pipefail
result=`docker compose -f $COMPOSE_FILE logs "compute_is_ready" | grep "accepting connections" | wc -l`
set -o pipefail
if [ $result -eq 1 ]; then
if docker compose -f $COMPOSE_FILE logs "compute_is_ready" | grep -q "accepting connections"; then
echo "OK. The compute is ready to connect."
echo "execute simple queries."
docker exec $COMPUTE_CONTAINER_NAME /bin/bash -c "psql $PSQL_OPTION"
cleanup
break
fi
done
if [ $pg_version -ge 16 ]
then
echo Enabling trust connection
docker exec $COMPUTE_CONTAINER_NAME bash -c "sed -i '\$d' /var/db/postgres/compute/pg_hba.conf && echo -e 'host\t all\t all\t all\t trust' >> /var/db/postgres/compute/pg_hba.conf && psql $PSQL_OPTION -c 'select pg_reload_conf()' "
echo Adding postgres role
docker exec $COMPUTE_CONTAINER_NAME psql $PSQL_OPTION -c "CREATE ROLE postgres SUPERUSER LOGIN"
# This is required for the pg_hint_plan test, to prevent flaky log message causing the test to fail
# It cannot be moved to Dockerfile now because the database directory is created after the start of the container
echo Adding dummy config
docker exec $COMPUTE_CONTAINER_NAME touch /var/db/postgres/compute/compute_ctl_temp_override.conf
# This block is required for the pg_anon extension test.
# The test assumes that it is running on the same host with the postgres engine.
# In our case it's not true, that's why we are copying files to the compute node
TMPDIR=$(mktemp -d)
docker cp $TEST_CONTAINER_NAME:/ext-src/pg_anon-src/data $TMPDIR/data
echo -e '1\t too \t many \t tabs' > $TMPDIR/data/bad.csv
docker cp $TMPDIR/data $COMPUTE_CONTAINER_NAME:/tmp/tmp_anon_alternate_data
rm -rf $TMPDIR
TMPDIR=$(mktemp -d)
# The following block does the same for the pg_hintplan test
docker cp $TEST_CONTAINER_NAME:/ext-src/pg_hint_plan-src/data $TMPDIR/data
docker cp $TMPDIR/data $COMPUTE_CONTAINER_NAME:/ext-src/pg_hint_plan-src/
rm -rf $TMPDIR
# We are running tests now
if docker exec -e SKIP=rum-src,timescaledb-src,rdkit-src,postgis-src,pgx_ulid-src,pgtap-src,pg_tiktoken-src,pg_jsonschema-src,pg_graphql-src,kq_imcx-src,wal2json_2_5-src \
$TEST_CONTAINER_NAME /run-tests.sh | tee testout.txt
then
cleanup
else
FAILED=$(tail -1 testout.txt)
for d in $FAILED
do
mkdir $d
docker cp $TEST_CONTAINER_NAME:/ext-src/$d/regression.diffs $d || true
docker cp $TEST_CONTAINER_NAME:/ext-src/$d/regression.out $d || true
cat $d/regression.out $d/regression.diffs || true
done
rm -rf $FAILED
cleanup
exit 1
fi
fi
cleanup
done

View File

@@ -0,0 +1,15 @@
#!/bin/bash
set -x
cd /ext-src
FAILED=
LIST=$((echo ${SKIP} | sed 's/,/\n/g'; ls -d *-src) | sort | uniq -u)
for d in ${LIST}
do
[ -d ${d} ] || continue
psql -c "select 1" >/dev/null || break
make -C ${d} installcheck || FAILED="${d} ${FAILED}"
done
[ -z "${FAILED}" ] && exit 0
echo ${FAILED}
exit 1

View File

@@ -209,6 +209,7 @@ pub enum NodeSchedulingPolicy {
Active,
Filling,
Pause,
PauseForRestart,
Draining,
}
@@ -220,6 +221,7 @@ impl FromStr for NodeSchedulingPolicy {
"active" => Ok(Self::Active),
"filling" => Ok(Self::Filling),
"pause" => Ok(Self::Pause),
"pause_for_restart" => Ok(Self::PauseForRestart),
"draining" => Ok(Self::Draining),
_ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
}
@@ -233,6 +235,7 @@ impl From<NodeSchedulingPolicy> for String {
Active => "active",
Filling => "filling",
Pause => "pause",
PauseForRestart => "pause_for_restart",
Draining => "draining",
}
.to_string()

View File

@@ -2429,6 +2429,25 @@ async fn list_aux_files(
json_response(StatusCode::OK, files)
}
async fn perf_info(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let result = timeline.perf_info().await;
json_response(StatusCode::OK, result)
}
async fn ingest_aux_files(
mut request: Request<Body>,
_cancel: CancellationToken,
@@ -2856,5 +2875,9 @@ pub fn make_router(
|r| testing_api_handler("list_aux_files", r, list_aux_files),
)
.post("/v1/top_tenants", |r| api_handler(r, post_top_tenants))
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/perf_info",
|r| testing_api_handler("perf_info", r, perf_info),
)
.any(handler_404))
}

View File

@@ -3395,6 +3395,12 @@ impl Tenant {
let tenant_shard_id = raw_timeline.owning_tenant.tenant_shard_id;
let unfinished_timeline = raw_timeline.raw_timeline()?;
// Flush the new layer files to disk, before we make the timeline as available to
// the outside world.
//
// Flush loop needs to be spawned in order to be able to flush.
unfinished_timeline.maybe_spawn_flush_loop();
import_datadir::import_timeline_from_postgres_datadir(
unfinished_timeline,
&pgdata_path,
@@ -3406,12 +3412,6 @@ impl Tenant {
format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
})?;
// Flush the new layer files to disk, before we make the timeline as available to
// the outside world.
//
// Flush loop needs to be spawned in order to be able to flush.
unfinished_timeline.maybe_spawn_flush_loop();
fail::fail_point!("before-checkpoint-new-timeline", |_| {
anyhow::bail!("failpoint before-checkpoint-new-timeline");
});

View File

@@ -52,7 +52,7 @@ pub struct InMemoryLayer {
/// Frozen layers have an exclusive end LSN.
/// Writes are only allowed when this is `None`.
end_lsn: OnceLock<Lsn>,
pub(crate) end_lsn: OnceLock<Lsn>,
/// Used for traversal path. Cached representation of the in-memory layer before frozen.
local_path_str: Arc<str>,

View File

@@ -1,3 +1,4 @@
pub(crate) mod analysis;
mod compaction;
pub mod delete;
pub(crate) mod detach_ancestor;
@@ -321,6 +322,8 @@ pub struct Timeline {
/// Locked automatically by [`TimelineWriter`] and checkpointer.
/// Must always be acquired before the layer map/individual layer lock
/// to avoid deadlock.
///
/// The state is cleared upon freezing.
write_lock: tokio::sync::Mutex<Option<TimelineWriterState>>,
/// Used to avoid multiple `flush_loop` tasks running
@@ -1568,7 +1571,15 @@ impl Timeline {
// This exists to provide a non-span creating version of `freeze_and_flush` we can call without
// polluting the span hierarchy.
pub(crate) async fn freeze_and_flush0(&self) -> Result<(), FlushLayerError> {
let to_lsn = self.freeze_inmem_layer(false).await;
let to_lsn = {
// Freeze the current open in-memory layer. It will be written to disk on next
// iteration.
let mut g = self.write_lock.lock().await;
let to_lsn = self.get_last_record_lsn();
self.freeze_inmem_layer_at(to_lsn, &mut g).await;
to_lsn
};
self.flush_frozen_layers_and_wait(to_lsn).await
}
@@ -1577,7 +1588,7 @@ impl Timeline {
// an ephemeral layer open forever when idle. It also freezes layers if the global limit on
// ephemeral layer bytes has been breached.
pub(super) async fn maybe_freeze_ephemeral_layer(&self) {
let Ok(_write_guard) = self.write_lock.try_lock() else {
let Ok(mut write_guard) = self.write_lock.try_lock() else {
// If the write lock is held, there is an active wal receiver: rolling open layers
// is their responsibility while they hold this lock.
return;
@@ -1654,24 +1665,35 @@ impl Timeline {
self.last_freeze_at.load(),
open_layer.get_opened_at(),
) {
match open_layer.info() {
let at_lsn = match open_layer.info() {
InMemoryLayerInfo::Frozen { lsn_start, lsn_end } => {
// We may reach this point if the layer was already frozen by not yet flushed: flushing
// happens asynchronously in the background.
tracing::debug!(
"Not freezing open layer, it's already frozen ({lsn_start}..{lsn_end})"
);
None
}
InMemoryLayerInfo::Open { .. } => {
// Upgrade to a write lock and freeze the layer
drop(layers_guard);
let mut layers_guard = self.layers.write().await;
layers_guard
.try_freeze_in_memory_layer(current_lsn, &self.last_freeze_at)
let froze = layers_guard
.try_freeze_in_memory_layer(
current_lsn,
&self.last_freeze_at,
&mut write_guard,
)
.await;
Some(current_lsn).filter(|_| froze)
}
};
if let Some(lsn) = at_lsn {
let res: Result<u64, _> = self.flush_frozen_layers(lsn);
if let Err(e) = res {
tracing::info!("failed to flush frozen layer after background freeze: {e:#}");
}
}
self.flush_frozen_layers();
}
}
@@ -2035,11 +2057,11 @@ impl Timeline {
true
} else if distance > 0 && opened_at.elapsed() >= self.get_checkpoint_timeout() {
info!(
"Will roll layer at {} with layer size {} due to time since first write to the layer ({:?})",
projected_lsn,
layer_size,
opened_at.elapsed()
);
"Will roll layer at {} with layer size {} due to time since first write to the layer ({:?})",
projected_lsn,
layer_size,
opened_at.elapsed()
);
true
} else {
@@ -2380,7 +2402,7 @@ impl Timeline {
let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
assert!(matches!(*flush_loop_state, FlushLoopState::Running{..}));
*flush_loop_state = FlushLoopState::Exited;
Ok(())
}
@@ -3643,28 +3665,21 @@ impl Timeline {
self.last_record_lsn.advance(new_lsn);
}
/// Whether there was a layer to freeze or not, return the value of get_last_record_lsn
/// before we attempted the freeze: this guarantees that ingested data is frozen up to this lsn (inclusive).
async fn freeze_inmem_layer(&self, write_lock_held: bool) -> Lsn {
// Freeze the current open in-memory layer. It will be written to disk on next
// iteration.
let _write_guard = if write_lock_held {
None
} else {
Some(self.write_lock.lock().await)
async fn freeze_inmem_layer_at(
&self,
at: Lsn,
write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
) {
let frozen = {
let mut guard = self.layers.write().await;
guard
.try_freeze_in_memory_layer(at, &self.last_freeze_at, write_lock)
.await
};
let to_lsn = self.get_last_record_lsn();
self.freeze_inmem_layer_at(to_lsn).await;
to_lsn
}
async fn freeze_inmem_layer_at(&self, at: Lsn) {
let mut guard = self.layers.write().await;
guard
.try_freeze_in_memory_layer(at, &self.last_freeze_at)
.await;
if frozen {
let now = Instant::now();
*(self.last_freeze_ts.write().unwrap()) = now;
}
}
/// Layer flusher task's main loop.
@@ -3758,18 +3773,14 @@ impl Timeline {
}
}
/// Request the flush loop to write out all frozen layers up to `to_lsn` as Delta L0 files to disk.
/// The caller is responsible for the freezing, e.g., [`Self::freeze_inmem_layer`].
/// Request the flush loop to write out all frozen layers up to `at_lsn` as Delta L0 files to disk.
/// The caller is responsible for the freezing, e.g., [`Self::freeze_inmem_layer_at`].
///
/// `last_record_lsn` may be higher than the highest LSN of a frozen layer: if this is the case,
/// it means no data will be written between the top of the highest frozen layer and to_lsn,
/// e.g. because this tenant shard has ingested up to to_lsn and not written any data locally for that part of the WAL.
async fn flush_frozen_layers_and_wait(
&self,
last_record_lsn: Lsn,
) -> Result<(), FlushLayerError> {
let mut rx = self.layer_flush_done_tx.subscribe();
/// `at_lsn` may be higher than the highest LSN of a frozen layer: if this is the
/// case, it means no data will be written between the top of the highest frozen layer and
/// to_lsn, e.g. because this tenant shard has ingested up to to_lsn and not written any data
/// locally for that part of the WAL.
fn flush_frozen_layers(&self, at_lsn: Lsn) -> Result<u64, FlushLayerError> {
// Increment the flush cycle counter and wake up the flush task.
// Remember the new value, so that when we listen for the flush
// to finish, we know when the flush that we initiated has
@@ -3784,13 +3795,18 @@ impl Timeline {
self.layer_flush_start_tx.send_modify(|(counter, lsn)| {
my_flush_request = *counter + 1;
*counter = my_flush_request;
*lsn = std::cmp::max(last_record_lsn, *lsn);
*lsn = std::cmp::max(at_lsn, *lsn);
});
Ok(my_flush_request)
}
async fn wait_flush_completion(&self, request: u64) -> Result<(), FlushLayerError> {
let mut rx = self.layer_flush_done_tx.subscribe();
loop {
{
let (last_result_counter, last_result) = &*rx.borrow();
if *last_result_counter >= my_flush_request {
if *last_result_counter >= request {
if let Err(err) = last_result {
// We already logged the original error in
// flush_loop. We cannot propagate it to the caller
@@ -3817,12 +3833,9 @@ impl Timeline {
}
}
fn flush_frozen_layers(&self) {
self.layer_flush_start_tx.send_modify(|(counter, lsn)| {
*counter += 1;
*lsn = std::cmp::max(*lsn, Lsn(self.last_freeze_at.load().0 - 1));
});
async fn flush_frozen_layers_and_wait(&self, at_lsn: Lsn) -> Result<(), FlushLayerError> {
let token = self.flush_frozen_layers(at_lsn)?;
self.wait_flush_completion(token).await
}
/// Flush one frozen in-memory layer to disk, as a new delta layer.
@@ -5540,6 +5553,9 @@ impl Timeline {
type TraversalPathItem = (ValueReconstructResult, Lsn, TraversalId);
/// Tracking writes ingestion does to a particular in-memory layer.
///
/// Cleared upon freezing a layer.
struct TimelineWriterState {
open_layer: Arc<InMemoryLayer>,
current_size: u64,
@@ -5580,12 +5596,6 @@ impl Deref for TimelineWriter<'_> {
}
}
impl Drop for TimelineWriter<'_> {
fn drop(&mut self) {
self.write_guard.take();
}
}
#[derive(PartialEq)]
enum OpenLayerAction {
Roll,
@@ -5668,16 +5678,15 @@ impl<'a> TimelineWriter<'a> {
}
async fn roll_layer(&mut self, freeze_at: Lsn) -> anyhow::Result<()> {
assert!(self.write_guard.is_some());
self.tl.freeze_inmem_layer_at(freeze_at).await;
let now = Instant::now();
*(self.last_freeze_ts.write().unwrap()) = now;
self.tl.flush_frozen_layers();
let current_size = self.write_guard.as_ref().unwrap().current_size;
// self.write_guard will be taken by the freezing
self.tl
.freeze_inmem_layer_at(freeze_at, &mut self.write_guard)
.await;
self.tl.flush_frozen_layers(freeze_at)?;
if current_size >= self.get_checkpoint_distance() * 2 {
warn!("Flushed oversized open layer with size {}", current_size)
}
@@ -5691,9 +5700,27 @@ impl<'a> TimelineWriter<'a> {
return OpenLayerAction::Open;
};
#[cfg(feature = "testing")]
if state.cached_last_freeze_at < self.tl.last_freeze_at.load() {
// this check and assertion are not really needed because
// LayerManager::try_freeze_in_memory_layer will always clear out the
// TimelineWriterState if something is frozen. however, we can advance last_freeze_at when there
// is no TimelineWriterState.
assert!(
state.open_layer.end_lsn.get().is_some(),
"our open_layer must be outdated"
);
// this would be a memory leak waiting to happen because the in-memory layer always has
// an index
panic!("BUG: TimelineWriterState held on to frozen in-memory layer.");
}
if state.prev_lsn == Some(lsn) {
// Rolling mid LSN is not supported by downstream code.
// Rolling mid LSN is not supported by [downstream code].
// Hence, only roll at LSN boundaries.
//
// [downstream code]: https://github.com/neondatabase/neon/pull/7993#discussion_r1633345422
return OpenLayerAction::None;
}

View File

@@ -0,0 +1,90 @@
use std::{collections::BTreeSet, ops::Range};
use utils::lsn::Lsn;
use super::Timeline;
#[derive(serde::Serialize)]
pub(crate) struct RangeAnalysis {
start: String,
end: String,
has_image: bool,
num_of_deltas_above_image: usize,
total_num_of_deltas: usize,
}
impl Timeline {
pub(crate) async fn perf_info(&self) -> Vec<RangeAnalysis> {
// First, collect all split points of the layers.
let mut split_points = BTreeSet::new();
let mut delta_ranges = Vec::new();
let mut image_ranges = Vec::new();
let all_layer_files = {
let guard = self.layers.read().await;
guard.all_persistent_layers()
};
let lsn = self.get_last_record_lsn();
for key in all_layer_files {
split_points.insert(key.key_range.start);
split_points.insert(key.key_range.end);
if key.is_delta {
delta_ranges.push((key.key_range.clone(), key.lsn_range.clone()));
} else {
image_ranges.push((key.key_range.clone(), key.lsn_range.start));
}
}
// For each split range, compute the estimated read amplification.
let split_points = split_points.into_iter().collect::<Vec<_>>();
let mut result = Vec::new();
for i in 0..(split_points.len() - 1) {
let start = split_points[i];
let end = split_points[i + 1];
// Find the latest image layer that contains the information.
let mut maybe_image_layers = image_ranges
.iter()
// We insert split points for all image layers, and therefore a `contains` check for the start point should be enough.
.filter(|(key_range, img_lsn)| key_range.contains(&start) && img_lsn <= &lsn)
.cloned()
.collect::<Vec<_>>();
maybe_image_layers.sort_by(|a, b| a.1.cmp(&b.1));
let image_layer = maybe_image_layers.last().cloned();
let lsn_filter_start = image_layer
.as_ref()
.map(|(_, lsn)| *lsn)
.unwrap_or(Lsn::INVALID);
fn overlaps_with(lsn_range_a: &Range<Lsn>, lsn_range_b: &Range<Lsn>) -> bool {
!(lsn_range_a.end <= lsn_range_b.start || lsn_range_a.start >= lsn_range_b.end)
}
let maybe_delta_layers = delta_ranges
.iter()
.filter(|(key_range, lsn_range)| {
key_range.contains(&start) && overlaps_with(&(lsn_filter_start..lsn), lsn_range)
})
.cloned()
.collect::<Vec<_>>();
let pitr_delta_layers = delta_ranges
.iter()
.filter(|(key_range, _)| key_range.contains(&start))
.cloned()
.collect::<Vec<_>>();
result.push(RangeAnalysis {
start: start.to_string(),
end: end.to_string(),
has_image: image_layer.is_some(),
num_of_deltas_above_image: maybe_delta_layers.len(),
total_num_of_deltas: pitr_delta_layers.len(),
});
}
result
}
}

View File

@@ -1,4 +1,5 @@
use anyhow::{bail, ensure, Context, Result};
use itertools::Itertools;
use pageserver_api::shard::TenantShardId;
use std::{collections::HashMap, sync::Arc};
use tracing::trace;
@@ -20,6 +21,8 @@ use crate::{
},
};
use super::TimelineWriterState;
/// Provides semantic APIs to manipulate the layer map.
#[derive(Default)]
pub(crate) struct LayerManager {
@@ -119,18 +122,20 @@ impl LayerManager {
Ok(layer)
}
/// Called from `freeze_inmem_layer`, returns true if successfully frozen.
pub(crate) async fn try_freeze_in_memory_layer(
/// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
///
/// Returns true if anything was frozen.
pub(super) async fn try_freeze_in_memory_layer(
&mut self,
lsn: Lsn,
last_freeze_at: &AtomicLsn,
) {
write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
) -> bool {
let Lsn(last_record_lsn) = lsn;
let end_lsn = Lsn(last_record_lsn + 1);
if let Some(open_layer) = &self.layer_map.open_layer {
let froze = if let Some(open_layer) = &self.layer_map.open_layer {
let open_layer_rc = Arc::clone(open_layer);
// Does this layer need freezing?
open_layer.freeze(end_lsn).await;
// The layer is no longer open, update the layer map to reflect this.
@@ -138,11 +143,25 @@ impl LayerManager {
self.layer_map.frozen_layers.push_back(open_layer_rc);
self.layer_map.open_layer = None;
self.layer_map.next_open_layer_at = Some(end_lsn);
}
true
} else {
false
};
// Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
// accounts for regions in the LSN range where we might have ingested no data due to sharding.
last_freeze_at.store(end_lsn);
// the writer state must no longer have a reference to the frozen layer
let taken = write_lock.take();
assert_eq!(
froze,
taken.is_some(),
"should only had frozen a layer when TimelineWriterState existed"
);
froze
}
/// Add image layers to the layer map, called from `create_image_layers`.
@@ -308,6 +327,10 @@ impl LayerManager {
pub(crate) fn contains(&self, layer: &Layer) -> bool {
self.layer_fmgr.contains(layer)
}
pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
self.layer_fmgr.0.keys().cloned().collect_vec()
}
}
pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);

223
patches/pg_anon.patch Normal file
View File

@@ -0,0 +1,223 @@
commit 7dd414ee75f2875cffb1d6ba474df1f135a6fc6f
Author: Alexey Masterov <alexeymasterov@neon.tech>
Date: Fri May 31 06:34:26 2024 +0000
These alternative expected files were added to consider the neon features
diff --git a/ext-src/pg_anon-src/tests/expected/permissions_masked_role_1.out b/ext-src/pg_anon-src/tests/expected/permissions_masked_role_1.out
new file mode 100644
index 0000000..2539cfd
--- /dev/null
+++ b/ext-src/pg_anon-src/tests/expected/permissions_masked_role_1.out
@@ -0,0 +1,101 @@
+BEGIN;
+CREATE EXTENSION anon CASCADE;
+NOTICE: installing required extension "pgcrypto"
+SELECT anon.init();
+ init
+------
+ t
+(1 row)
+
+CREATE ROLE mallory_the_masked_user;
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS 'MASKED';
+CREATE TABLE t1(i INT);
+ALTER TABLE t1 ADD COLUMN t TEXT;
+SECURITY LABEL FOR anon ON COLUMN t1.t
+IS 'MASKED WITH VALUE NULL';
+INSERT INTO t1 VALUES (1,'test');
+--
+-- We're checking the owner's permissions
+--
+-- see
+-- https://postgresql-anonymizer.readthedocs.io/en/latest/SECURITY/#permissions
+--
+SET ROLE mallory_the_masked_user;
+SELECT anon.pseudo_first_name(0) IS NOT NULL;
+ ?column?
+----------
+ t
+(1 row)
+
+-- SHOULD FAIL
+DO $$
+BEGIN
+ PERFORM anon.init();
+ EXCEPTION WHEN insufficient_privilege
+ THEN RAISE NOTICE 'insufficient_privilege';
+END$$;
+NOTICE: insufficient_privilege
+-- SHOULD FAIL
+DO $$
+BEGIN
+ PERFORM anon.anonymize_table('t1');
+ EXCEPTION WHEN insufficient_privilege
+ THEN RAISE NOTICE 'insufficient_privilege';
+END$$;
+NOTICE: insufficient_privilege
+-- SHOULD FAIL
+SAVEPOINT fail_start_engine;
+SELECT anon.start_dynamic_masking();
+ERROR: Only supersusers can start the dynamic masking engine.
+CONTEXT: PL/pgSQL function anon.start_dynamic_masking(boolean) line 18 at RAISE
+ROLLBACK TO fail_start_engine;
+RESET ROLE;
+SELECT anon.start_dynamic_masking();
+ start_dynamic_masking
+-----------------------
+ t
+(1 row)
+
+SET ROLE mallory_the_masked_user;
+SELECT * FROM mask.t1;
+ i | t
+---+---
+ 1 |
+(1 row)
+
+-- SHOULD FAIL
+DO $$
+BEGIN
+ SELECT * FROM public.t1;
+ EXCEPTION WHEN insufficient_privilege
+ THEN RAISE NOTICE 'insufficient_privilege';
+END$$;
+NOTICE: insufficient_privilege
+-- SHOULD FAIL
+SAVEPOINT fail_stop_engine;
+SELECT anon.stop_dynamic_masking();
+ERROR: Only supersusers can stop the dynamic masking engine.
+CONTEXT: PL/pgSQL function anon.stop_dynamic_masking() line 18 at RAISE
+ROLLBACK TO fail_stop_engine;
+RESET ROLE;
+SELECT anon.stop_dynamic_masking();
+NOTICE: The previous priviledges of 'mallory_the_masked_user' are not restored. You need to grant them manually.
+ stop_dynamic_masking
+----------------------
+ t
+(1 row)
+
+SET ROLE mallory_the_masked_user;
+SELECT COUNT(*)=1 FROM anon.pg_masking_rules;
+ ?column?
+----------
+ t
+(1 row)
+
+-- SHOULD FAIL
+SAVEPOINT fail_seclabel_on_role;
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS NULL;
+ERROR: permission denied
+DETAIL: The current user must have the CREATEROLE attribute.
+ROLLBACK TO fail_seclabel_on_role;
+ROLLBACK;
diff --git a/ext-src/pg_anon-src/tests/expected/permissions_owner_1.out b/ext-src/pg_anon-src/tests/expected/permissions_owner_1.out
new file mode 100644
index 0000000..8b090fe
--- /dev/null
+++ b/ext-src/pg_anon-src/tests/expected/permissions_owner_1.out
@@ -0,0 +1,104 @@
+BEGIN;
+CREATE EXTENSION anon CASCADE;
+NOTICE: installing required extension "pgcrypto"
+SELECT anon.init();
+ init
+------
+ t
+(1 row)
+
+CREATE ROLE oscar_the_owner;
+ALTER DATABASE :DBNAME OWNER TO oscar_the_owner;
+CREATE ROLE mallory_the_masked_user;
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS 'MASKED';
+--
+-- We're checking the owner's permissions
+--
+-- see
+-- https://postgresql-anonymizer.readthedocs.io/en/latest/SECURITY/#permissions
+--
+SET ROLE oscar_the_owner;
+SELECT anon.pseudo_first_name(0) IS NOT NULL;
+ ?column?
+----------
+ t
+(1 row)
+
+-- SHOULD FAIL
+DO $$
+BEGIN
+ PERFORM anon.init();
+ EXCEPTION WHEN insufficient_privilege
+ THEN RAISE NOTICE 'insufficient_privilege';
+END$$;
+NOTICE: insufficient_privilege
+CREATE TABLE t1(i INT);
+ALTER TABLE t1 ADD COLUMN t TEXT;
+SECURITY LABEL FOR anon ON COLUMN t1.t
+IS 'MASKED WITH VALUE NULL';
+INSERT INTO t1 VALUES (1,'test');
+SELECT anon.anonymize_table('t1');
+ anonymize_table
+-----------------
+ t
+(1 row)
+
+SELECT * FROM t1;
+ i | t
+---+---
+ 1 |
+(1 row)
+
+UPDATE t1 SET t='test' WHERE i=1;
+-- SHOULD FAIL
+SAVEPOINT fail_start_engine;
+SELECT anon.start_dynamic_masking();
+ start_dynamic_masking
+-----------------------
+ t
+(1 row)
+
+ROLLBACK TO fail_start_engine;
+RESET ROLE;
+SELECT anon.start_dynamic_masking();
+ start_dynamic_masking
+-----------------------
+ t
+(1 row)
+
+SET ROLE oscar_the_owner;
+SELECT * FROM t1;
+ i | t
+---+------
+ 1 | test
+(1 row)
+
+--SELECT * FROM mask.t1;
+-- SHOULD FAIL
+SAVEPOINT fail_stop_engine;
+SELECT anon.stop_dynamic_masking();
+ERROR: permission denied for schema mask
+CONTEXT: SQL statement "DROP VIEW mask.t1;"
+PL/pgSQL function anon.mask_drop_view(oid) line 3 at EXECUTE
+SQL statement "SELECT anon.mask_drop_view(oid)
+ FROM pg_catalog.pg_class
+ WHERE relnamespace=quote_ident(pg_catalog.current_setting('anon.sourceschema'))::REGNAMESPACE
+ AND relkind IN ('r','p','f')"
+PL/pgSQL function anon.stop_dynamic_masking() line 22 at PERFORM
+ROLLBACK TO fail_stop_engine;
+RESET ROLE;
+SELECT anon.stop_dynamic_masking();
+NOTICE: The previous priviledges of 'mallory_the_masked_user' are not restored. You need to grant them manually.
+ stop_dynamic_masking
+----------------------
+ t
+(1 row)
+
+SET ROLE oscar_the_owner;
+-- SHOULD FAIL
+SAVEPOINT fail_seclabel_on_role;
+SECURITY LABEL FOR anon ON ROLE mallory_the_masked_user IS NULL;
+ERROR: permission denied
+DETAIL: The current user must have the CREATEROLE attribute.
+ROLLBACK TO fail_seclabel_on_role;
+ROLLBACK;

19
patches/pg_cron.patch Normal file
View File

@@ -0,0 +1,19 @@
commit b3ea51ee158f113f2f82d0b97c12c54343c9a695 (HEAD -> master)
Author: Alexey Masterov <alexeymasterov@neon.tech>
Date: Fri Jun 7 19:23:42 2024 +0000
Disable REGRESS_OPTIONS causing initdb
diff --git a/ext-src/pg_cron-src/Makefile b/ext-src/pg_cron-src/Makefile
index 053314c..fbd5fb5 100644
--- a/ext-src/pg_cron-src/Makefile
+++ b/ext-src/pg_cron-src/Makefile
@@ -5,7 +5,7 @@ EXTENSION = pg_cron
DATA_built = $(EXTENSION)--1.0.sql
DATA = $(wildcard $(EXTENSION)--*--*.sql)
-REGRESS_OPTS =--temp-config=./pg_cron.conf --temp-instance=./tmp_check
+#REGRESS_OPTS =--temp-config=./pg_cron.conf --temp-instance=./tmp_check
REGRESS = pg_cron-test
# compilation configuration

39
patches/pg_hintplan.patch Normal file
View File

@@ -0,0 +1,39 @@
commit f7925d4d1406c0f0229e3c691c94b69e381899b1 (HEAD -> master)
Author: Alexey Masterov <alexeymasterov@neon.tech>
Date: Thu Jun 6 08:02:42 2024 +0000
Patch expected files to consider Neon's log messages
diff --git a/ext-src/pg_hint_plan-src/expected/ut-A.out b/ext-src/pg_hint_plan-src/expected/ut-A.out
index da723b8..f8d0102 100644
--- a/ext-src/pg_hint_plan-src/expected/ut-A.out
+++ b/ext-src/pg_hint_plan-src/expected/ut-A.out
@@ -9,13 +9,16 @@ SET search_path TO public;
----
-- No.A-1-1-3
CREATE EXTENSION pg_hint_plan;
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan
-- No.A-1-2-3
DROP EXTENSION pg_hint_plan;
-- No.A-1-1-4
CREATE SCHEMA other_schema;
CREATE EXTENSION pg_hint_plan SCHEMA other_schema;
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan
ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan"
CREATE EXTENSION pg_hint_plan;
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan
DROP SCHEMA other_schema;
----
---- No. A-5-1 comment pattern
diff --git a/ext-src/pg_hint_plan-src/expected/ut-fdw.out b/ext-src/pg_hint_plan-src/expected/ut-fdw.out
index d372459..6282afe 100644
--- a/ext-src/pg_hint_plan-src/expected/ut-fdw.out
+++ b/ext-src/pg_hint_plan-src/expected/ut-fdw.out
@@ -7,6 +7,7 @@ SET pg_hint_plan.debug_print TO on;
SET client_min_messages TO LOG;
SET pg_hint_plan.enable_hint TO on;
CREATE EXTENSION file_fdw;
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/file_fdw
CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
CREATE USER MAPPING FOR PUBLIC SERVER file_server;
CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename');

View File

@@ -40,6 +40,7 @@ tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
measured.workspace = true
scopeguard.workspace = true
strum.workspace = true
strum_macros.workspace = true

View File

@@ -0,0 +1,236 @@
use std::{borrow::Cow, collections::HashMap, fmt::Debug, fmt::Display, sync::Arc};
use tokio::{sync::mpsc::error::TrySendError, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use utils::id::NodeId;
use crate::service::Service;
pub(crate) const MAX_RECONCILES_PER_OPERATION: usize = 10;
#[derive(Copy, Clone)]
pub(crate) struct Drain {
node_id: NodeId,
}
#[derive(Copy, Clone)]
pub(crate) struct Fill {
node_id: NodeId,
}
pub(crate) enum Operation {
Drain(Drain),
Fill(Fill),
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum OperationError {
#[error("Operation precondition failed: {0}")]
PreconditionFailed(Cow<'static, str>),
#[error("Node state changed during operation: {0}")]
NodeStateChanged(Cow<'static, str>),
#[error("Operation cancelled")]
Cancelled,
#[error("Shutting down")]
ShuttingDown,
}
struct OperationHandler {
operation: Operation,
#[allow(unused)]
cancel: CancellationToken,
// TODO(vlad): maybe we don't need this handle after all.
// Unless we want to log the error at the end.
#[allow(unused)]
handle: JoinHandle<Result<(), OperationError>>,
}
#[derive(Default, Clone)]
struct OngoingOperations(Arc<std::sync::RwLock<HashMap<NodeId, OperationHandler>>>);
pub(crate) struct Controller {
ongoing: OngoingOperations,
service: Arc<Service>,
sender: tokio::sync::mpsc::Sender<Operation>,
}
// TODO(vlad): write a little python test for this
impl Controller {
pub(crate) fn new(service: Arc<Service>) -> (Self, tokio::sync::mpsc::Receiver<Operation>) {
let (operations_tx, operations_rx) = tokio::sync::mpsc::channel(1);
(
Self {
ongoing: Default::default(),
service,
sender: operations_tx,
},
operations_rx,
)
}
pub(crate) fn drain_node(&self, node_id: NodeId) -> Result<(), OperationError> {
if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) {
return Err(OperationError::PreconditionFailed(
format!(
"Background operation already ongoing for node: {}",
handler.operation
)
.into(),
));
}
self.sender.try_send(Operation::Drain(Drain { node_id }))?;
Ok(())
}
pub(crate) fn fill_node(&self, node_id: NodeId) -> Result<(), OperationError> {
if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) {
return Err(OperationError::PreconditionFailed(
format!(
"Background operation already ongoing for node: {}",
handler.operation
)
.into(),
));
}
self.sender.try_send(Operation::Fill(Fill { node_id }))?;
Ok(())
}
pub(crate) async fn handle_operations(
&self,
mut receiver: tokio::sync::mpsc::Receiver<Operation>,
) {
while let Some(op) = receiver.recv().await {
match op {
Operation::Drain(drain) => self.handle_drain(drain),
Operation::Fill(fill) => self.handle_fill(fill),
}
}
}
fn handle_drain(&self, drain: Drain) {
let node_id = drain.node_id;
let cancel = CancellationToken::new();
let (_holder, waiter) = utils::completion::channel();
let handle = tokio::task::spawn({
let service = self.service.clone();
let ongoing = self.ongoing.clone();
let cancel = cancel.clone();
async move {
scopeguard::defer! {
let removed = ongoing.0.write().unwrap().remove(&drain.node_id);
if let Some(Operation::Drain(removed_drain)) = removed.map(|h| h.operation) {
assert_eq!(removed_drain.node_id, drain.node_id, "We always remove the same operation");
} else {
panic!("We always remove the same operation")
}
}
waiter.wait().await;
service.drain_node(drain.node_id, cancel).await
}
});
let replaced = self.ongoing.0.write().unwrap().insert(
node_id,
OperationHandler {
operation: Operation::Drain(drain),
cancel,
handle,
},
);
assert!(
replaced.is_none(),
"The channel size is 1 and we checked before enqueing"
);
}
fn handle_fill(&self, fill: Fill) {
let node_id = fill.node_id;
let cancel = CancellationToken::new();
let (_holder, waiter) = utils::completion::channel();
let handle = tokio::task::spawn({
let service = self.service.clone();
let ongoing = self.ongoing.clone();
let cancel = cancel.clone();
async move {
scopeguard::defer! {
let removed = ongoing.0.write().unwrap().remove(&fill.node_id);
if let Some(Operation::Fill(removed_fill)) = removed.map(|h| h.operation) {
assert_eq!(removed_fill.node_id, fill.node_id, "We always remove the same operation");
} else {
panic!("We always remove the same operation")
}
}
waiter.wait().await;
service.fill_node(fill.node_id, cancel).await
}
});
let replaced = self.ongoing.0.write().unwrap().insert(
node_id,
OperationHandler {
operation: Operation::Fill(fill),
cancel,
handle,
},
);
assert!(
replaced.is_none(),
"The channel size is 1 and we checked before enqueing"
);
}
}
impl<T> From<TrySendError<T>> for OperationError {
fn from(value: TrySendError<T>) -> Self {
match value {
TrySendError::Full(_) => {
Self::PreconditionFailed("Too many background operation in progress".into())
}
TrySendError::Closed(_) => Self::ShuttingDown,
}
}
}
impl Display for Drain {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "drain {}", self.node_id)
}
}
impl Display for Fill {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "fill {}", self.node_id)
}
}
impl Display for Operation {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Operation::Drain(op) => write!(f, "{op}"),
Operation::Fill(op) => write!(f, "{op}"),
}
}
}
impl Debug for Controller {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "backround_node_operations::Controller")
}
}

View File

@@ -480,6 +480,39 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
)
}
async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
let node_status = state.service.get_node_status(node_id).await?;
json_response(StatusCode::OK, node_status)
}
async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
state.service.start_node_drain(node_id).await?;
json_response(StatusCode::ACCEPTED, ())
}
async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
state.service.start_node_fill(node_id).await?;
json_response(StatusCode::ACCEPTED, ())
}
async fn handle_tenant_shard_split(
service: Arc<Service>,
mut req: Request<Body>,
@@ -832,6 +865,16 @@ pub fn make_router(
RequestName("control_v1_node_config"),
)
})
.get("/control/v1/node/:node_id", |r| {
named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
})
.put("/control/v1/node/:node_id/drain", |r| {
named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
})
.put("/control/v1/node/:node_id/fill", |r| {
named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
})
// TODO(vlad): endpoint for cancelling drain and fill
// Tenant Shard operations
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
tenant_service_handler(

View File

@@ -2,6 +2,7 @@ use serde::Serialize;
use utils::seqwait::MonotonicCounter;
mod auth;
mod background_node_operations;
mod compute_hook;
mod heartbeater;
pub mod http;

View File

@@ -59,12 +59,17 @@ impl Node {
self.id
}
pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
self.scheduling
}
pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
self.scheduling = scheduling
}
/// Does this registration request match `self`? This is used when deciding whether a registration
/// request should be allowed to update an existing record with the same node ID.
/// how
pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
self.id == register_req.node_id
&& self.listen_http_addr == register_req.listen_http_addr
@@ -141,6 +146,7 @@ impl Node {
NodeSchedulingPolicy::Draining => MaySchedule::No,
NodeSchedulingPolicy::Filling => MaySchedule::Yes(score),
NodeSchedulingPolicy::Pause => MaySchedule::No,
NodeSchedulingPolicy::PauseForRestart => MaySchedule::No,
}
}
@@ -157,7 +163,7 @@ impl Node {
listen_http_port,
listen_pg_addr,
listen_pg_port,
scheduling: NodeSchedulingPolicy::Filling,
scheduling: NodeSchedulingPolicy::Active,
availability: NodeAvailability::Offline,
cancel: CancellationToken::new(),
}

View File

@@ -442,13 +442,15 @@ impl Persistence {
#[tracing::instrument(skip_all, fields(node_id))]
pub(crate) async fn re_attach(
&self,
node_id: NodeId,
input_node_id: NodeId,
) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
use crate::schema::nodes::dsl::scheduling_policy;
use crate::schema::nodes::dsl::*;
use crate::schema::tenant_shards::dsl::*;
let updated = self
.with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
let rows_updated = diesel::update(tenant_shards)
.filter(generation_pageserver.eq(node_id.0 as i64))
.filter(generation_pageserver.eq(input_node_id.0 as i64))
.set(generation.eq(generation + 1))
.execute(conn)?;
@@ -457,9 +459,23 @@ impl Persistence {
// TODO: UPDATE+SELECT in one query
let updated = tenant_shards
.filter(generation_pageserver.eq(node_id.0 as i64))
.filter(generation_pageserver.eq(input_node_id.0 as i64))
.select(TenantShardPersistence::as_select())
.load(conn)?;
// If the node went through a drain and restart phase before re-attaching,
// then reset it's node scheduling policy to active.
diesel::update(nodes)
.filter(node_id.eq(input_node_id.0 as i64))
.filter(
scheduling_policy
// TODO(vlad): Do the same for `Filling`
.eq(String::from(NodeSchedulingPolicy::PauseForRestart))
.or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining))),
)
.set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
.execute(conn)?;
Ok(updated)
})
.await?;

View File

@@ -29,6 +29,8 @@ pub enum MaySchedule {
struct SchedulerNode {
/// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
shard_count: usize,
/// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
attached_shard_count: usize,
/// Whether this node is currently elegible to have new shards scheduled (this is derived
/// from a node's availability state and scheduling policy).
@@ -42,7 +44,9 @@ impl PartialEq for SchedulerNode {
(MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
);
may_schedule_matches && self.shard_count == other.shard_count
may_schedule_matches
&& self.shard_count == other.shard_count
&& self.attached_shard_count == other.attached_shard_count
}
}
@@ -138,6 +142,15 @@ impl ScheduleContext {
}
}
pub(crate) enum RefCountUpdate {
PromoteSecondary,
Attach,
Detach,
DemoteAttached,
AddSecondary,
RemoveSecondary,
}
impl Scheduler {
pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
let mut scheduler_nodes = HashMap::new();
@@ -146,6 +159,7 @@ impl Scheduler {
node.get_id(),
SchedulerNode {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
},
);
@@ -171,6 +185,7 @@ impl Scheduler {
node.get_id(),
SchedulerNode {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
},
);
@@ -179,7 +194,10 @@ impl Scheduler {
for shard in shards {
if let Some(node_id) = shard.intent.get_attached() {
match expect_nodes.get_mut(node_id) {
Some(node) => node.shard_count += 1,
Some(node) => {
node.shard_count += 1;
node.attached_shard_count += 1;
}
None => anyhow::bail!(
"Tenant {} references nonexistent node {}",
shard.tenant_shard_id,
@@ -227,31 +245,67 @@ impl Scheduler {
Ok(())
}
/// Increment the reference count of a node. This reference count is used to guide scheduling
/// decisions, not for memory management: it represents one tenant shard whose IntentState targets
/// this node.
/// Update the reference counts of a node. These reference counts are used to guide scheduling
/// decisions, not for memory management: they represent the number of tenant shard whose IntentState
/// targets this node and the number of tenants shars whose IntentState is attached to this
/// node.
///
/// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
/// [`Self::new`] or [`Self::node_upsert`])
pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) {
pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
let Some(node) = self.nodes.get_mut(&node_id) else {
tracing::error!("Scheduler missing node {node_id}");
debug_assert!(false);
tracing::error!("Scheduler missing node {node_id}");
return;
};
node.shard_count += 1;
match update {
RefCountUpdate::PromoteSecondary => {
node.attached_shard_count += 1;
}
RefCountUpdate::Attach => {
node.shard_count += 1;
node.attached_shard_count += 1;
}
RefCountUpdate::Detach => {
node.shard_count -= 1;
node.attached_shard_count -= 1;
}
RefCountUpdate::DemoteAttached => {
node.attached_shard_count -= 1;
}
RefCountUpdate::AddSecondary => {
node.shard_count += 1;
}
RefCountUpdate::RemoveSecondary => {
node.shard_count -= 1;
}
}
}
/// Decrement a node's reference count. Inverse of [`Self::node_inc_ref`].
pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) {
let Some(node) = self.nodes.get_mut(&node_id) else {
// Check if the number of shards attached to a give node is lagging below
// the cluster average. If that's the case, the node should be filled.
// TODO(vlad): We can probably be smarter about this. This could be expressed
// as standard deviation of attached shards while excluding paused nodes.
pub(crate) fn should_fill_node(&self, node_id: NodeId) -> bool {
let Some(node) = self.nodes.get(&node_id) else {
debug_assert!(false);
tracing::error!("Scheduler missing node {node_id}");
return;
return true;
};
node.shard_count -= 1;
let total_attached_shards: usize =
self.nodes.values().map(|n| n.attached_shard_count).sum();
assert!(!self.nodes.is_empty());
let expected_attached_shards_per_node = total_attached_shards / self.nodes.len();
// TODO(vlad): remove this
for (node_id, node) in self.nodes.iter() {
tracing::info!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
}
node.attached_shard_count < expected_attached_shards_per_node
}
pub(crate) fn node_upsert(&mut self, node: &Node) {
@@ -263,6 +317,7 @@ impl Scheduler {
Vacant(entry) => {
entry.insert(SchedulerNode {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
});
}

View File

@@ -8,13 +8,15 @@ use std::{
};
use crate::{
background_node_operations::{self, Controller, OperationError, MAX_RECONCILES_PER_OPERATION},
compute_hook::NotifyError,
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
persistence::{AbortShardSplitStatus, TenantFilter},
reconciler::{ReconcileError, ReconcileUnits},
scheduler::{ScheduleContext, ScheduleMode},
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
tenant_shard::{
MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction,
MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
ScheduleOptimizationAction,
},
};
use anyhow::Context;
@@ -275,6 +277,8 @@ pub struct Service {
/// use a VecDeque instead of a channel to reduce synchronization overhead, at the cost of some code complexity.
delayed_reconcile_tx: tokio::sync::mpsc::Sender<TenantShardId>,
background_operations_controller: std::sync::OnceLock<Controller>,
// Process shutdown will fire this token
cancel: CancellationToken,
@@ -296,6 +300,19 @@ impl From<ReconcileWaitError> for ApiError {
}
}
impl From<OperationError> for ApiError {
fn from(value: OperationError) -> Self {
match value {
OperationError::PreconditionFailed(err) => ApiError::PreconditionFailed(err.into()),
OperationError::NodeStateChanged(err) => {
ApiError::InternalServerError(anyhow::anyhow!(err))
}
OperationError::ShuttingDown => ApiError::ShuttingDown,
OperationError::Cancelled => ApiError::Conflict("Operation was cancelled".into()),
}
}
}
#[allow(clippy::large_enum_variant)]
enum TenantCreateOrUpdate {
Create(TenantCreateRequest),
@@ -1090,12 +1107,20 @@ impl Service {
delayed_reconcile_tx,
abort_tx,
startup_complete: startup_complete.clone(),
background_operations_controller: Default::default(),
cancel,
gate: Gate::default(),
tenant_op_locks: Default::default(),
node_op_locks: Default::default(),
});
let (controller, node_operations_receiver) =
background_node_operations::Controller::new(this.clone());
this.background_operations_controller
.set(controller)
.expect("This is the only code path that sets the controller");
let result_task_this = this.clone();
tokio::task::spawn(async move {
// Block shutdown until we're done (we must respect self.cancel)
@@ -1167,6 +1192,19 @@ impl Service {
}
});
tokio::task::spawn({
let this = this.clone();
let startup_complete = startup_complete.clone();
async move {
startup_complete.wait().await;
this.background_operations_controller
.get()
.expect("Initialized at start up")
.handle_operations(node_operations_receiver)
.await;
}
});
Ok(this)
}
@@ -1562,15 +1600,30 @@ impl Service {
// Setting a node active unblocks any Reconcilers that might write to the location config API,
// but those requests will not be accepted by the node until it has finished processing
// the re-attach response.
//
// Additionally, reset the nodes scheduling policy to match the conditional update done
// in [`Persistence::re_attach`].
if let Some(node) = nodes.get(&reattach_req.node_id) {
if !node.is_available() {
let reset_scheduling = matches!(
node.get_scheduling(),
NodeSchedulingPolicy::PauseForRestart | NodeSchedulingPolicy::Draining
);
if !node.is_available() || reset_scheduling {
let mut new_nodes = (**nodes).clone();
if let Some(node) = new_nodes.get_mut(&reattach_req.node_id) {
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
if !node.is_available() {
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
}
if reset_scheduling {
node.set_scheduling(NodeSchedulingPolicy::Active);
}
scheduler.node_upsert(node);
let new_nodes = Arc::new(new_nodes);
*nodes = new_nodes;
}
let new_nodes = Arc::new(new_nodes);
*nodes = new_nodes;
}
}
@@ -1851,6 +1904,23 @@ impl Service {
Ok(())
}
async fn kick_waiters(
&self,
waiters: Vec<ReconcilerWaiter>,
timeout: Duration,
) -> Vec<ReconcilerWaiter> {
let deadline = Instant::now().checked_add(timeout).unwrap();
for waiter in waiters.iter() {
let timeout = deadline.duration_since(Instant::now());
let _ = waiter.wait_timeout(timeout).await;
}
waiters
.into_iter()
.filter(|waiter| matches!(waiter.get_status(), ReconcilerStatus::InProgress))
.collect::<Vec<_>>()
}
/// Part of [`Self::tenant_location_config`]: dissect an incoming location config request,
/// and transform it into either a tenant creation of a series of shard updates.
///
@@ -4128,6 +4198,18 @@ impl Service {
Ok(nodes)
}
pub(crate) async fn get_node_status(&self, node_id: NodeId) -> Result<Node, ApiError> {
self.inner
.read()
.unwrap()
.nodes
.get(&node_id)
.cloned()
.ok_or(ApiError::NotFound(
format!("Node {node_id} not registered").into(),
))
}
pub(crate) async fn node_register(
&self,
register_req: NodeRegisterRequest,
@@ -4282,9 +4364,6 @@ impl Service {
if let Some(scheduling) = scheduling {
node.set_scheduling(scheduling);
// TODO: once we have a background scheduling ticker for fill/drain, kick it
// to wake up and start working.
}
// Update the scheduler, in case the elegibility of the node for new shards has changed
@@ -4312,7 +4391,7 @@ impl Service {
continue;
}
if tenant_shard.intent.demote_attached(node_id) {
if tenant_shard.intent.demote_attached(scheduler, node_id) {
tenant_shard.sequence = tenant_shard.sequence.next();
// TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
@@ -4359,7 +4438,7 @@ impl Service {
// TODO: in the background, we should balance work back onto this pageserver
}
AvailabilityTransition::Unchanged => {
tracing::debug!("Node {} no change during config", node_id);
tracing::debug!("Node {} no availability change during config", node_id);
}
}
@@ -4368,6 +4447,112 @@ impl Service {
Ok(())
}
pub(crate) async fn start_node_drain(&self, node_id: NodeId) -> Result<(), ApiError> {
let (node_available, node_policy, schedulable_nodes_count) = {
let locked = self.inner.read().unwrap();
let nodes = &locked.nodes;
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
anyhow::anyhow!("Node {} not registered", node_id).into(),
))?;
let schedulable_nodes_count = nodes
.iter()
.filter(|(_, n)| matches!(n.may_schedule(), MaySchedule::Yes(_)))
.count();
(
node.is_available(),
node.get_scheduling(),
schedulable_nodes_count,
)
};
if !node_available {
return Err(ApiError::ResourceUnavailable(
format!("Node {node_id} is currently unavailable").into(),
));
}
if schedulable_nodes_count == 0 {
return Err(ApiError::PreconditionFailed(
"No other schedulable nodes to drain to".into(),
));
}
match node_policy {
NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Pause => {
self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Draining))
.await?;
let controller = self
.background_operations_controller
.get()
.expect("Initialized at start up");
controller.drain_node(node_id)?;
}
NodeSchedulingPolicy::Draining => {
return Err(ApiError::Conflict(format!(
"Node {node_id} has drain in progress"
)));
}
policy => {
return Err(ApiError::PreconditionFailed(
format!("Node {node_id} cannot be drained due to {policy:?} policy").into(),
));
}
}
Ok(())
}
pub(crate) async fn start_node_fill(&self, node_id: NodeId) -> Result<(), ApiError> {
let (node_available, node_policy, total_nodes_count) = {
let locked = self.inner.read().unwrap();
let nodes = &locked.nodes;
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
anyhow::anyhow!("Node {} not registered", node_id).into(),
))?;
(node.is_available(), node.get_scheduling(), nodes.len())
};
if !node_available {
return Err(ApiError::ResourceUnavailable(
format!("Node {node_id} is currently unavailable").into(),
));
}
if total_nodes_count <= 1 {
return Err(ApiError::PreconditionFailed(
"No other nodes to fill from".into(),
));
}
match node_policy {
// TODO: clear Draining, Filling and PauseForRestart on storage controller restart and
// re-attach
NodeSchedulingPolicy::Active => {
self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Filling))
.await?;
let controller = self
.background_operations_controller
.get()
.expect("Initialized at start up");
controller.fill_node(node_id)?;
}
NodeSchedulingPolicy::Filling => {
return Err(ApiError::Conflict(format!(
"Node {node_id} has fill in progress"
)));
}
policy => {
return Err(ApiError::PreconditionFailed(
format!("Node {node_id} cannot be filled due to {policy:?} policy").into(),
));
}
}
Ok(())
}
/// Helper for methods that will try and call pageserver APIs for
/// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
/// is attached somewhere.
@@ -4952,4 +5137,197 @@ impl Service {
// to complete.
self.gate.close().await;
}
pub(crate) async fn drain_node(
&self,
node_id: NodeId,
cancel: CancellationToken,
) -> Result<(), OperationError> {
tracing::info!(%node_id, "Starting drain background operation");
let last_inspected_shard: Option<TenantShardId> = None;
let mut inspected_all_shards = false;
let mut waiters = Vec::new();
let mut schedule_context = ScheduleContext::default();
while !inspected_all_shards {
if cancel.is_cancelled() {
return Err(OperationError::Cancelled);
}
{
let mut locked = self.inner.write().unwrap();
tracing::info!(%node_id, "Drain got write lock");
let (nodes, tenants, scheduler) = locked.parts_mut();
let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged(
format!("node {node_id} was removed").into(),
))?;
let current_policy = node.get_scheduling();
if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
// TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
// about it
return Err(OperationError::NodeStateChanged(
format!("node {node_id} changed state to {current_policy:?}").into(),
));
}
let mut cursor =
tenants
.iter_mut()
.skip_while(|(tid, _)| match last_inspected_shard {
Some(last) => **tid != last,
None => false,
});
while waiters.len() < MAX_RECONCILES_PER_OPERATION {
let (tid, tenant_shard) = match cursor.next() {
Some(some) => some,
None => {
inspected_all_shards = true;
break;
}
};
if tenant_shard.intent.demote_attached(scheduler, node_id) {
tenant_shard.sequence = tenant_shard.sequence.next();
match tenant_shard.schedule(scheduler, &mut schedule_context) {
Err(e) => {
tracing::warn!(%tid, "Scheduling error when draining pageserver {} : {e}", node_id);
}
Ok(()) => {
let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
if let Some(some) = waiter {
waiters.push(some);
}
}
}
}
}
}
tracing::info!(%node_id, "Drain released write lock");
waiters = self.kick_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
}
let _ = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
// At this point we have done the best we could to drain shards from this node.
// Set the node scheduling policy to `[NodeSchedulingPolicy::PauseForRestart]`
// to complete the drain.
if let Err(err) = self
.node_configure(node_id, None, Some(NodeSchedulingPolicy::PauseForRestart))
.await
{
// This is not fatal. Anything that is polling the node scheduling policy to detect
// the end of the drain operations will hang, but all such places should enforce an
// overall timeout. The scheduling policy will be updated upon node re-attach and/or
// by the counterpart fill operation.
tracing::warn!(%node_id, "Failed to finalise drain by setting scheduling policy: {err}");
}
tracing::info!(%node_id, "Completed drain background operation");
Ok(())
}
// TODO: exit early when we are balanced
pub(crate) async fn fill_node(
&self,
node_id: NodeId,
cancel: CancellationToken,
) -> Result<(), OperationError> {
// TODO(vlad): Currently this operates on the assumption that all
// secondaries are warm. This is not always true (we just migrated the
// tenant). Take that into consideration by checking the secondary status.
// TODO(vlad): This is a subset of the optimizations applied by `Service::optimize_all`.
// The optimizations should be stopped while filing, othwewise we are going to fight for
// the same reconciler semaphore units.
tracing::info!(%node_id, "Starting fill background operation");
let last_inspected_shard: Option<TenantShardId> = None;
let mut shards_left = true;
let mut waiters = Vec::new();
let mut should_fill_node = {
self.inner
.write()
.unwrap()
.scheduler
.should_fill_node(node_id)
};
while shards_left && should_fill_node {
if cancel.is_cancelled() {
return Err(OperationError::Cancelled);
}
{
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged(
format!("node {node_id} was removed").into(),
))?;
let current_policy = node.get_scheduling();
if !matches!(current_policy, NodeSchedulingPolicy::Filling) {
// TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
// about it
return Err(OperationError::NodeStateChanged(
format!("node {node_id} changed state to {current_policy:?}").into(),
));
}
let mut cursor =
tenants
.iter_mut()
.skip_while(|(tid, _)| match last_inspected_shard {
Some(last) => **tid != last,
None => false,
});
while waiters.len() < MAX_RECONCILES_PER_OPERATION {
let (_tid, tenant_shard) = match cursor.next() {
Some(some) => some,
None => {
shards_left = false;
break;
}
};
if tenant_shard.intent.get_secondary().contains(&node_id) {
tenant_shard.intent.promote_attached(scheduler, node_id);
let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
if let Some(some) = waiter {
waiters.push(some);
}
}
}
should_fill_node = scheduler.should_fill_node(node_id);
}
waiters = self.kick_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
}
let _ = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
if let Err(err) = self
.node_configure(node_id, None, Some(NodeSchedulingPolicy::Active))
.await
{
// This isn't a huge issue since the filling process starts upon request. However, it
// will prevent the next drain from starting. The only case in which this can fail
// is database unavailability. Such a case will require manual intervention.
tracing::error!(%node_id, "Failed to finalise fill by setting scheduling policy: {err}");
}
tracing::info!(%node_id, "Completed fill background operation");
Ok(())
}
}

View File

@@ -8,9 +8,11 @@ use crate::{
metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
persistence::TenantShardPersistence,
reconciler::ReconcileUnits,
scheduler::{AffinityScore, MaySchedule, ScheduleContext},
scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
};
use pageserver_api::controller_api::{
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
};
use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
use pageserver_api::{
models::{LocationConfig, LocationConfigMode, TenantConfig},
shard::{ShardIdentity, TenantShardId},
@@ -153,7 +155,7 @@ impl IntentState {
}
pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
if let Some(node_id) = node_id {
scheduler.node_inc_ref(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
}
Self {
attached: node_id,
@@ -164,10 +166,10 @@ impl IntentState {
pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
if self.attached != new_attached {
if let Some(old_attached) = self.attached.take() {
scheduler.node_dec_ref(old_attached);
scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
}
if let Some(new_attached) = &new_attached {
scheduler.node_inc_ref(*new_attached);
scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
}
self.attached = new_attached;
}
@@ -177,22 +179,27 @@ impl IntentState {
/// secondary to attached while maintaining the scheduler's reference counts.
pub(crate) fn promote_attached(
&mut self,
_scheduler: &mut Scheduler,
scheduler: &mut Scheduler,
promote_secondary: NodeId,
) {
// If we call this with a node that isn't in secondary, it would cause incorrect
// scheduler reference counting, since we assume the node is already referenced as a secondary.
debug_assert!(self.secondary.contains(&promote_secondary));
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
// need to call into it here.
self.secondary.retain(|n| n != &promote_secondary);
let demoted = self.attached;
self.attached = Some(promote_secondary);
scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
if let Some(demoted) = demoted {
scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
}
}
pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
debug_assert!(!self.secondary.contains(&new_secondary));
scheduler.node_inc_ref(new_secondary);
scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
self.secondary.push(new_secondary);
}
@@ -200,27 +207,27 @@ impl IntentState {
pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
let index = self.secondary.iter().position(|n| *n == node_id);
if let Some(index) = index {
scheduler.node_dec_ref(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
self.secondary.remove(index);
}
}
pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
for secondary in self.secondary.drain(..) {
scheduler.node_dec_ref(secondary);
scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
}
}
/// Remove the last secondary node from the list of secondaries
pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
if let Some(node_id) = self.secondary.pop() {
scheduler.node_dec_ref(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
}
}
pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
if let Some(old_attached) = self.attached.take() {
scheduler.node_dec_ref(old_attached);
scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
}
self.clear_secondary(scheduler);
@@ -251,12 +258,11 @@ impl IntentState {
/// forget the location on the offline node.
///
/// Returns true if a change was made
pub(crate) fn demote_attached(&mut self, node_id: NodeId) -> bool {
pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
if self.attached == Some(node_id) {
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
// need to call into it here.
self.attached = None;
self.secondary.push(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
true
} else {
false
@@ -307,6 +313,12 @@ pub(crate) struct ReconcilerWaiter {
seq: Sequence,
}
pub(crate) enum ReconcilerStatus {
Done,
Failed,
InProgress,
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum ReconcileWaitError {
#[error("Timeout waiting for shard {0}")]
@@ -369,6 +381,16 @@ impl ReconcilerWaiter {
Ok(())
}
pub(crate) fn get_status(&self) -> ReconcilerStatus {
if self.seq_wait.would_wait_for(self.seq).is_err() {
ReconcilerStatus::Done
} else if self.error_seq_wait.would_wait_for(self.seq).is_err() {
ReconcilerStatus::Failed
} else {
ReconcilerStatus::InProgress
}
}
}
/// Having spawned a reconciler task, the tenant shard's state will carry enough
@@ -593,7 +615,7 @@ impl TenantShard {
Secondary => {
if let Some(node_id) = self.intent.get_attached() {
// Populate secondary by demoting the attached node
self.intent.demote_attached(*node_id);
self.intent.demote_attached(scheduler, *node_id);
modified = true;
} else if self.intent.secondary.is_empty() {
// Populate secondary by scheduling a fresh node
@@ -648,13 +670,17 @@ impl TenantShard {
let mut scores = all_pageservers
.iter()
.flat_map(|node_id| {
if matches!(
nodes
.get(node_id)
.map(|n| n.may_schedule())
.unwrap_or(MaySchedule::No),
MaySchedule::No
let node = nodes.get(node_id);
if node.is_none() {
None
} else if matches!(
node.unwrap().get_scheduling(),
NodeSchedulingPolicy::Filling
) {
// If the node is currently filling, don't count it as a candidate to avoid,
// racing with the background fill.
None
} else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
None
} else {
let affinity_score = schedule_context.get_node_affinity(*node_id);
@@ -783,7 +809,7 @@ impl TenantShard {
old_attached_node_id,
new_attached_node_id,
}) => {
self.intent.demote_attached(old_attached_node_id);
self.intent.demote_attached(scheduler, old_attached_node_id);
self.intent
.promote_attached(scheduler, new_attached_node_id);
}
@@ -1321,7 +1347,9 @@ pub(crate) mod tests {
assert_ne!(attached_node_id, secondary_node_id);
// Notifying the attached node is offline should demote it to a secondary
let changed = tenant_shard.intent.demote_attached(attached_node_id);
let changed = tenant_shard
.intent
.demote_attached(&mut scheduler, attached_node_id);
assert!(changed);
assert!(tenant_shard.intent.attached.is_none());
assert_eq!(tenant_shard.intent.secondary.len(), 2);

View File

@@ -285,6 +285,21 @@ def test_foobar(neon_env_builder: NeonEnvBuilder):
...
```
The env includes a default tenant and timeline. Therefore, you do not need to create your own
tenant/timeline for testing.
```python
def test_foobar2(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start() # Start the environment
with env.endpoints.create_start("main") as endpoint:
# Start the compute endpoint
client = env.pageserver.http_client() # Get the pageserver client
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)
```
For more information about pytest fixtures, see https://docs.pytest.org/en/stable/fixture.html
At the end of a test, all the nodes in the environment are automatically stopped, so you

View File

@@ -2213,6 +2213,30 @@ class NeonStorageController(MetricsGetter, LogUtils):
headers=self.headers(TokenScope.ADMIN),
)
def node_drain(self, node_id):
log.info(f"node_drain({node_id})")
self.request(
"PUT",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain",
headers=self.headers(TokenScope.ADMIN),
)
def node_fill(self, node_id):
log.info(f"node_fill({node_id})")
self.request(
"PUT",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill",
headers=self.headers(TokenScope.ADMIN),
)
def node_status(self, node_id):
response = self.request(
"GET",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
def node_list(self):
response = self.request(
"GET",
@@ -3386,7 +3410,7 @@ def static_proxy(
yield proxy
class Endpoint(PgProtocol):
class Endpoint(PgProtocol, LogUtils):
"""An object representing a Postgres compute endpoint managed by the control plane."""
def __init__(
@@ -3452,6 +3476,7 @@ class Endpoint(PgProtocol):
)
path = Path("endpoints") / self.endpoint_id / "pgdata"
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
self.logfile = self.endpoint_path() / "compute.log"
config_lines = config_lines or []

View File

@@ -923,3 +923,18 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
)
self.verbose_error(res)
return res.json() # type: ignore
def perf_info(
self,
tenant_id: Union[TenantId, TenantShardId],
timeline_id: TimelineId,
):
self.is_testing_enabled_or_skip()
log.info(f"Requesting perf info: tenant {tenant_id}, timeline {timeline_id}")
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/perf_info",
)
log.info(f"Got perf info response code: {res.status_code}")
self.verbose_error(res)
return res.json()

View File

@@ -313,7 +313,7 @@ def assert_prefix_empty(
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-5322/6207777020/index.html#suites/3556ed71f2d69272a7014df6dcb02317/53b5c368b5a68865
# this seems like a mock_s3 issue
log.warning(
f"contrading ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}, assuming this means KeyCount=0"
f"contradicting ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}, assuming this means KeyCount=0"
)
keys = 0
elif keys != 0 and len(objects) == 0:

View File

@@ -582,3 +582,20 @@ class PropagatingThread(threading.Thread):
if self.exc:
raise self.exc
return self.ret
def human_bytes(amt: float) -> str:
"""
Render a bytes amount into nice IEC bytes string.
"""
suffixes = ["", "Ki", "Mi", "Gi"]
last = suffixes[-1]
for name in suffixes:
if amt < 1024 or name == last:
return f"{int(round(amt))} {name}B"
amt = amt / 1024
raise RuntimeError("unreachable")

View File

@@ -75,12 +75,29 @@ def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
physical_size = client.timeline_detail(tenant_id, timeline_id)["current_physical_size"]
log.info(f"Physical storage size {physical_size}")
max_num_of_deltas_above_image = 0
max_total_num_of_deltas = 0
for key_range in client.perf_info(tenant_id, timeline_id):
max_total_num_of_deltas = max(max_total_num_of_deltas, key_range["total_num_of_deltas"])
max_num_of_deltas_above_image = max(
max_num_of_deltas_above_image, key_range["num_of_deltas_above_image"]
)
MB = 1024 * 1024
zenbenchmark.record("logical_size", logical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record("physical_size", physical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record(
"physical/logical ratio", physical_size / logical_size, "", MetricReport.LOWER_IS_BETTER
)
zenbenchmark.record(
"max_total_num_of_deltas", max_total_num_of_deltas, "", MetricReport.LOWER_IS_BETTER
)
zenbenchmark.record(
"max_num_of_deltas_above_image",
max_num_of_deltas_above_image,
"",
MetricReport.LOWER_IS_BETTER,
)
layer_map_path = env.repo_dir / "layer-map.json"
log.info(f"Writing layer map to {layer_map_path}")

View File

@@ -17,7 +17,7 @@ from fixtures.neon_fixtures import (
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import wait_until
from fixtures.utils import human_bytes, wait_until
GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"
@@ -218,19 +218,6 @@ def count_layers_per_tenant(
return dict(ret)
def human_bytes(amt: float) -> str:
suffixes = ["", "Ki", "Mi", "Gi"]
last = suffixes[-1]
for name in suffixes:
if amt < 1024 or name == last:
return f"{int(round(amt))} {name}B"
amt = amt / 1024
raise RuntimeError("unreachable")
def _eviction_env(
request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, num_pageservers: int
) -> EvictionEnv:
@@ -294,7 +281,7 @@ def pgbench_init_tenant(
"gc_period": "0s",
"compaction_period": "0s",
"checkpoint_distance": f"{layer_size}",
"image_creation_threshold": "100",
"image_creation_threshold": "999999",
"compaction_target_size": f"{layer_size}",
}
)
@@ -668,11 +655,10 @@ def test_fast_growing_tenant(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, or
finish_tenant_creation(env, tenant_id, timeline_id, min_expected_layers)
tenant_layers = count_layers_per_tenant(env.pageserver, map(lambda x: x[0], timelines))
(total_on_disk, _, _) = poor_mans_du(env, map(lambda x: x[0], timelines), env.pageserver, False)
(total_on_disk, _, _) = poor_mans_du(env, map(lambda x: x[0], timelines), env.pageserver, True)
# cut 10 percent
response = env.pageserver.http_client().disk_usage_eviction_run(
{"evict_bytes": total_on_disk // 10, "eviction_order": order.config()}
{"evict_bytes": total_on_disk // 5, "eviction_order": order.config()}
)
log.info(f"{response}")

View File

@@ -1,6 +1,5 @@
import asyncio
import os
import re
import threading
import time
from functools import partial
@@ -18,20 +17,6 @@ from fixtures.neon_fixtures import (
from fixtures.utils import wait_until
# Check for corrupted WAL messages which might otherwise go unnoticed if
# reconnection fixes this.
def scan_standby_log_for_errors(secondary):
log_path = secondary.endpoint_path() / "compute.log"
with log_path.open("r") as f:
markers = re.compile(
r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"
)
for line in f:
if markers.search(line):
log.info(f"bad error in standby log: {line}")
raise AssertionError()
def test_hot_standby(neon_simple_env: NeonEnv):
env = neon_simple_env
@@ -91,7 +76,11 @@ def test_hot_standby(neon_simple_env: NeonEnv):
assert response is not None
assert response == responses[query]
scan_standby_log_for_errors(secondary)
# Check for corrupted WAL messages which might otherwise go unnoticed if
# reconnection fixes this.
assert not secondary.log_contains(
"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"
)
# clean up
if slow_down_send:

View File

@@ -0,0 +1,151 @@
from dataclasses import dataclass
from typing import Iterable, List, Union
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_last_flush_lsn
from fixtures.pageserver.http import HistoricLayerInfo, LayerMapInfo
from fixtures.utils import human_bytes
def test_ingesting_large_batches_of_images(neon_env_builder: NeonEnvBuilder, build_type: str):
"""
Build a non-small GIN index which includes similarly batched up images in WAL stream as does pgvector
to show that we no longer create oversized layers.
"""
if build_type == "debug":
pytest.skip("debug run is unnecessarily slow")
minimum_initdb_size = 20 * 1024**2
checkpoint_distance = 32 * 1024**2
minimum_good_layer_size = checkpoint_distance * 0.9
minimum_too_large_layer_size = 2 * checkpoint_distance
# index size: 99MiB
rows = 2_500_000
# bucket lower limits
buckets = [0, minimum_initdb_size, minimum_good_layer_size, minimum_too_large_layer_size]
assert (
minimum_initdb_size < minimum_good_layer_size
), "keep checkpoint_distance higher than the initdb size (find it by experimenting)"
env = neon_env_builder.init_start(
initial_tenant_conf={
"checkpoint_distance": f"{checkpoint_distance}",
"compaction_target_size": f"{checkpoint_distance}",
# this test is primarly interested in L0 sizes but we'll compact after ingestion to ensure sizes are good even then
"compaction_period": "0s",
"gc_period": "0s",
"compaction_threshold": "255",
"image_creation_threshold": "99999",
}
)
# build a larger than 3*checkpoint_distance sized gin index.
# gin index building exhibits the same behaviour as the pgvector with the two phase build
with env.endpoints.create_start("main") as ep, ep.cursor() as cur:
cur.execute(
f"create table int_array_test as select array_agg(g) as int_array from generate_series(1, {rows}) g group by g / 10;"
)
cur.execute(
"create index int_array_test_gin_index on int_array_test using gin (int_array);"
)
cur.execute("select pg_table_size('int_array_test_gin_index')")
size = cur.fetchone()
assert size is not None
assert isinstance(size[0], int)
log.info(f"gin index size: {human_bytes(size[0])}")
assert (
size[0] > checkpoint_distance * 3
), f"gin index is not large enough: {human_bytes(size[0])}"
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
ps_http = env.pageserver.http_client()
ps_http.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
infos = ps_http.layer_map_info(env.initial_tenant, env.initial_timeline)
assert len(infos.in_memory_layers) == 0, "should had flushed open layers"
post_ingest = histogram_historic_layers(infos, buckets)
# describe first, assert later for easier debugging
log.info("non-cumulative layer size distribution after ingestion:")
print_layer_size_histogram(post_ingest)
# since all we have are L0s, we should be getting nice L1s and images out of them now
ps_http.patch_tenant_config_client_side(
env.initial_tenant,
{
"compaction_threshold": 1,
"image_creation_threshold": 1,
},
)
ps_http.timeline_compact(env.initial_tenant, env.initial_timeline, True, True)
infos = ps_http.layer_map_info(env.initial_tenant, env.initial_timeline)
assert len(infos.in_memory_layers) == 0, "no new inmem layers expected"
post_compact = histogram_historic_layers(infos, buckets)
log.info("non-cumulative layer size distribution after compaction:")
print_layer_size_histogram(post_compact)
assert (
post_ingest.counts[3] == 0
), f"there should be no layers larger than 2*checkpoint_distance ({human_bytes(2*checkpoint_distance)})"
assert post_ingest.counts[1] == 1, "expect one smaller layer for initdb"
assert (
post_ingest.counts[0] <= 1
), "expect at most one tiny layer from shutting down the endpoint"
# just make sure we don't have trouble splitting the layers apart
assert post_compact.counts[3] == 0
@dataclass
class Histogram:
buckets: List[Union[int, float]]
counts: List[int]
sums: List[int]
def histogram_historic_layers(
infos: LayerMapInfo, minimum_sizes: List[Union[int, float]]
) -> Histogram:
def log_layer(layer: HistoricLayerInfo) -> HistoricLayerInfo:
log.info(
f"{layer.layer_file_name} {human_bytes(layer.layer_file_size)} ({layer.layer_file_size} bytes)"
)
return layer
layers = map(log_layer, infos.historic_layers)
sizes = (x.layer_file_size for x in layers)
return histogram(sizes, minimum_sizes)
def histogram(sizes: Iterable[int], minimum_sizes: List[Union[int, float]]) -> Histogram:
assert all(minimum_sizes[i] < minimum_sizes[i + 1] for i in range(len(minimum_sizes) - 1))
buckets = list(enumerate(minimum_sizes))
counts = [0 for _ in buckets]
sums = [0 for _ in buckets]
for size in sizes:
found = False
for index, min_size in reversed(buckets):
if size >= min_size:
counts[index] += 1
sums[index] += size
found = True
break
assert found
return Histogram(minimum_sizes, counts, sums)
def print_layer_size_histogram(h: Histogram):
for index, min_size in enumerate(h.buckets):
log.info(
f">= {human_bytes(min_size)}: {h.counts[index]} layers total {human_bytes(h.sums[index])}"
)

View File

@@ -8,8 +8,6 @@ def test_migrations(neon_simple_env: NeonEnv):
env.neon_cli.create_branch("test_migrations", "empty")
endpoint = env.endpoints.create("test_migrations")
log_path = endpoint.endpoint_path() / "compute.log"
endpoint.respec(skip_pg_catalog_updates=False)
endpoint.start()
@@ -22,9 +20,7 @@ def test_migrations(neon_simple_env: NeonEnv):
migration_id = cur.fetchall()
assert migration_id[0][0] == num_migrations
with open(log_path, "r") as log_file:
logs = log_file.read()
assert f"INFO handle_migrations: Ran {num_migrations} migrations" in logs
endpoint.assert_log_contains(f"INFO handle_migrations: Ran {num_migrations} migrations")
endpoint.stop()
endpoint.start()
@@ -36,6 +32,4 @@ def test_migrations(neon_simple_env: NeonEnv):
migration_id = cur.fetchall()
assert migration_id[0][0] == num_migrations
with open(log_path, "r") as log_file:
logs = log_file.read()
assert "INFO handle_migrations: Ran 0 migrations" in logs
endpoint.assert_log_contains("INFO handle_migrations: Ran 0 migrations")

View File

@@ -563,6 +563,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
)
),
)
workload.stop()
def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):

View File

@@ -2,6 +2,7 @@ import time
from datetime import datetime, timezone
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
@@ -32,7 +33,12 @@ def test_tenant_s3_restore(
assert remote_storage, "remote storage not configured"
enable_remote_storage_versioning(remote_storage)
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
# change it back after initdb, recovery doesn't work if the two
# index_part.json uploads happen at same second or too close to each other.
initial_tenant_conf = MANY_SMALL_LAYERS_TENANT_CONFIG
del initial_tenant_conf["checkpoint_distance"]
env = neon_env_builder.init_start(initial_tenant_conf)
env.pageserver.allowed_errors.extend(
[
# The deletion queue will complain when it encounters simulated S3 errors
@@ -43,14 +49,16 @@ def test_tenant_s3_restore(
)
ps_http = env.pageserver.http_client()
tenant_id = env.initial_tenant
# now lets create the small layers
ps_http.set_tenant_config(tenant_id, MANY_SMALL_LAYERS_TENANT_CONFIG)
# Default tenant and the one we created
assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 1
# create two timelines one being the parent of another, both with non-trivial data
parent = None
parent = "main"
last_flush_lsns = []
for timeline in ["first", "second"]:
@@ -64,6 +72,7 @@ def test_tenant_s3_restore(
last_flush_lsns.append(last_flush_lsn)
ps_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
log.info(f"{timeline} timeline {timeline_id} {last_flush_lsn=}")
parent = timeline
# These sleeps are important because they fend off differences in clocks between us and S3
@@ -108,6 +117,9 @@ def test_tenant_s3_restore(
ps_http.tenant_attach(tenant_id, generation=generation)
env.pageserver.quiesce_tenants()
for tline in ps_http.timeline_list(env.initial_tenant):
log.info(f"timeline detail: {tline}")
for i, timeline in enumerate(["first", "second"]):
with env.endpoints.create_start(timeline, tenant_id=tenant_id) as endpoint:
endpoint.safe_psql(f"SELECT * FROM created_{timeline};")

View File

@@ -697,6 +697,9 @@ def test_sharding_ingest_layer_sizes(
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{expect_layer_size}",
"compaction_target_size": f"{expect_layer_size}",
# aim to reduce flakyness, we are not doing explicit checkpointing
"compaction_period": "0s",
"gc_period": "0s",
}
shard_count = 4
neon_env_builder.num_pageservers = shard_count
@@ -712,6 +715,23 @@ def test_sharding_ingest_layer_sizes(
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# ignore the initdb layer(s) for the purposes of the size comparison as a initdb image layer optimization
# will produce a lot more smaller layers.
initial_layers_per_shard = {}
log.info("initdb distribution (not asserted on):")
for shard in env.storage_controller.locate(tenant_id):
pageserver = env.get_pageserver(shard["node_id"])
shard_id = shard["shard_id"]
layers = (
env.get_pageserver(shard["node_id"]).http_client().layer_map_info(shard_id, timeline_id)
)
for layer in layers.historic_layers:
log.info(
f"layer[{pageserver.id}]: {layer.layer_file_name} (size {layer.layer_file_size})"
)
initial_layers_per_shard[shard_id] = set(layers.historic_layers)
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(4096, upload=False)
@@ -733,7 +753,13 @@ def test_sharding_ingest_layer_sizes(
historic_layers = sorted(layer_map.historic_layers, key=lambda layer: layer.lsn_start)
initial_layers = initial_layers_per_shard[shard_id]
for layer in historic_layers:
if layer in initial_layers:
# ignore the initdb image layers for the size histogram
continue
if layer.layer_file_size < expect_layer_size // 2:
classification = "Small"
small_layer_count += 1
@@ -763,7 +789,8 @@ def test_sharding_ingest_layer_sizes(
pass
else:
# General case:
assert float(small_layer_count) / float(ok_layer_count) < 0.25
# old limit was 0.25 but pg14 is right at the limit with 7/28
assert float(small_layer_count) / float(ok_layer_count) < 0.3
# Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
assert huge_layer_count <= shard_count

View File

@@ -1477,3 +1477,110 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
workload = Workload(env, tenant_id, timeline, branch_name=branch)
workload.expect_rows = expect_rows
workload.validate()
def test_storage_controller_drain_and_fill_smoke(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
tenant_count = 5
shard_count_per_tenant = 8
total_shards = tenant_count * shard_count_per_tenant
tenant_ids = []
for _ in range(0, tenant_count):
tid = TenantId.generate()
tenant_ids.append(tid)
env.neon_cli.create_tenant(
tid, placement_policy='{"Attached":1}', shard_count=shard_count_per_tenant
)
# Give things a chance to settle.
# TODO(vlad): replace with a wait
time.sleep(2)
nodes = env.storage_controller.node_list()
assert len(nodes) == 2
def retryable_node_operation(op, ps_id, max_attempts, backoff):
while max_attempts > 0:
try:
op(ps_id)
return
except StorageControllerApiException as e:
max_attempts -= 1
log.info(f"Operation failed ({max_attempts} attempts left): {e}")
if max_attempts == 0:
raise e
time.sleep(backoff)
def poll_node_status(node_id, desired_scheduling_policy, max_attempts, backoff):
log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
while max_attempts > 0:
try:
status = env.storage_controller.node_status(node_id)
policy = status["scheduling"]
if policy == desired_scheduling_policy:
return
else:
max_attempts -= 1
log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
if max_attempts == 0:
raise AssertionError(
f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
)
time.sleep(backoff)
except StorageControllerApiException as e:
max_attempts -= 1
log.info(f"Status call failed ({max_attempts} retries left): {e}")
if max_attempts == 0:
raise e
time.sleep(backoff)
def assert_shard_counts_balanced(env: NeonEnv, shard_counts, total_shards):
# Assert that all nodes have some attached shards
assert len(shard_counts) == len(env.pageservers)
min_shard_count = min(shard_counts.values())
max_shard_count = max(shard_counts.values())
flake_factor = 5 / 100
assert max_shard_count - min_shard_count <= int(total_shards * flake_factor)
# Perform a graceful rolling restart
for ps in env.pageservers:
retryable_node_operation(
lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
)
poll_node_status(ps.id, "PauseForRestart", max_attempts=6, backoff=5)
shard_counts = get_node_shard_counts(env, tenant_ids)
log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
# Assert that we've drained the node
assert shard_counts[str(ps.id)] == 0
# Assert that those shards actually went somewhere
assert sum(shard_counts.values()) == total_shards
ps.restart()
poll_node_status(ps.id, "Active", max_attempts=10, backoff=1)
retryable_node_operation(
lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
)
poll_node_status(ps.id, "Active", max_attempts=6, backoff=5)
shard_counts = get_node_shard_counts(env, tenant_ids)
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
assert_shard_counts_balanced(env, shard_counts, total_shards)
# Now check that shards are reasonably balanced
shard_counts = get_node_shard_counts(env, tenant_ids)
log.info(f"Shard counts after rolling restart: {shard_counts}")
assert_shard_counts_balanced(env, shard_counts, total_shards)