Compare commits

26 Commits

Author SHA1 Message Date
Christian Schwarz
1ebe92bcf9 make Tenant::timelines a tokio::sync::RwLock
This is preliminary work for/from #4220 (async `Layer::get_value_reconstruct_data`).

The patch converts `Tenant::timelines` from `std::sync::Mutex` to `tokio::sync::Mutex`.

We need this change because we want to switch `Timeline::layers` to an async RwLock.
We need that because we hold `Timeline::layers` while calling `Layer::get_value_reconstruct_data`.
So, if we want to make get_value_reconstruct_data async, we need to make `Timeline::layers` async first.
2023-05-24 17:53:12 +02:00
Christian Schwarz
413598b19b fix merge fallout (?) 2023-05-24 17:42:51 +02:00
Christian Schwarz
b345f32e3f Merge branch 'problame/infallible-timeline-activate/4-make-infallible' into problame/async-timeline-get/dont-hold-timelines-lock-inside-tenant-state-send-modify 2023-05-24 17:25:35 +02:00
Christian Schwarz
69cfa9fe61 launch_wal_receiver: apply joonas's review suggestion (visibility + doc comment) 2023-05-24 17:20:03 +02:00
Christian Schwarz
2c424c8f4e Revert "activate_timelines counter is now == not_broken_timelines.len()"
not_broken_timelines is an iterator, doesn't have `len()`.

This reverts commit 4001f441c0.
2023-05-24 17:19:22 +02:00
Christian Schwarz
4001f441c0 activate_timelines counter is now == not_broken_timelines.len() 2023-05-24 17:14:49 +02:00
Christian Schwarz
ef956c47fc make it clear that walreceiver_status is always used in the branch where it's produced 2023-05-24 17:12:35 +02:00
Christian Schwarz
8606b6abe5 Merge remote-tracking branch 'origin/problame/infallible-timeline-activate/3-funnel-storage-broker-client' into problame/infallible-timeline-activate/4-make-infallible 2023-05-24 17:02:18 +02:00
Christian Schwarz
732f60317b Merge remote-tracking branch 'origin/main' into problame/infallible-timeline-activate/3-funnel-storage-broker-client 2023-05-24 16:58:25 +02:00
Christian Schwarz
b54431bbd3 pass the BrokerClientChannel by value & clone it as necessary
It's a wrapper around an inner Arc anyways

Also, this gets rid of the OnceCell
2023-05-24 12:29:05 +02:00
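A rough sketch of why passing such a client by value and cloning it is cheap, as commit b54431bbd3 argues. It assumes only what the message states: the client wraps an inner `Arc`. `ClientChannel`, `Inner`, `Receiver`, and `start_receivers` are hypothetical names for illustration, not the pageserver's types.

```rust
use std::sync::Arc;

/// Hypothetical shared state behind the channel (not the real broker type).
struct Inner {
    endpoint: String,
}

/// Illustrative stand-in for a client wrapper that holds an inner `Arc`.
/// Cloning it copies a pointer and bumps a reference count; nothing else.
#[derive(Clone)]
pub struct ClientChannel {
    inner: Arc<Inner>,
}

impl ClientChannel {
    pub fn new(endpoint: &str) -> Self {
        Self {
            inner: Arc::new(Inner {
                endpoint: endpoint.to_string(),
            }),
        }
    }

    pub fn endpoint(&self) -> &str {
        &self.inner.endpoint
    }

    /// Number of live handles that share the same inner state.
    pub fn handle_count(&self) -> usize {
        Arc::strong_count(&self.inner)
    }
}

/// Hypothetical consumer that owns its own handle to the client.
pub struct Receiver {
    client: ClientChannel,
}

impl Receiver {
    pub fn client(&self) -> &ClientChannel {
        &self.client
    }
}

/// Takes the client by value and clones it once per receiver.
/// No global cell is needed: ownership makes the data flow explicit.
pub fn start_receivers(client: ClientChannel, n: usize) -> Vec<Receiver> {
    (0..n)
        .map(|_| Receiver {
            client: client.clone(),
        })
        .collect()
}
```

Every receiver ends up with its own handle to the same inner state, so callers neither borrow from a long-lived owner nor look the client up in a global.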
Christian Schwarz
def5eb8542 Merge branch 'problame/infallible-timeline-activate/2-pushup-tenant-and-timeline-activation' into problame/infallible-timeline-activate/3-funnel-storage-broker-client 2023-05-24 11:57:37 +02:00
Christian Schwarz
07da786ed3 apply joonas's suggestion to use parent: None + follows_from 2023-05-24 11:56:26 +02:00
Christian Schwarz
75c3c43b2e don't unwrap() the activate() result in spawn_load / spawn_attach 2023-05-24 11:36:07 +02:00
Christian Schwarz
bdf03eab58 Merge branch 'problame/infallible-timeline-activate/2-pushup-tenant-and-timeline-activation' into problame/infallible-timeline-activate/3-funnel-storage-broker-client 2023-05-24 11:32:38 +02:00
Christian Schwarz
32c85fa87a Merge remote-tracking branch 'origin/main' into problame/infallible-timeline-activate/2-pushup-tenant-and-timeline-activation 2023-05-24 11:31:00 +02:00
Christian Schwarz
b2e0c58a8c Merge branch 'problame/infallible-timeline-activate/4-make-infallible' into problame/async-timeline-get/dont-hold-timelines-lock-inside-tenant-state-send-modify 2023-05-23 20:44:34 +02:00
Christian Schwarz
94f30f0660 Merge branch 'problame/infallible-timeline-activate/3-funnel-storage-broker-client' into problame/infallible-timeline-activate/4-make-infallible 2023-05-23 20:44:12 +02:00
Christian Schwarz
a55d224923 tests would fail because broker client needs to be launched on a tokio runtime thread 2023-05-23 20:43:10 +02:00
Christian Schwarz
4f586ac101 Merge branch 'problame/infallible-timeline-activate/2-pushup-tenant-and-timeline-activation' into problame/infallible-timeline-activate/3-funnel-storage-broker-client 2023-05-23 20:42:54 +02:00
Christian Schwarz
feb2e80b83 tests were failing because activate() was outside of a span with tenant_id 2023-05-23 20:36:32 +02:00
Christian Schwarz
ee22e81583 don't hold timelines lock inside set_stopping() 2023-05-23 20:11:15 +02:00
Christian Schwarz
3e604eaa39 refactor: introduce TenantState::Activating to avoid holding timelines lock inside Tenant::activate 2023-05-23 20:03:12 +02:00
Christian Schwarz
8bcb542a3b refactor: make timeline activation infallible
Timeline::activate() was only fallible because `launch_wal_receiver` was.

`launch_wal_receiver` was fallible only because of some preliminary
checks in `WalReceiver::start`.

Turns out these checks can be shifted to the type system by delaying
creation of the `WalReceiver` struct to the point where we activate the timeline.

The changes in this PR were enabled by my previous refactoring that funneled
the broker_client from pageserver startup to the activate() call sites.
2023-05-23 19:27:06 +02:00
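The idea in commit 8bcb542a3b of shifting runtime checks to the type system can be sketched roughly as follows. This is an illustration under assumptions, not the actual `WalReceiver` code: the message does not say which preliminary checks existed, so the two checks below ("already started" and "client missing") are guesses, and `EagerReceiver`, `RunningReceiver`, and `BrokerClient` are hypothetical names.

```rust
/// Hypothetical stand-in for the broker client.
#[derive(Clone, Debug, PartialEq)]
pub struct BrokerClient(pub String);

/// "Before" shape: the struct exists before activation, so `start` has to
/// validate its own state at runtime and can therefore fail.
pub struct EagerReceiver {
    client: Option<BrokerClient>,
    started: bool,
}

impl EagerReceiver {
    pub fn new(client: Option<BrokerClient>) -> Self {
        Self {
            client,
            started: false,
        }
    }

    pub fn start(&mut self) -> Result<(), String> {
        // Assumed checks, for illustration only.
        if self.started {
            return Err("receiver already started".to_string());
        }
        if self.client.is_none() {
            return Err("broker client missing".to_string());
        }
        self.started = true;
        Ok(())
    }
}

/// "After" shape: a value of this type is only created at activation time,
/// with everything it needs passed in. There is no invalid state to check.
pub struct RunningReceiver {
    client: BrokerClient,
}

impl RunningReceiver {
    /// Infallible: the signature itself demands a client, and a second
    /// "start" on the same value cannot be expressed.
    pub fn start(client: BrokerClient) -> Self {
        Self { client }
    }

    pub fn client(&self) -> &BrokerClient {
        &self.client
    }
}
```

Because `RunningReceiver::start` returns `Self` rather than `Result`, a caller such as an `activate()` function no longer has an error to propagate.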
Christian Schwarz
17b081d294 refactor: eliminate global storage_broker client state
(This is prep work to make `Timeline::activate` infallible.)

This patch removes the global storage_broker client instance from the
pageserver codebase.

Instead, pageserver startup instantiates it and passes it down to
the `Timeline::activate` function, which in turn passes it to
the WalReceiver, which is the entity that actually uses it.
2023-05-23 19:27:06 +02:00
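A minimal sketch of the refactoring described in commit 17b081d294, contrasting a process-wide client with one that startup creates and passes down. All names here (`GLOBAL_CLIENT`, `global_client`, `startup`, the simplified `Timeline` and `WalReceiver`) are hypothetical, and the real code is async and considerably more involved.

```rust
use std::sync::OnceLock;

/// Hypothetical stand-in for the storage broker client.
#[derive(Clone, Debug, PartialEq)]
pub struct BrokerClient {
    pub endpoint: String,
}

// "Before" shape: a global that may or may not have been initialized,
// so every reader has to handle its absence.
static GLOBAL_CLIENT: OnceLock<BrokerClient> = OnceLock::new();

pub fn global_client() -> Result<&'static BrokerClient, &'static str> {
    GLOBAL_CLIENT.get().ok_or("broker client not initialized")
}

// "After" shape: the entity that uses the client owns it.
pub struct WalReceiver {
    client: BrokerClient,
}

pub struct Timeline {
    receiver: Option<WalReceiver>,
}

impl Timeline {
    pub fn new() -> Self {
        Self { receiver: None }
    }

    /// The client arrives as a parameter, so it cannot be absent here.
    pub fn activate(&mut self, client: BrokerClient) {
        self.receiver = Some(WalReceiver { client });
    }

    pub fn receiver_endpoint(&self) -> Option<&str> {
        self.receiver.as_ref().map(|r| r.client.endpoint.as_str())
    }
}

/// Startup instantiates the client once and hands it down the call chain.
pub fn startup(endpoint: &str) -> Timeline {
    let client = BrokerClient {
        endpoint: endpoint.to_string(),
    };
    let mut timeline = Timeline::new();
    timeline.activate(client);
    timeline
}
```

With the explicit parameter, the "client absent" case disappears from `activate` instead of being handled inside it.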
Christian Schwarz
d5337e6a65 refactor responsibility for tenant/timeline activation
(This is prep work to make `Timeline::activate()` infallible.)

The current possibility for failure in `Timeline::activate()`
is the broker client's presence / absence. It should be an assert, but
we're careful with these. So, I'm planning to pass in the broker
client to activate(), thereby eliminating the possibility of its absence.

In the unit tests, we don't have a broker client. So, I thought I'd be
in trouble because the unit tests also called `activate()` before this
PR.

However, closer inspection reveals a long-standing FIXME about this,
which is addressed by this patch.

It turns out that the unit tests don't actually need the background
loops to be running. They just need the state value to be `Active`.
So, for the tests, we just set it to that value but don't spawn
the background loops.

We'll need to revisit this if we ever do more Rust unit tests in the
future. But right now, this refactoring improves the code, so, let's
revisit when we get there.
2023-05-23 19:26:36 +02:00
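The split described in commit d5337e6a65 between real activation and a state-only path for unit tests might look roughly like this. It is a simplified sketch: the counter stands in for spawning background loops, and `set_active_for_tests` is a hypothetical name, not necessarily the one used in the patch.

```rust
/// Simplified two-state model; the real enum has more variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TenantState {
    Loading,
    Active,
}

pub struct Tenant {
    state: TenantState,
    /// Stand-in for spawned background tasks (assumption for illustration).
    background_loops: usize,
}

impl Tenant {
    pub fn new() -> Self {
        Self {
            state: TenantState::Loading,
            background_loops: 0,
        }
    }

    /// Production path: start background work, then flip the state.
    pub fn activate(&mut self) {
        self.background_loops += 1;
        self.state = TenantState::Active;
    }

    /// Test path: only the state value changes; nothing is spawned,
    /// so no broker client or runtime is required.
    pub fn set_active_for_tests(&mut self) {
        self.state = TenantState::Active;
    }

    pub fn state(&self) -> TenantState {
        self.state
    }

    pub fn background_loops(&self) -> usize {
        self.background_loops
    }
}
```

Code that only inspects the state behaves the same on both paths, which is the property the commit message says the unit tests rely on.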
Christian Schwarz
cc96a5186d tenant_map_insert: don't expose the vacant entry to the closure
This tightens up the API a little.
Byproduct of some refactoring work that I'm doing right now.
2023-05-23 19:25:47 +02:00
72 changed files with 1432 additions and 3181 deletions

View File

@@ -36,6 +36,14 @@ inputs:
description: 'Region name for real s3 tests'
required: false
default: ''
real_s3_access_key_id:
description: 'Access key id'
required: false
default: ''
real_s3_secret_access_key:
description: 'Secret access key'
required: false
default: ''
rerun_flaky:
description: 'Whether to rerun flaky tests'
required: false
@@ -96,6 +104,8 @@ runs:
COMPATIBILITY_POSTGRES_DISTRIB_DIR: /tmp/neon-previous/pg_install
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: ${{ inputs.build_type }}
AWS_ACCESS_KEY_ID: ${{ inputs.real_s3_access_key_id }}
AWS_SECRET_ACCESS_KEY: ${{ inputs.real_s3_secret_access_key }}
COMPATIBILITY_SNAPSHOT_DIR: /tmp/compatibility_snapshot_pg${{ inputs.pg_version }}
ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'backward compatibility breakage')
ALLOW_FORWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'forward compatibility breakage')

View File

@@ -346,8 +346,10 @@ jobs:
test_selection: regress
needs_postgres_source: true
run_with_real_s3: true
real_s3_bucket: neon-github-ci-tests
real_s3_region: eu-central-1
real_s3_bucket: ci-tests-s3
real_s3_region: us-west-2
real_s3_access_key_id: "${{ secrets.AWS_ACCESS_KEY_ID_CI_TESTS_S3 }}"
real_s3_secret_access_key: "${{ secrets.AWS_SECRET_ACCESS_KEY_CI_TESTS_S3 }}"
rerun_flaky: true
pg_version: ${{ matrix.pg_version }}
env:
@@ -407,7 +409,9 @@ jobs:
uses: ./.github/actions/allure-report-generate
- uses: actions/github-script@v6
if: ${{ !cancelled() }}
if: >
!cancelled() &&
github.event_name == 'pull_request'
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
@@ -417,7 +421,7 @@ jobs:
reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}",
}
const script = require("./scripts/comment-test-report.js")
const script = require("./scripts/pr-comment-test-report.js")
await script({
github,
context,
@@ -492,24 +496,19 @@ jobs:
env:
COMMIT_URL: ${{ github.server_url }}/${{ github.repository }}/commit/${{ github.event.pull_request.head.sha || github.sha }}
run: |
scripts/coverage --dir=/tmp/coverage \
report \
scripts/coverage \
--dir=/tmp/coverage report \
--input-objects=/tmp/coverage/binaries.list \
--commit-url=${COMMIT_URL} \
--format=github
scripts/coverage --dir=/tmp/coverage \
report \
--input-objects=/tmp/coverage/binaries.list \
--format=lcov
- name: Upload coverage report
id: upload-coverage-report
env:
BUCKET: neon-github-public-dev
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
run: |
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://${BUCKET}/code-coverage/${COMMIT_SHA}
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://neon-github-public-dev/code-coverage/${COMMIT_SHA}
REPORT_URL=https://${BUCKET}.s3.amazonaws.com/code-coverage/${COMMIT_SHA}/index.html
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
@@ -664,9 +663,6 @@ jobs:
project: nrdv0s4kcs
push: true
tags: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:depot-${{needs.tag.outputs.build-tag}}
build-args: |
GIT_VERSION=${{ github.sha }}
REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
compute-tools-image:
runs-on: [ self-hosted, gen3, large ]
@@ -781,7 +777,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.8.0
VM_BUILDER_VERSION: v0.7.3-alpha3
steps:
- name: Checkout
@@ -802,7 +798,7 @@ jobs:
- name: Build vm image
run: |
./vm-builder -enable-file-cache -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
./vm-builder -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
- name: Pushing vm-compute-node image
run: |

629
Cargo.lock generated

File diff suppressed because it is too large

View File

@@ -3,7 +3,6 @@ members = [
"compute_tools",
"control_plane",
"pageserver",
"pageserver/ctl",
"proxy",
"safekeeper",
"storage_broker",
@@ -23,7 +22,7 @@ async-stream = "0.3"
async-trait = "0.1"
atty = "0.2.14"
aws-config = { version = "0.55", default-features = false, features=["rustls"] }
aws-sdk-s3 = "0.27"
aws-sdk-s3 = "0.25"
aws-smithy-http = "0.55"
aws-credential-types = "0.55"
aws-types = "0.55"

View File

@@ -47,7 +47,8 @@ RUN set -e \
&& mold -run cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \
--bin pageserver_binutils \
--bin draw_timeline_dir \
--bin safekeeper \
--bin storage_broker \
--bin proxy \
@@ -72,7 +73,8 @@ RUN set -e \
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pg_sni_router /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pagectl /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver_binutils /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/draw_timeline_dir /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin

View File

@@ -632,7 +632,6 @@ RUN apt update && \
libxml2 \
libxslt1.1 \
libzstd1 \
libcurl4-openssl-dev \
procps && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8

View File

@@ -362,8 +362,6 @@ impl ComputeNode {
};
// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
client.simple_query("SET neon.forward_ddl = false")?;
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
handle_roles(spec, &mut client)?;
handle_databases(spec, &mut client)?;
@@ -405,9 +403,7 @@ impl ComputeNode {
self.pg_reload_conf(&mut client)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
if spec.mode == ComputeMode::Primary {
client.simple_query("SET neon.forward_ddl = false")?;
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;

View File

@@ -121,8 +121,9 @@ impl RoleExt for Role {
/// string of arguments.
fn to_pg_options(&self) -> String {
// XXX: consider putting LOGIN as a default option somewhere higher, e.g. in control-plane.
let mut params: String = self.options.as_pg_options();
params.push_str(" LOGIN");
// For now, we do not use generic `options` for roles. Once used, add
// `self.options.as_pg_options()` somewhere here.
let mut params: String = "LOGIN".to_string();
if let Some(pass) = &self.encrypted_password {
// Some time ago we supported only md5 and treated all encrypted_password as md5.

View File

@@ -62,7 +62,7 @@ fn do_control_plane_request(
}
}
/// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
/// env variable is set, it will be used for authorization.
pub fn get_spec_from_control_plane(
base_uri: &str,

View File

@@ -16,7 +16,7 @@ mod pg_helpers_tests {
);
assert_eq!(
spec.cluster.roles.first().unwrap().to_pg_options(),
" LOGIN PASSWORD 'md56b1d16b78004bbd51fa06af9eda75972'"
"LOGIN PASSWORD 'md56b1d16b78004bbd51fa06af9eda75972'"
);
}

View File

@@ -41,7 +41,7 @@ const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
const DEFAULT_BRANCH_NAME: &str = "main";
project_git_version!(GIT_VERSION);
const DEFAULT_PG_VERSION: &str = "15";
const DEFAULT_PG_VERSION: &str = "14";
fn default_conf() -> String {
format!(

View File

@@ -24,7 +24,7 @@ use utils::{
use crate::safekeeper::SafekeeperNode;
pub const DEFAULT_PG_VERSION: u32 = 15;
pub const DEFAULT_PG_VERSION: u32 = 14;
//
// This data structures represents neon_local CLI config

View File

@@ -370,10 +370,6 @@ impl PageServerNode {
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
};
// If tenant ID was not specified, generate one
let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate());
let request = models::TenantCreateRequest {
new_tenant_id,
config,
@@ -499,9 +495,6 @@ impl PageServerNode {
ancestor_timeline_id: Option<TimelineId>,
pg_version: Option<u32>,
) -> anyhow::Result<TimelineInfo> {
// If timeline ID was not specified, generate one
let new_timeline_id = new_timeline_id.unwrap_or(TimelineId::generate());
self.http_request(
Method::POST,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),

View File

@@ -1,14 +1,6 @@
#!/bin/bash
set -eux
# Generate a random tenant or timeline ID
#
# Takes a variable name as argument. The result is stored in that variable.
generate_id() {
local -n resvar=$1
printf -v resvar '%08x%08x%08x%08x' $SRANDOM $SRANDOM $SRANDOM $SRANDOM
}
PG_VERSION=${PG_VERSION:-14}
SPEC_FILE_ORG=/var/db/postgres/specs/spec.json
@@ -21,29 +13,29 @@ done
echo "Page server is ready."
echo "Create a tenant and timeline"
generate_id tenant_id
PARAMS=(
-sb
-X POST
-H "Content-Type: application/json"
-d "{\"new_tenant_id\": \"${tenant_id}\"}"
-d "{}"
http://pageserver:9898/v1/tenant/
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
tenant_id=$(curl "${PARAMS[@]}" | sed 's/"//g')
generate_id timeline_id
PARAMS=(
-sb
-X POST
-H "Content-Type: application/json"
-d "{\"new_timeline_id\": \"${timeline_id}\", \"pg_version\": ${PG_VERSION}}"
-d "{\"tenant_id\":\"${tenant_id}\", \"pg_version\": ${PG_VERSION}}"
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
echo "Overwrite tenant id and timeline id in spec file"
tenant_id=$(echo ${result} | jq -r .tenant_id)
timeline_id=$(echo ${result} | jq -r .timeline_id)
sed "s/TENANT_ID/${tenant_id}/" ${SPEC_FILE_ORG} > ${SPEC_FILE}
sed -i "s/TIMELINE_ID/${timeline_id}/" ${SPEC_FILE}

View File

@@ -18,29 +18,7 @@ use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
/// The state of a tenant in this pageserver.
///
/// ```mermaid
/// stateDiagram-v2
///
/// [*] --> Loading: spawn_load()
/// [*] --> Attaching: spawn_attach()
///
/// Loading --> Activating: activate()
/// Attaching --> Activating: activate()
/// Activating --> Active: infallible
///
/// Loading --> Broken: load() failure
/// Attaching --> Broken: attach() failure
///
/// Active --> Stopping: set_stopping(), part of shutdown & detach
/// Stopping --> Broken: late error in remove_tenant_from_memory
///
/// Broken --> [*]: ignore / detach / shutdown
/// Stopping --> [*]: remove_from_memory complete
///
/// Active --> Broken: cfg(testing)-only tenant break point
/// ```
/// A state of a tenant in pageserver's memory.
#[derive(
Clone,
PartialEq,
@@ -48,63 +26,43 @@ use bytes::{BufMut, Bytes, BytesMut};
serde::Serialize,
serde::Deserialize,
strum_macros::Display,
strum_macros::EnumString,
strum_macros::EnumVariantNames,
strum_macros::AsRefStr,
strum_macros::IntoStaticStr,
)]
#[serde(tag = "slug", content = "data")]
pub enum TenantState {
/// This tenant is being loaded from local disk.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
/// This tenant is being loaded from local disk
Loading,
/// This tenant is being attached to the pageserver.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
/// This tenant is being downloaded from cloud storage.
Attaching,
/// The tenant is transitioning from Loading/Attaching to Active.
///
/// While in this state, the individual timelines are being activated.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
Activating(ActivatingFrom),
/// The tenant has finished activating and is open for business.
///
/// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
Activating,
/// Tenant is fully operational
Active,
/// The tenant is recognized by pageserver, but it is being detached or the
/// A tenant is recognized by pageserver, but it is being detached or the
/// system is being shut down.
///
/// Transitions out of this state are possible through `set_broken()`.
Stopping,
/// The tenant is recognized by the pageserver, but can no longer be used for
/// any operations.
///
/// If the tenant fails to load or attach, it will transition to this state
/// and it is guaranteed that no background tasks are running in its name.
///
/// The other way to transition into this state is from `Stopping` state
/// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
/// if the cleanup future executed by `remove_tenant_from_memory()` fails.
/// A tenant is recognized by the pageserver, but can no longer be used for
/// any operations, because it failed to be activated.
Broken { reason: String, backtrace: String },
}
impl TenantState {
pub fn attachment_status(&self) -> TenantAttachmentStatus {
use TenantAttachmentStatus::*;
// Below TenantState::Activating is used as "transient" or "transparent" state for
// attachment_status determining.
match self {
// The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
// So, technically, we can return Attached here.
// However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
// But, our attach task might still be fetching the remote timelines, etc.
// So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
Self::Attaching => Maybe,
// tenant mgr startup distinguishes attaching from loading via marker file.
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
Self::Loading => Attached,
Self::Activating => todo!(),
// We only reach Active after successful load / attach.
// So, call atttachment status Attached.
Self::Active => Attached,
@@ -143,15 +101,6 @@ impl std::fmt::Debug for TenantState {
}
}
/// The only [`TenantState`] variants we could be `TenantState::Activating` from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ActivatingFrom {
/// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`]
Loading,
/// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
Attaching,
}
/// A state of a timeline in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TimelineState {
@@ -172,8 +121,9 @@ pub enum TimelineState {
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
#[serde_as(as = "DisplayFromStr")]
pub new_timeline_id: TimelineId,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub new_timeline_id: Option<TimelineId>,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<TimelineId>,
@@ -184,11 +134,12 @@ pub struct TimelineCreateRequest {
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
#[serde_as(as = "DisplayFromStr")]
pub new_tenant_id: TenantId,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub new_tenant_id: Option<TenantId>,
#[serde(flatten)]
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}
@@ -236,10 +187,10 @@ pub struct StatusResponse {
}
impl TenantCreateRequest {
pub fn new(new_tenant_id: TenantId) -> TenantCreateRequest {
pub fn new(new_tenant_id: Option<TenantId>) -> TenantCreateRequest {
TenantCreateRequest {
new_tenant_id,
config: TenantConfig::default(),
..Default::default()
}
}
}
@@ -883,55 +834,4 @@ mod tests {
err
);
}
#[test]
fn tenantstatus_activating_serde() {
let states = [
TenantState::Activating(ActivatingFrom::Loading),
TenantState::Activating(ActivatingFrom::Attaching),
];
let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
let actual = serde_json::to_string(&states).unwrap();
assert_eq!(actual, expected);
let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();
assert_eq!(states.as_slice(), &parsed);
}
#[test]
fn tenantstatus_activating_strum() {
// tests added, because we use these for metrics
let examples = [
(line!(), TenantState::Loading, "Loading"),
(line!(), TenantState::Attaching, "Attaching"),
(
line!(),
TenantState::Activating(ActivatingFrom::Loading),
"Activating",
),
(
line!(),
TenantState::Activating(ActivatingFrom::Attaching),
"Activating",
),
(line!(), TenantState::Active, "Active"),
(line!(), TenantState::Stopping, "Stopping"),
(
line!(),
TenantState::Broken {
reason: "Example".into(),
backtrace: "Looooong backtrace".into(),
},
"Broken",
),
];
for (line, rendered, expected) in examples {
let actual: &'static str = rendered.into();
assert_eq!(actual, expected, "example on {line}");
}
}
}

View File

@@ -1,33 +0,0 @@
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
///
/// Can be cloned, moved and kept around in futures as "guard objects".
#[derive(Clone)]
pub struct Completion(mpsc::Sender<()>);
/// Barrier will wait until all clones of [`Completion`] have been dropped.
#[derive(Clone)]
pub struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);
impl Barrier {
pub async fn wait(self) {
self.0.lock().await.recv().await;
}
pub async fn maybe_wait(barrier: Option<Barrier>) {
if let Some(b) = barrier {
b.wait().await
}
}
}
/// Create new Guard and Barrier pair.
pub fn channel() -> (Completion, Barrier) {
let (tx, rx) = mpsc::channel::<()>(1);
let rx = Mutex::new(rx);
let rx = Arc::new(rx);
(Completion(tx), Barrier(rx))
}
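// Illustration only, not part of the diff: a blocking analogue of the
// `Completion` / `Barrier` pair above, built on `std::sync::mpsc` instead of
// tokio so that it runs without an async runtime. It relies on the same
// property: nothing is ever sent, so the receiver only wakes up once every
// sender clone has been dropped. `run_workers` is a hypothetical helper.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

/// While any clone is alive, `Barrier::wait` keeps blocking.
#[derive(Clone)]
#[allow(dead_code)] // the sender is held only for its drop behaviour
pub struct Completion(mpsc::Sender<()>);

/// Waits until all `Completion` clones have been dropped.
pub struct Barrier(mpsc::Receiver<()>);

impl Barrier {
    pub fn wait(self) {
        // No message is ever sent, so `recv` returns (with `Err`) only
        // after the last sender is gone.
        let _ = self.0.recv();
    }
}

pub fn channel() -> (Completion, Barrier) {
    let (tx, rx) = mpsc::channel::<()>();
    (Completion(tx), Barrier(rx))
}

/// Spawns `n` workers that each hold a guard, waits for all of them,
/// and returns how many had finished by the time the barrier released.
pub fn run_workers(n: usize) -> usize {
    let (completion, barrier) = channel();
    let finished = Arc::new(AtomicUsize::new(0));

    for _ in 0..n {
        let guard = completion.clone();
        let finished = Arc::clone(&finished);
        thread::spawn(move || {
            finished.fetch_add(1, Ordering::SeqCst);
            // Dropping the guard is what signals completion.
            drop(guard);
        });
    }

    // Drop the original handle; otherwise `wait` would never return.
    drop(completion);
    barrier.wait();
    finished.load(Ordering::SeqCst)
}
```

// Each worker increments the counter before dropping its guard, so the
// barrier cannot release until all increments have happened.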

View File

@@ -1,5 +1,5 @@
use crate::auth::{Claims, JwtAuth};
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
use crate::http::error;
use anyhow::{anyhow, Context};
use hyper::header::{HeaderName, AUTHORIZATION};
use hyper::http::HeaderValue;
@@ -16,6 +16,8 @@ use std::future::Future;
use std::net::TcpListener;
use std::str::FromStr;
use super::error::ApiError;
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"libmetrics_metric_handler_requests_total",
@@ -33,12 +35,8 @@ struct RequestId(String);
/// Adds a tracing info_span! instrumentation around the handler events,
/// logs the request start and end events for non-GET requests and non-200 responses.
///
/// Usage: Replace `my_handler` with `|r| request_span(r, my_handler)`
///
/// Use this to distinguish between logs of different HTTP requests: every request handler wrapped
/// with this will get request info logged in the wrapping span, including the unique request ID.
///
/// This also handles errors, logging them and converting them to an HTTP error response.
/// in this type will get request info logged in the wrapping span, including the unique request ID.
///
/// There could be other ways to implement similar functionality:
///
@@ -56,56 +54,60 @@ struct RequestId(String);
/// tries to achieve with its `.instrument` used in the current approach.
///
/// If needed, a declarative macro to substitute the |r| ... closure boilerplate could be introduced.
pub async fn request_span<R, H>(request: Request<Body>, handler: H) -> R::Output
pub struct RequestSpan<E, R, H>(pub H)
where
R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
R: Future<Output = Result<Response<Body>, E>> + Send + 'static,
H: Fn(Request<Body>) -> R + Send + Sync + 'static;
impl<E, R, H> RequestSpan<E, R, H>
where
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
R: Future<Output = Result<Response<Body>, E>> + Send + 'static,
H: Fn(Request<Body>) -> R + Send + Sync + 'static,
{
let request_id = request.context::<RequestId>().unwrap_or_default().0;
let method = request.method();
let path = request.uri().path();
let request_span = info_span!("request", %method, %path, %request_id);
/// Creates a tracing span around inner request handler and executes the request handler in the context of that span.
/// Use as `|r| RequestSpan(my_handler).handle(r)` instead of `my_handler` as the request handler to get the span enabled.
pub async fn handle(self, request: Request<Body>) -> Result<Response<Body>, E> {
let request_id = request.context::<RequestId>().unwrap_or_default().0;
let method = request.method();
let path = request.uri().path();
let request_span = info_span!("request", %method, %path, %request_id);
let log_quietly = method == Method::GET;
async move {
let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding();
if log_quietly {
debug!("Handling request");
} else {
info!("Handling request");
}
// No special handling for panics here. There's a `tracing_panic_hook` from another
// module to do that globally.
let res = handler(request).await;
cancellation_guard.disarm();
// Log the result if needed.
//
// We also convert any errors into an Ok response with HTTP error code here.
// `make_router` sets a last-resort error handler that would do the same, but
// we prefer to do it here, before we exit the request span, so that the error
// is still logged with the span.
//
// (Because we convert errors to Ok response, we never actually return an error,
// and we could declare the function to return the never type (`!`). However,
// using `routerify::RouterBuilder` requires a proper error type.)
match res {
Ok(response) => {
let response_status = response.status();
if log_quietly && response_status.is_success() {
debug!("Request handled, status: {response_status}");
} else {
info!("Request handled, status: {response_status}");
}
Ok(response)
let log_quietly = method == Method::GET;
async move {
let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding();
if log_quietly {
debug!("Handling request");
} else {
info!("Handling request");
}
// Note that we reuse `error::handler` here and do not return an error at all,
// yet cannot use `!` directly in the method signature due to `routerify::RouterBuilder` limitation.
// Usage of the error handler also means that we expect only the `ApiError` errors to be raised in this call.
//
// Panics are not handled separately, there's a `tracing_panic_hook` from another module to do that globally.
let res = (self.0)(request).await;
cancellation_guard.disarm();
match res {
Ok(response) => {
let response_status = response.status();
if log_quietly && response_status.is_success() {
debug!("Request handled, status: {response_status}");
} else {
info!("Request handled, status: {response_status}");
}
Ok(response)
}
Err(e) => Ok(error::handler(e.into()).await),
}
Err(err) => Ok(api_error_handler(err)),
}
.instrument(request_span)
.await
}
.instrument(request_span)
.await
}
/// Drop guard to WARN in case the request was dropped before completion.
@@ -205,8 +207,10 @@ pub fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
.middleware(Middleware::post_with_info(
add_request_id_header_to_response,
))
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
.err_handler(route_error_handler)
.get("/metrics", |r| {
RequestSpan(prometheus_metrics_handler).handle(r)
})
.err_handler(error::handler)
}
pub fn attach_openapi_ui(
@@ -216,14 +220,12 @@ pub fn attach_openapi_ui(
ui_mount_path: &'static str,
) -> RouterBuilder<hyper::Body, ApiError> {
router_builder
.get(spec_mount_path,
move |r| request_span(r, move |_| async move {
Ok(Response::builder().body(Body::from(spec)).unwrap())
})
)
.get(ui_mount_path,
move |r| request_span(r, move |_| async move {
Ok(Response::builder().body(Body::from(format!(r#"
.get(spec_mount_path, move |r| {
RequestSpan(move |_| async move { Ok(Response::builder().body(Body::from(spec)).unwrap()) })
.handle(r)
})
.get(ui_mount_path, move |r| RequestSpan( move |_| async move {
Ok(Response::builder().body(Body::from(format!(r#"
<!DOCTYPE html>
<html lang="en">
<head>
@@ -253,8 +255,7 @@ pub fn attach_openapi_ui(
</body>
</html>
"#, spec_mount_path))).unwrap())
})
)
}).handle(r))
}
fn parse_token(header_value: &str) -> Result<&str, ApiError> {

View File

@@ -83,24 +83,13 @@ impl HttpErrorBody {
}
}
pub async fn route_error_handler(err: routerify::RouteError) -> Response<Body> {
match err.downcast::<ApiError>() {
Ok(api_error) => api_error_handler(*api_error),
Err(other_error) => {
// We expect all the request handlers to return an ApiError, so this should
// not be reached. But just in case.
error!("Error processing HTTP request: {other_error:?}");
HttpErrorBody::response_from_msg_and_status(
other_error.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
)
}
}
}
pub async fn handler(err: routerify::RouteError) -> Response<Body> {
let api_error = err
.downcast::<ApiError>()
.expect("handler should always return api error");
pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
// Print a stack trace for Internal Server errors
if let ApiError::InternalServerError(_) = api_error {
if let ApiError::InternalServerError(_) = api_error.as_ref() {
error!("Error processing HTTP request: {api_error:?}");
} else {
error!("Error processing HTTP request: {api_error:#}");


@@ -60,9 +60,6 @@ pub mod tracing_span_assert;
pub mod rate_limit;
/// Simple once-barrier and a guard which keeps barrier awaiting.
pub mod completion;
mod failpoint_macro_helpers {
/// use with fail::cfg("$name", "return(2000)")


@@ -1,18 +0,0 @@
[package]
name = "pagectl"
version = "0.1.0"
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow.workspace = true
bytes.workspace = true
clap = { workspace = true, features = ["string"] }
git-version.workspace = true
pageserver = { path = ".." }
postgres_ffi.workspace = true
utils.workspace = true
svg_fmt.workspace = true
workspace_hack.workspace = true


@@ -1,169 +0,0 @@
use std::path::{Path, PathBuf};
use anyhow::Result;
use clap::Subcommand;
use pageserver::tenant::block_io::BlockCursor;
use pageserver::tenant::disk_btree::DiskBtreeReader;
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::{page_cache, virtual_file};
use pageserver::{
repository::{Key, KEY_SIZE},
tenant::{
block_io::FileBlockReader, disk_btree::VisitDirection,
storage_layer::delta_layer::DELTA_KEY_SIZE,
},
virtual_file::VirtualFile,
};
use std::fs;
use utils::bin_ser::BeSer;
use crate::layer_map_analyzer::parse_filename;
#[derive(Subcommand)]
pub(crate) enum LayerCmd {
/// List all tenants and timelines under the pageserver path
///
/// Example: `cargo run --bin pagectl layer list .neon/`
List { path: PathBuf },
/// List all layers of a given tenant and timeline
///
/// Example: `cargo run --bin pagectl layer list-layer .neon/ $TENANT $TIMELINE`
ListLayer {
path: PathBuf,
tenant: String,
timeline: String,
},
/// Dump all information of a layer file
DumpLayer {
path: PathBuf,
tenant: String,
timeline: String,
/// The id from list-layer command
id: usize,
},
}
fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
use pageserver::tenant::blob_io::BlobCursor;
use pageserver::tenant::block_io::BlockReader;
let path = path.as_ref();
virtual_file::init(10);
page_cache::init(100);
let file = FileBlockReader::new(VirtualFile::open(path)?);
let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
actual_summary.index_start_blk,
actual_summary.index_root_blk,
&file,
);
// TODO(chi): dedup w/ `delta_layer.rs` by exposing the API.
let mut all = vec![];
tree_reader.visit(
&[0u8; DELTA_KEY_SIZE],
VisitDirection::Forwards,
|key, value_offset| {
let curr = Key::from_slice(&key[..KEY_SIZE]);
all.push((curr, BlobRef(value_offset)));
true
},
)?;
let mut cursor = BlockCursor::new(&file);
for (k, v) in all {
let value = cursor.read_blob(v.pos())?;
println!("key:{} value_len:{}", k, value.len());
}
// TODO(chi): special handling for last key?
Ok(())
}
pub(crate) fn main(cmd: &LayerCmd) -> Result<()> {
match cmd {
LayerCmd::List { path } => {
for tenant in fs::read_dir(path.join("tenants"))? {
let tenant = tenant?;
if !tenant.file_type()?.is_dir() {
continue;
}
println!("tenant {}", tenant.file_name().to_string_lossy());
for timeline in fs::read_dir(tenant.path().join("timelines"))? {
let timeline = timeline?;
if !timeline.file_type()?.is_dir() {
continue;
}
println!("- timeline {}", timeline.file_name().to_string_lossy());
}
}
}
LayerCmd::ListLayer {
path,
tenant,
timeline,
} => {
let timeline_path = path
.join("tenants")
.join(tenant)
.join("timelines")
.join(timeline);
let mut idx = 0;
for layer in fs::read_dir(timeline_path)? {
let layer = layer?;
if let Some(layer_file) = parse_filename(&layer.file_name().into_string().unwrap())
{
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
idx += 1;
}
}
}
LayerCmd::DumpLayer {
path,
tenant,
timeline,
id,
} => {
let timeline_path = path
.join("tenants")
.join(tenant)
.join("timelines")
.join(timeline);
let mut idx = 0;
for layer in fs::read_dir(timeline_path)? {
let layer = layer?;
if let Some(layer_file) = parse_filename(&layer.file_name().into_string().unwrap())
{
if *id == idx {
// TODO(chi): dedup code
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
if layer_file.is_delta {
read_delta_file(layer.path())?;
} else {
anyhow::bail!("not supported yet :(");
}
break;
}
idx += 1;
}
}
}
}
Ok(())
}


@@ -1,179 +0,0 @@
//! A helper tool to manage pageserver binary files.
//! Accepts a file as an argument, attempts to parse it in every supported way,
//! and prints the interpreted content.
//!
//! A separate `metadata` subcommand prints and updates the pageserver's metadata file.
mod draw_timeline_dir;
mod layer_map_analyzer;
mod layers;
use clap::{Parser, Subcommand};
use layers::LayerCmd;
use pageserver::{
context::{DownloadBehavior, RequestContext},
page_cache,
task_mgr::TaskKind,
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
};
use postgres_ffi::ControlFileData;
use std::path::{Path, PathBuf};
use utils::{lsn::Lsn, project_git_version};
project_git_version!(GIT_VERSION);
#[derive(Parser)]
#[command(
version = GIT_VERSION,
about = "Neon Pageserver binutils",
long_about = "Reads pageserver (and related) binary files management utility"
)]
#[command(propagate_version = true)]
struct CliOpts {
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
Metadata(MetadataCmd),
PrintLayerFile(PrintLayerFileCmd),
DrawTimeline {},
AnalyzeLayerMap(AnalyzeLayerMapCmd),
#[command(subcommand)]
Layer(LayerCmd),
}
/// Read and update pageserver metadata file
#[derive(Parser)]
struct MetadataCmd {
/// Input metadata file path
metadata_path: PathBuf,
/// Replace disk consistent Lsn
disk_consistent_lsn: Option<Lsn>,
/// Replace previous record Lsn
prev_record_lsn: Option<Lsn>,
/// Replace latest gc cuttoff
latest_gc_cuttoff: Option<Lsn>,
}
#[derive(Parser)]
struct PrintLayerFileCmd {
/// Pageserver data path
path: PathBuf,
}
#[derive(Parser)]
struct AnalyzeLayerMapCmd {
/// Pageserver data path
path: PathBuf,
/// Max holes
max_holes: Option<usize>,
}
fn main() -> anyhow::Result<()> {
let cli = CliOpts::parse();
match cli.command {
Commands::Layer(cmd) => {
layers::main(&cmd)?;
}
Commands::Metadata(cmd) => {
handle_metadata(&cmd)?;
}
Commands::DrawTimeline {} => {
draw_timeline_dir::main()?;
}
Commands::AnalyzeLayerMap(cmd) => {
layer_map_analyzer::main(&cmd)?;
}
Commands::PrintLayerFile(cmd) => {
if let Err(e) = read_pg_control_file(&cmd.path) {
println!(
"Failed to read input file as a pg control one: {e:#}\n\
Attempting to read it as layer file"
);
print_layerfile(&cmd.path)?;
}
}
};
Ok(())
}
fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
let control_file = ControlFileData::decode(&std::fs::read(control_file_path)?)?;
println!("{control_file:?}");
let control_file_initdb = Lsn(control_file.checkPoint);
println!(
"pg_initdb_lsn: {}, aligned: {}",
control_file_initdb,
control_file_initdb.align()
);
Ok(())
}
fn print_layerfile(path: &Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx)
}
fn handle_metadata(
MetadataCmd {
metadata_path: path,
disk_consistent_lsn,
prev_record_lsn,
latest_gc_cuttoff,
}: &MetadataCmd,
) -> Result<(), anyhow::Error> {
let metadata_bytes = std::fs::read(path)?;
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
println!("Current metadata:\n{meta:?}");
let mut update_meta = false;
if let Some(disk_consistent_lsn) = disk_consistent_lsn {
meta = TimelineMetadata::new(
*disk_consistent_lsn,
meta.prev_record_lsn(),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if let Some(prev_record_lsn) = prev_record_lsn {
meta = TimelineMetadata::new(
meta.disk_consistent_lsn(),
Some(*prev_record_lsn),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if let Some(latest_gc_cuttoff) = latest_gc_cuttoff {
meta = TimelineMetadata::new(
meta.disk_consistent_lsn(),
meta.prev_record_lsn(),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
*latest_gc_cuttoff,
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if update_meta {
let metadata_bytes = meta.to_bytes()?;
std::fs::write(path, metadata_bytes)?;
}
Ok(())
}


@@ -12,7 +12,7 @@
//! Example use:
//! ```
//! $ ls test_output/test_pgbench\[neon-45-684\]/repo/tenants/$TENANT/timelines/$TIMELINE | \
//! $ grep "__" | cargo run --release --bin pagectl draw-timeline-dir > out.svg
//! $ grep "__" | cargo run --release --bin draw_timeline_dir > out.svg
//! $ firefox out.svg
//! ```
//!
@@ -62,7 +62,7 @@ fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
(keys, lsns)
}
pub fn main() -> Result<()> {
fn main() -> Result<()> {
// Parse layer filenames from stdin
let mut ranges: Vec<(Range<Key>, Range<Lsn>)> = vec![];
let stdin = io::stdin();


@@ -6,7 +6,7 @@ use anyhow::Result;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::Range;
use std::{fs, path::Path, str};
use std::{env, fs, path::Path, path::PathBuf, str, str::FromStr};
use pageserver::page_cache::PAGE_SZ;
use pageserver::repository::{Key, KEY_SIZE};
@@ -18,14 +18,12 @@ use pageserver::virtual_file::VirtualFile;
use utils::{bin_ser::BeSer, lsn::Lsn};
use crate::AnalyzeLayerMapCmd;
const MIN_HOLE_LENGTH: i128 = (128 * 1024 * 1024 / PAGE_SZ) as i128;
const DEFAULT_MAX_HOLES: usize = 10;
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
#[derive(PartialEq, Eq)]
pub struct Hole(Range<Key>);
struct Hole(Range<Key>);
impl Ord for Hole {
fn cmp(&self, other: &Self) -> Ordering {
@@ -41,11 +39,11 @@ impl PartialOrd for Hole {
}
}
pub(crate) struct LayerFile {
pub key_range: Range<Key>,
pub lsn_range: Range<Lsn>,
pub is_delta: bool,
pub holes: Vec<Hole>,
struct LayerFile {
key_range: Range<Key>,
lsn_range: Range<Lsn>,
is_delta: bool,
holes: Vec<Hole>,
}
impl LayerFile {
@@ -69,7 +67,7 @@ impl LayerFile {
}
}
pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
fn parse_filename(name: &str) -> Option<LayerFile> {
let split: Vec<&str> = name.split("__").collect();
if split.len() != 2 {
return None;
@@ -129,9 +127,18 @@ fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
Ok(holes)
}
pub(crate) fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let storage_path = &cmd.path;
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
println!("Usage: layer_map_analyzer PAGESERVER_DATA_DIR [MAX_HOLES]");
return Ok(());
}
let storage_path = PathBuf::from_str(&args[1])?;
let max_holes = if args.len() > 2 {
args[2].parse::<usize>().unwrap()
} else {
DEFAULT_MAX_HOLES
};
// Initialize virtual_file (file descriptor cache) and page cache, which are needed to access the layer's persistent B-Tree.
pageserver::virtual_file::init(10);


@@ -275,7 +275,6 @@ fn start_pageserver(
let pageserver_listener = tcp_listener::bind(pg_addr)?;
// Launch broker client
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
let broker_client = WALRECEIVER_RUNTIME
.block_on(async {
// Note: we do not attempt connecting here (but validate endpoints sanity).
@@ -335,34 +334,13 @@ fn start_pageserver(
// Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?;
// All tenant load operations carry this while they are ongoing; it will be dropped once those
// operations finish, whether successfully or not. At that point the initial load is done,
// and we can start the global background tasks.
let (init_done_tx, init_done_rx) = utils::completion::channel();
// Scan the local 'tenants/' directory and start loading the tenants
let init_started_at = std::time::Instant::now();
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
broker_client.clone(),
remote_storage.clone(),
(init_done_tx, init_done_rx.clone()),
))?;
BACKGROUND_RUNTIME.spawn({
let init_done_rx = init_done_rx.clone();
async move {
init_done_rx.wait().await;
let elapsed = init_started_at.elapsed();
tracing::info!(
elapsed_millis = elapsed.as_millis(),
"Initial load completed."
);
}
});
// shared state between the disk-usage based eviction background task and the http endpoint
// that allows triggering disk-usage based eviction manually. note that the http endpoint
// is still accessible even if background task is not configured as long as remote storage has
@@ -374,7 +352,6 @@ fn start_pageserver(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
init_done_rx.clone(),
)?;
}
@@ -412,7 +389,6 @@ fn start_pageserver(
);
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
let init_done_rx = init_done_rx;
let metrics_ctx = RequestContext::todo_child(
TaskKind::MetricsCollection,
// This task itself shouldn't download anything.
@@ -428,13 +404,6 @@ fn start_pageserver(
"consumption metrics collection",
true,
async move {
// first wait for initial load to complete before first iteration.
//
// this is because we only process active tenants and timelines, and the
// Timeline::get_current_logical_size will spawn the logical size calculation,
// which will not be rate-limited.
init_done_rx.wait().await;
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,


@@ -0,0 +1,174 @@
//! A helper tool to manage pageserver binary files.
//! Accepts a file as an argument, attempts to parse it in every supported way,
//! and prints the interpreted content.
//!
//! A separate `metadata` subcommand prints and updates the pageserver's metadata file.
use std::{
path::{Path, PathBuf},
str::FromStr,
};
use anyhow::Context;
use clap::{value_parser, Arg, Command};
use pageserver::{
context::{DownloadBehavior, RequestContext},
page_cache,
task_mgr::TaskKind,
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
};
use postgres_ffi::ControlFileData;
use utils::{lsn::Lsn, project_git_version};
project_git_version!(GIT_VERSION);
const METADATA_SUBCOMMAND: &str = "metadata";
fn main() -> anyhow::Result<()> {
let arg_matches = cli().get_matches();
match arg_matches.subcommand() {
Some((subcommand_name, subcommand_matches)) => {
let path = subcommand_matches
.get_one::<PathBuf>("metadata_path")
.context("'metadata_path' argument is missing")?
.to_path_buf();
anyhow::ensure!(
subcommand_name == METADATA_SUBCOMMAND,
"Unknown subcommand {subcommand_name}"
);
handle_metadata(&path, subcommand_matches)?;
}
None => {
let path = arg_matches
.get_one::<PathBuf>("path")
.context("'path' argument is missing")?
.to_path_buf();
println!(
"No subcommand specified, attempting to guess the format for file {}",
path.display()
);
if let Err(e) = read_pg_control_file(&path) {
println!(
"Failed to read input file as a pg control one: {e:#}\n\
Attempting to read it as layer file"
);
print_layerfile(&path)?;
}
}
};
Ok(())
}
fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
let control_file = ControlFileData::decode(&std::fs::read(control_file_path)?)?;
println!("{control_file:?}");
let control_file_initdb = Lsn(control_file.checkPoint);
println!(
"pg_initdb_lsn: {}, aligned: {}",
control_file_initdb,
control_file_initdb.align()
);
Ok(())
}
fn print_layerfile(path: &Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx)
}
fn handle_metadata(path: &Path, arg_matches: &clap::ArgMatches) -> Result<(), anyhow::Error> {
let metadata_bytes = std::fs::read(path)?;
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
println!("Current metadata:\n{meta:?}");
let mut update_meta = false;
if let Some(disk_consistent_lsn) = arg_matches.get_one::<String>("disk_consistent_lsn") {
meta = TimelineMetadata::new(
Lsn::from_str(disk_consistent_lsn)?,
meta.prev_record_lsn(),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if let Some(prev_record_lsn) = arg_matches.get_one::<String>("prev_record_lsn") {
meta = TimelineMetadata::new(
meta.disk_consistent_lsn(),
Some(Lsn::from_str(prev_record_lsn)?),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if let Some(latest_gc_cuttoff) = arg_matches.get_one::<String>("latest_gc_cuttoff") {
meta = TimelineMetadata::new(
meta.disk_consistent_lsn(),
meta.prev_record_lsn(),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
Lsn::from_str(latest_gc_cuttoff)?,
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if update_meta {
let metadata_bytes = meta.to_bytes()?;
std::fs::write(path, metadata_bytes)?;
}
Ok(())
}
fn cli() -> Command {
Command::new("Neon Pageserver binutils")
.about("Reads pageserver (and related) binary files management utility")
.version(GIT_VERSION)
.arg(
Arg::new("path")
.help("Input file path")
.value_parser(value_parser!(PathBuf))
.required(false),
)
.subcommand(
Command::new(METADATA_SUBCOMMAND)
.about("Read and update pageserver metadata file")
.arg(
Arg::new("metadata_path")
.help("Input metadata file path")
.value_parser(value_parser!(PathBuf))
.required(false),
)
.arg(
Arg::new("disk_consistent_lsn")
.long("disk_consistent_lsn")
.help("Replace disk consistent Lsn"),
)
.arg(
Arg::new("prev_record_lsn")
.long("prev_record_lsn")
.help("Replace previous record Lsn"),
)
.arg(
Arg::new("latest_gc_cuttoff")
.long("latest_gc_cuttoff")
.help("Replace latest gc cuttoff"),
),
)
}
#[test]
fn verify_cli() {
cli().debug_assert();
}


@@ -150,7 +150,7 @@ pub async fn collect_metrics_iteration(
let mut tenant_resident_size = 0;
// iterate through list of timelines in tenant
for timeline in tenant.list_timelines().iter() {
for timeline in tenant.list_timelines().await.iter() {
// collect per-timeline metrics only for active timelines
if timeline.is_active() {
let timeline_written_size = u64::from(timeline.get_last_record_lsn());


@@ -88,7 +88,6 @@
use crate::task_mgr::TaskKind;
// The main structure of this module, see module-level comment.
#[derive(Clone, Debug)]
pub struct RequestContext {
task_kind: TaskKind,
download_behavior: DownloadBehavior,
@@ -96,7 +95,7 @@ pub struct RequestContext {
/// Desired behavior if the operation requires an on-demand download
/// to proceed.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum DownloadBehavior {
/// Download the layer file. It can take a while.
Download,


@@ -54,13 +54,12 @@ use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn, Instrument};
use utils::completion;
use utils::serde_percent::Percent;
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{self, storage_layer::{PersistentLayer, RemoteLayerDesc}, Timeline},
tenant::{self, storage_layer::PersistentLayer, Timeline},
};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -83,7 +82,6 @@ pub fn launch_disk_usage_global_eviction_task(
conf: &'static PageServerConf,
storage: GenericRemoteStorage,
state: Arc<State>,
init_done: completion::Barrier,
) -> anyhow::Result<()> {
let Some(task_config) = &conf.disk_usage_based_eviction else {
info!("disk usage based eviction task not configured");
@@ -100,9 +98,6 @@ pub fn launch_disk_usage_global_eviction_task(
"disk usage based eviction",
false,
async move {
// wait until initial load is complete, because we cannot evict from loading tenants.
init_done.wait().await;
disk_usage_eviction_task(
&state,
task_config,
@@ -329,7 +324,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// If we get far enough in the list that we start to evict layers that are below
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
let mut batched: HashMap<_, Vec<Arc<RemoteLayerDesc>>> = HashMap::new();
let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new();
let mut warned = None;
let mut usage_planned = usage_pre;
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
@@ -434,7 +429,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
#[derive(Clone)]
struct EvictionCandidate {
timeline: Arc<Timeline>,
layer: Arc<RemoteLayerDesc>,
layer: Arc<dyn PersistentLayer>,
last_activity_ts: SystemTime,
}
@@ -513,7 +508,7 @@ async fn collect_eviction_candidates(
// a little unfair to tenants during shutdown in such a situation is tolerable.
let mut tenant_candidates = Vec::new();
let mut max_layer_size = 0;
for tl in tenant.list_timelines() {
for tl in tenant.list_timelines().await {
if !tl.is_active() {
continue;
}


@@ -678,8 +678,6 @@ paths:
application/json:
schema:
type: object
required:
- new_timeline_id
properties:
new_timeline_id:
type: string
@@ -938,8 +936,6 @@ components:
allOf:
- $ref: '#/components/schemas/TenantConfig'
- type: object
required:
- new_tenant_id
properties:
new_tenant_id:
type: string


@@ -11,7 +11,7 @@ use storage_broker::BrokerClientChannel;
use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::http::endpoint::request_span;
use utils::http::endpoint::RequestSpan;
use utils::http::json::json_request_or_empty_body;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
@@ -25,9 +25,7 @@ use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
};
use crate::tenant::mgr::{TenantMapInsertError, TenantStateError};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
@@ -146,36 +144,6 @@ impl From<TenantStateError> for ApiError {
}
}
impl From<GetTenantError> for ApiError {
fn from(tse: GetTenantError) -> ApiError {
match tse {
GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid)),
e @ GetTenantError::NotActive(_) => {
// Why is this not `ApiError::NotFound`?
// Because we must be careful to never return 404 for a tenant if it does
// in fact exist locally. If we did, the caller could draw the conclusion
// that it can attach the tenant to another PS and we'd be in split-brain.
//
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
ApiError::InternalServerError(anyhow::Error::new(e))
}
}
}
}
impl From<SetNewTenantConfigError> for ApiError {
fn from(e: SetNewTenantConfigError) -> ApiError {
match e {
SetNewTenantConfigError::GetTenant(tid) => {
ApiError::NotFound(anyhow!("tenant {}", tid))
}
e @ SetNewTenantConfigError::Persist(_) => {
ApiError::InternalServerError(anyhow::Error::new(e))
}
}
}
}
impl From<crate::tenant::DeleteTimelineError> for ApiError {
fn from(value: crate::tenant::DeleteTimelineError) -> Self {
use crate::tenant::DeleteTimelineError::*;
@@ -195,7 +163,7 @@ impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
match value {
// Report Precondition failed so client can distinguish between
// "tenant is missing" case from "timeline is missing"
Tenant(GetTenantError::NotFound(..)) => {
Tenant(TenantStateError::NotFound(..)) => {
ApiError::PreconditionFailed("Requested tenant is missing")
}
Tenant(t) => ApiError::from(t),
@@ -301,7 +269,9 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_id))?;
let new_timeline_id = request_data.new_timeline_id;
let new_timeline_id = request_data
.new_timeline_id
.unwrap_or_else(TimelineId::generate);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
@@ -328,7 +298,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
Err(err) => Err(ApiError::InternalServerError(err)),
}
}
.instrument(info_span!("timeline_create", tenant = %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.instrument(info_span!("timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.await
}
@@ -342,7 +312,7 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
let response_data = async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let timelines = tenant.list_timelines();
let timelines = tenant.list_timelines().await;
let mut response_data = Vec::with_capacity(timelines.len());
for timeline in timelines {
@@ -381,6 +351,7 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
let timeline = tenant
.get_timeline(timeline_id, false)
.await
.map_err(ApiError::NotFound)?;
let timeline_info = build_timeline_info(
@@ -550,12 +521,12 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
// Calculate total physical size of all timelines
let mut current_physical_size = 0;
for timeline in tenant.list_timelines().iter() {
for timeline in tenant.list_timelines().await.iter() {
current_physical_size += timeline.layer_size_sum();
}
let state = tenant.current_state();
Result::<_, ApiError>::Ok(TenantInfo {
Ok(TenantInfo {
id: tenant_id,
state: state.clone(),
current_physical_size: Some(current_physical_size),
@@ -563,7 +534,8 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
})
}
.instrument(info_span!("tenant_status_handler", tenant = %tenant_id))
.await?;
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, tenant_info)
}
@@ -762,8 +734,6 @@ pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>,
}
async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let target_tenant_id = request_data.new_tenant_id;
check_permission(&request, None)?;
let _timer = STORAGE_TIME_GLOBAL
@@ -771,10 +741,17 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
.expect("bug")
.start_timer();
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let tenant_conf =
TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let target_tenant_id = request_data
.new_tenant_id
.map(TenantId::from)
.unwrap_or_else(TenantId::generate);
let state = get_state(&request);
@@ -999,6 +976,7 @@ async fn active_timeline_of_active_tenant(
let tenant = mgr::get_tenant(tenant_id, true).await?;
tenant
.get_timeline(timeline_id, true)
.await
.map_err(ApiError::NotFound)
}
@@ -1179,7 +1157,7 @@ pub fn make_router(
#[cfg(not(feature = "testing"))]
let handler = cfg_disabled;
move |r| request_span(r, handler)
move |r| RequestSpan(handler).handle(r)
}};
}
@@ -1194,50 +1172,54 @@ pub fn make_router(
)
.context("Failed to initialize router state")?,
))
.get("/v1/status", |r| request_span(r, status_handler))
.get("/v1/status", |r| RequestSpan(status_handler).handle(r))
.put(
"/v1/failpoints",
testing_api!("manage failpoints", failpoints_handler),
)
.get("/v1/tenant", |r| request_span(r, tenant_list_handler))
.post("/v1/tenant", |r| request_span(r, tenant_create_handler))
.get("/v1/tenant/:tenant_id", |r| request_span(r, tenant_status))
.get("/v1/tenant", |r| RequestSpan(tenant_list_handler).handle(r))
.post("/v1/tenant", |r| {
RequestSpan(tenant_create_handler).handle(r)
})
.get("/v1/tenant/:tenant_id", |r| {
RequestSpan(tenant_status).handle(r)
})
.get("/v1/tenant/:tenant_id/synthetic_size", |r| {
request_span(r, tenant_size_handler)
RequestSpan(tenant_size_handler).handle(r)
})
.put("/v1/tenant/config", |r| {
request_span(r, update_tenant_config_handler)
RequestSpan(update_tenant_config_handler).handle(r)
})
.get("/v1/tenant/:tenant_id/config", |r| {
request_span(r, get_tenant_config_handler)
RequestSpan(get_tenant_config_handler).handle(r)
})
.get("/v1/tenant/:tenant_id/timeline", |r| {
request_span(r, timeline_list_handler)
RequestSpan(timeline_list_handler).handle(r)
})
.post("/v1/tenant/:tenant_id/timeline", |r| {
request_span(r, timeline_create_handler)
RequestSpan(timeline_create_handler).handle(r)
})
.post("/v1/tenant/:tenant_id/attach", |r| {
request_span(r, tenant_attach_handler)
RequestSpan(tenant_attach_handler).handle(r)
})
.post("/v1/tenant/:tenant_id/detach", |r| {
request_span(r, tenant_detach_handler)
RequestSpan(tenant_detach_handler).handle(r)
})
.post("/v1/tenant/:tenant_id/load", |r| {
request_span(r, tenant_load_handler)
RequestSpan(tenant_load_handler).handle(r)
})
.post("/v1/tenant/:tenant_id/ignore", |r| {
request_span(r, tenant_ignore_handler)
RequestSpan(tenant_ignore_handler).handle(r)
})
.get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
request_span(r, timeline_detail_handler)
RequestSpan(timeline_detail_handler).handle(r)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp",
|r| request_span(r, get_lsn_by_timestamp_handler),
|r| RequestSpan(get_lsn_by_timestamp_handler).handle(r),
)
.put("/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc", |r| {
request_span(r, timeline_gc_handler)
RequestSpan(timeline_gc_handler).handle(r)
})
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/compact",
@@ -1249,34 +1231,34 @@ pub fn make_router(
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
|r| request_span(r, timeline_download_remote_layers_handler_post),
|r| RequestSpan(timeline_download_remote_layers_handler_post).handle(r),
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
|r| request_span(r, timeline_download_remote_layers_handler_get),
|r| RequestSpan(timeline_download_remote_layers_handler_get).handle(r),
)
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
request_span(r, timeline_delete_handler)
RequestSpan(timeline_delete_handler).handle(r)
})
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/layer", |r| {
request_span(r, layer_map_info_handler)
RequestSpan(layer_map_info_handler).handle(r)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
|r| request_span(r, layer_download_handler),
|r| RequestSpan(layer_download_handler).handle(r),
)
.delete(
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
|r| request_span(r, evict_timeline_layer_handler),
|r| RequestSpan(evict_timeline_layer_handler).handle(r),
)
.put("/v1/disk_usage_eviction/run", |r| {
request_span(r, disk_usage_eviction_run)
RequestSpan(disk_usage_eviction_run).handle(r)
})
.put(
"/v1/tenant/:tenant_id/break",
testing_api!("set tenant state to broken", handle_tenant_break),
)
.get("/v1/panic", |r| request_span(r, always_panic_handler))
.get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r))
.post(
"/v1/tracing/event",
testing_api!("emit a tracing event", post_tracing_event_handler),

@@ -35,7 +35,7 @@ use tracing::info;
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;
pub const DEFAULT_PG_VERSION: u32 = 15;
pub const DEFAULT_PG_VERSION: u32 = 14;
// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
@@ -45,7 +45,6 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
pub use crate::metrics::preinitialize_metrics;
#[tracing::instrument]
pub async fn shutdown_pageserver(exit_code: i32) {
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.

@@ -50,9 +50,7 @@ use crate::import_datadir::import_wal_from_tar;
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant;
use crate::tenant::mgr;
use crate::tenant::mgr::GetTenantError;
use crate::tenant::{Tenant, Timeline};
use crate::trace::Tracer;
@@ -390,7 +388,7 @@ impl PageServerHandler {
};
// Check that the timeline exists
let timeline = tenant.get_timeline(timeline_id, true)?;
let timeline = tenant.get_timeline(timeline_id, true).await?;
// switch client to COPYBOTH
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
@@ -489,7 +487,9 @@ impl PageServerHandler {
// Create empty timeline
info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
let timeline = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.await?;
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute
@@ -1150,9 +1150,7 @@ enum GetActiveTenantError {
wait_time: Duration,
},
#[error(transparent)]
NotFound(GetTenantError),
#[error(transparent)]
WaitTenantActive(tenant::WaitToBecomeActiveError),
Other(#[from] anyhow::Error),
}
impl From<GetActiveTenantError> for QueryError {
@@ -1161,8 +1159,7 @@ impl From<GetActiveTenantError> for QueryError {
GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
),
GetActiveTenantError::WaitTenantActive(e) => QueryError::Other(anyhow::Error::new(e)),
GetActiveTenantError::NotFound(e) => QueryError::Other(anyhow::Error::new(e)),
GetActiveTenantError::Other(e) => QueryError::Other(e),
}
}
}
@@ -1178,16 +1175,13 @@ async fn get_active_tenant_with_timeout(
) -> Result<Arc<Tenant>, GetActiveTenantError> {
let tenant = match mgr::get_tenant(tenant_id, false).await {
Ok(tenant) => tenant,
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
Err(GetTenantError::NotActive(_)) => {
unreachable!("we're calling get_tenant with active=false")
}
Err(e) => return Err(GetActiveTenantError::Other(e.into())),
};
let wait_time = Duration::from_secs(30);
match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await {
Ok(Ok(())) => Ok(tenant),
// no .context(), the error message is good enough and some tests depend on it
Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
Ok(Err(wait_error)) => Err(GetActiveTenantError::Other(wait_error)),
Err(_) => {
let latest_state = tenant.current_state();
if latest_state == TenantState::Active {
@@ -1202,34 +1196,13 @@ async fn get_active_tenant_with_timeout(
}
}
#[derive(Debug, thiserror::Error)]
enum GetActiveTimelineError {
#[error(transparent)]
Tenant(GetActiveTenantError),
#[error(transparent)]
Timeline(anyhow::Error),
}
impl From<GetActiveTimelineError> for QueryError {
fn from(e: GetActiveTimelineError) -> Self {
match e {
GetActiveTimelineError::Tenant(e) => e.into(),
GetActiveTimelineError::Timeline(e) => QueryError::Other(e),
}
}
}
/// Shorthand for getting a reference to a Timeline of an Active tenant.
async fn get_active_tenant_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, GetActiveTimelineError> {
let tenant = get_active_tenant_with_timeout(tenant_id, ctx)
.await
.map_err(GetActiveTimelineError::Tenant)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(GetActiveTimelineError::Timeline)?;
) -> Result<Arc<Timeline>, GetActiveTenantError> {
let tenant = get_active_tenant_with_timeout(tenant_id, ctx).await?;
let timeline = tenant.get_timeline(timeline_id, true).await?;
Ok(timeline)
}

@@ -1594,13 +1594,15 @@ fn is_slru_block_key(key: Key) -> bool {
}
#[cfg(test)]
pub fn create_test_timeline(
pub async fn create_test_timeline(
tenant: &crate::tenant::Tenant,
timeline_id: utils::id::TimelineId,
pg_version: u32,
ctx: &RequestContext,
) -> anyhow::Result<std::sync::Arc<Timeline>> {
let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?;
let tline = tenant
.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)
.await?;
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;
m.commit()?;
@@ -1630,7 +1632,7 @@ mod tests {
#[test]
fn test_list_rels_drop() -> Result<()> {
let repo = RepoHarness::create("test_list_rels_drop")?.load();
let tline = create_empty_timeline(repo, TIMELINE_ID)?;
let tline = create_empty_timeline(repo, TIMELINE_ID).await?;
const TESTDB: u32 = 111;
// Import initial dummy checkpoint record, otherwise the get_timeline() call

File diff suppressed because it is too large

@@ -1,34 +0,0 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use super::storage_layer::{LayerFileName, PersistentLayer, RemoteLayerDesc};
pub struct LayerCache {
layers: Mutex<HashMap<LayerFileName, Arc<dyn PersistentLayer>>>,
}
impl LayerCache {
pub fn new() -> Self {
Self {
layers: Mutex::new(HashMap::new()),
}
}
pub fn get(&self, layer_fname: &LayerFileName) -> Option<Arc<dyn PersistentLayer>> {
let guard: std::sync::MutexGuard<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> =
self.layers.lock().unwrap();
guard.get(layer_fname).cloned()
}
pub fn contains(&self, layer_fname: &LayerFileName) -> bool {
let guard = self.layers.lock().unwrap();
guard.contains_key(layer_fname)
}
pub fn insert(&self, layer_fname: LayerFileName, persistent_layer: Arc<dyn PersistentLayer>) {
let mut guard = self.layers.lock().unwrap();
guard.insert(layer_fname, persistent_layer);
}
}

@@ -61,7 +61,6 @@ use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::Replacement;
use super::storage_layer::range_eq;
use super::storage_layer::PersistentLayer;
///
/// LayerMap tracks what layers exist on a timeline.
@@ -139,19 +138,24 @@ where
self.layer_map.remove_historic_noflush(layer)
}
/// Ensure the downloaded layer matches existing layer.
/// Replaces existing layer iff it is the `expected`.
///
/// Returned `Replacement` describes succeeding in checking or the reason why it could not
/// If the expected layer has been removed it will not be inserted by this function.
///
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
/// be done.
pub fn ensure_consistent(
&self,
///
/// TODO replacement can be done without buffering and rebuilding layer map updates.
/// One way to do that is to add a layer of indirection for returned values, so
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self,
expected: &Arc<L>,
new: Arc<dyn PersistentLayer>,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
self.layer_map
.ensure_consistent_noflush(expected, new)
self.layer_map.replace_historic_noflush(expected, new)
}
// We will flush on drop anyway, but this method makes it
@@ -305,16 +309,16 @@ where
}
}
pub(self) fn ensure_consistent_noflush(
&self,
pub(self) fn replace_historic_noflush(
&mut self,
expected: &Arc<L>,
new: Arc<dyn PersistentLayer>,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
let key = historic_layer_coverage::LayerKey::from(&**expected);
let other = historic_layer_coverage::LayerKey::from(&*new);
let expected_l0 = Self::is_l0(expected);
let new_l0 = LayerMap::<dyn PersistentLayer>::is_l0(&*new);
let new_l0 = Self::is_l0(&new);
anyhow::ensure!(
key == other,
@@ -341,7 +345,17 @@ where
None
};
Ok(Replacement::Replaced { in_buffered: false })
let replaced = self.historic.replace(&key, new.clone(), |existing| {
Self::compare_arced_layers(existing, expected)
});
if let Replacement::Replaced { .. } = &replaced {
if let Some(index) = l0_index {
self.l0_delta_layers[index] = new;
}
}
Ok(replaced)
}
/// Helper function for BatchedUpdates::drop.
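The `replace_historic` doc comment above says the existing layer is replaced iff it is the `expected` one. A minimal sketch of that compare-then-replace idea follows, assuming identity is checked with `Arc::ptr_eq` (the diff only shows a call to `compare_arced_layers`, so the exact comparison is an assumption). The `HashMap<String, Arc<String>>` store, the `replace_if_expected` name, and the `Unexpected` variant are hypothetical stand-ins, not the pageserver's types.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Outcome of a replacement attempt. `Unexpected` is a hypothetical variant
/// used here for "something else is stored under this key".
#[derive(Debug, PartialEq)]
enum Replacement {
    Replaced,
    NotFound,
    Unexpected,
}

/// Replace the value under `key` only if the stored Arc is the very same
/// allocation as `expected` (pointer identity, not value equality).
fn replace_if_expected(
    map: &mut HashMap<String, Arc<String>>,
    key: &str,
    expected: &Arc<String>,
    new: Arc<String>,
) -> Replacement {
    match map.get_mut(key) {
        None => Replacement::NotFound,
        Some(existing) => {
            if Arc::ptr_eq(existing, expected) {
                *existing = new;
                Replacement::Replaced
            } else {
                Replacement::Unexpected
            }
        }
    }
}
```

Comparing by pointer rather than by value means a caller holding a stale `Arc` with identical contents still fails the check, which is the property the "iff it is the `expected`" wording suggests.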

@@ -10,7 +10,6 @@ use tokio::fs;
use anyhow::Context;
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
use tokio::task::JoinSet;
use tracing::*;
use remote_storage::GenericRemoteStorage;
@@ -20,12 +19,9 @@ use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{
create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
};
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
use crate::IGNORED_TENANT_FILE_NAME;
use utils::completion;
use utils::fs_ext::PathExt;
use utils::id::{TenantId, TimelineId};
@@ -67,7 +63,6 @@ pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: (completion::Completion, completion::Barrier),
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
let tenants_dir = conf.tenants_path();
@@ -124,7 +119,6 @@ pub async fn init_tenant_mgr(
&tenant_dir_path,
broker_client.clone(),
remote_storage.clone(),
Some(init_done.clone()),
&ctx,
) {
Ok(tenant) => {
@@ -160,7 +154,6 @@ pub fn schedule_local_tenant_processing(
tenant_path: &Path,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: Option<(completion::Completion, completion::Barrier)>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
anyhow::ensure!(
@@ -214,14 +207,7 @@ pub fn schedule_local_tenant_processing(
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(
conf,
tenant_id,
broker_client,
remote_storage,
init_done,
ctx,
)
Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx)
};
Ok(tenant)
}
@@ -236,7 +222,6 @@ pub fn schedule_local_tenant_processing(
/// That could be easily misinterpreted by control plane, the consumer of the
/// management API. For example, it could attach the tenant on a different pageserver.
/// We would then be in split-brain once this pageserver restarts.
#[instrument]
pub async fn shutdown_all_tenants() {
// Prevent new tenants from being created.
let tenants_to_shut_down = {
@@ -259,65 +244,15 @@ pub async fn shutdown_all_tenants() {
}
};
// Set tenant (and its timelines) to Stopping state.
//
// Since we can only transition into Stopping state after activation is complete,
// run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
//
// Transitioning tenants to Stopping state has a couple of non-obvious side effects:
// 1. Lock out any new requests to the tenants.
// 2. Signal cancellation to WAL receivers (we wait on it below).
// 3. Signal cancellation for other tenant background loops.
// 4. ???
//
// The waiting for the cancellation is not done uniformly.
// We certainly wait for WAL receivers to shut down.
// That is necessary so that no new data comes in before the freeze_and_flush.
// But the tenant background loops are joined-on in our caller.
// It's messed up.
let mut join_set = JoinSet::new();
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
for (tenant_id, tenant) in tenants_to_shut_down {
join_set.spawn(
async move {
match tenant.set_stopping().await {
Ok(()) => debug!("tenant successfully stopped"),
Err(SetStoppingError::Broken) => {
info!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well");
},
Err(SetStoppingError::AlreadyStopping) => {
// our task_mgr::shutdown_tasks are going to coalesce on that just fine
}
}
tenant
}
.instrument(info_span!("set_stopping", %tenant_id)),
);
}
let mut panicked = 0;
while let Some(res) = join_set.join_next().await {
match res {
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the futures");
}
Err(join_error) if join_error.is_panic() => {
// cannot really do anything, as this panic is likely a bug
panicked += 1;
}
Err(join_error) => {
warn!("unknown kind of JoinError: {join_error}");
}
Ok(tenant) => tenants_to_freeze_and_flush.push(tenant),
for (_, tenant) in tenants_to_shut_down {
if tenant.is_active() {
// updates tenant state, forbidding new GC and compaction iterations from starting
tenant.set_stopping().await;
tenants_to_freeze_and_flush.push(tenant);
}
}
if panicked > 0 {
warn!(panicked, "observed panicks while stopping tenants");
}
// Shut down all existing walreceiver connections and stop accepting the new ones.
task_mgr::shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None).await;
@@ -329,30 +264,12 @@ pub async fn shutdown_all_tenants() {
// should be no more activity in any of the repositories.
//
// On error, log it but continue with the shutdown for other tenants.
let mut join_set = tokio::task::JoinSet::new();
for tenant in tenants_to_freeze_and_flush {
let tenant_id = tenant.tenant_id();
debug!("shutdown tenant {tenant_id}");
join_set.spawn(
async move {
if let Err(err) = tenant.freeze_and_flush().await {
warn!("Could not checkpoint tenant during shutdown: {err:?}");
}
}
.instrument(info_span!("freeze_and_flush", %tenant_id)),
);
}
while let Some(next) = join_set.join_next().await {
match next {
Ok(()) => {}
Err(join_error) if join_error.is_cancelled() => {
unreachable!("no cancelling")
}
Err(join_error) if join_error.is_panic() => { /* reported already */ }
Err(join_error) => warn!("unknown kind of JoinError: {join_error}"),
if let Err(err) = tenant.freeze_and_flush().await {
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
}
}
}
@@ -374,7 +291,7 @@ pub async fn create_tenant(
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, ctx)?;
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -387,19 +304,11 @@ pub async fn create_tenant(
}).await
}
#[derive(Debug, thiserror::Error)]
pub enum SetNewTenantConfigError {
#[error(transparent)]
GetTenant(#[from] GetTenantError),
#[error(transparent)]
Persist(anyhow::Error),
}
pub async fn set_new_tenant_config(
conf: &'static PageServerConf,
new_tenant_conf: TenantConfOpt,
tenant_id: TenantId,
) -> Result<(), SetNewTenantConfigError> {
) -> Result<(), TenantStateError> {
info!("configuring tenant {tenant_id}");
let tenant = get_tenant(tenant_id, true).await?;
@@ -409,32 +318,23 @@ pub async fn set_new_tenant_config(
&tenant_config_path,
new_tenant_conf,
false,
)
.map_err(SetNewTenantConfigError::Persist)?;
tenant.set_new_tenant_config(new_tenant_conf);
)?;
tenant.set_new_tenant_config(new_tenant_conf).await;
Ok(())
}
#[derive(Debug, thiserror::Error)]
pub enum GetTenantError {
#[error("Tenant {0} not found")]
NotFound(TenantId),
#[error("Tenant {0} is not active")]
NotActive(TenantId),
}
/// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query.
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
pub async fn get_tenant(
tenant_id: TenantId,
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
) -> Result<Arc<Tenant>, TenantStateError> {
let m = TENANTS.read().await;
let tenant = m
.get(&tenant_id)
.ok_or(GetTenantError::NotFound(tenant_id))?;
.ok_or(TenantStateError::NotFound(tenant_id))?;
if active_only && !tenant.is_active() {
Err(GetTenantError::NotActive(tenant_id))
Err(TenantStateError::NotActive(tenant_id))
} else {
Ok(Arc::clone(tenant))
}
@@ -443,7 +343,7 @@ pub async fn get_tenant(
#[derive(Debug, thiserror::Error)]
pub enum DeleteTimelineError {
#[error("Tenant {0}")]
Tenant(#[from] GetTenantError),
Tenant(#[from] TenantStateError),
#[error("Timeline {0}")]
Timeline(#[from] crate::tenant::DeleteTimelineError),
@@ -520,7 +420,7 @@ pub async fn load_tenant(
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
}
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, ctx)
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -593,7 +493,7 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -672,23 +572,14 @@ where
{
let tenants_accessor = TENANTS.write().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => {
let tenant = Arc::clone(tenant);
// don't hold TENANTS lock while set_stopping waits for activation to finish
drop(tenants_accessor);
match tenant.set_stopping().await {
Ok(()) => {
// we won, continue stopping procedure
}
Err(SetStoppingError::Broken) => {
// continue the procedure, let's hope the closure can deal with broken tenants
}
Err(SetStoppingError::AlreadyStopping) => {
// the tenant is already stopping or broken, don't do anything
return Err(TenantStateError::IsStopping(tenant_id));
}
}
}
Some(tenant) => match tenant.current_state() {
TenantState::Attaching
| TenantState::Loading
| TenantState::Activating
| TenantState::Broken { .. }
| TenantState::Active => tenant.set_stopping().await,
TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)),
},
None => return Err(TenantStateError::NotFound(tenant_id)),
}
}
@@ -795,6 +686,7 @@ pub async fn immediate_compact(
let timeline = tenant
.get_timeline(timeline_id, true)
.await
.map_err(ApiError::NotFound)?;
// Run in task_mgr to avoid race with tenant_detach operation
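The `get_tenant` hunk in this file's diff folds the "absent" and "not active" cases into one error enum. A minimal sketch of that lookup follows, assuming a synchronous `std::sync::RwLock` in place of the `tokio::sync::RwLock` used in the diff, a `u64` in place of `TenantId`, and a hypothetical `Tenant` struct with a plain `active` flag.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the pageserver's Tenant.
struct Tenant {
    active: bool,
}

/// Mirrors the two variants used by `get_tenant` in the diff.
#[derive(Debug, PartialEq)]
enum TenantStateError {
    NotFound(u64),
    NotActive(u64),
}

/// Look up a tenant; with `active_only = true`, reject tenants that are
/// present but not yet (or no longer) active.
fn get_tenant(
    tenants: &RwLock<HashMap<u64, Arc<Tenant>>>,
    tenant_id: u64,
    active_only: bool,
) -> Result<Arc<Tenant>, TenantStateError> {
    let m = tenants.read().unwrap();
    let tenant = m
        .get(&tenant_id)
        .ok_or(TenantStateError::NotFound(tenant_id))?;
    if active_only && !tenant.active {
        Err(TenantStateError::NotActive(tenant_id))
    } else {
        // Clone the Arc so the read guard can be dropped on return.
        Ok(Arc::clone(tenant))
    }
}
```

Returning a cloned `Arc` keeps the map lock held only for the duration of the lookup, which matters more once the lock is async as in the actual code.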

@@ -19,8 +19,14 @@ fn parallel_worker(paths: &[PathBuf], next_path_idx: &AtomicUsize) -> io::Result
Ok(())
}
fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> {
// TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
const PARALLEL_PATH_THRESHOLD: usize = 1;
if paths.len() <= PARALLEL_PATH_THRESHOLD {
for path in paths {
fsync_path(path)?;
}
return Ok(());
}
/// Use at most this number of threads.
/// Increasing this limit will
@@ -30,11 +36,11 @@ fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> {
let num_threads = paths.len().min(MAX_NUM_THREADS);
let next_path_idx = AtomicUsize::new(0);
std::thread::scope(|s| -> io::Result<()> {
crossbeam_utils::thread::scope(|s| -> io::Result<()> {
let mut handles = vec![];
// Spawn `num_threads - 1`, as the current thread is also a worker.
for _ in 1..num_threads {
handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
handles.push(s.spawn(|_| parallel_worker(paths, &next_path_idx)));
}
parallel_worker(paths, &next_path_idx)?;
@@ -45,41 +51,5 @@ fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> {
Ok(())
})
}
/// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool.
pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
if paths.len() == 1 {
fsync_path(&paths[0])?;
return Ok(());
}
fsync_in_thread_pool(paths)
}
/// Parallel fsync asynchronously. If number of files are less than PARALLEL_PATH_THRESHOLD, fsync is done in the current
/// execution thread. Otherwise, we will spawn_blocking and run it in tokio.
pub async fn par_fsync_async(paths: &[PathBuf]) -> io::Result<()> {
const MAX_CONCURRENT_FSYNC: usize = 64;
let mut next = paths.iter().peekable();
let mut js = tokio::task::JoinSet::new();
loop {
while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
let next = next.next().expect("just peeked");
let next = next.to_owned();
js.spawn_blocking(move || fsync_path(&next));
}
// now the joinset has been filled up, wait for next to complete
if let Some(res) = js.join_next().await {
res??;
} else {
// last item had already completed
assert!(
next.peek().is_none(),
"joinset emptied, we shouldn't have more work"
);
return Ok(());
}
}
.unwrap()
}
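The `par_fsync` hunks above distribute paths across workers through a shared `AtomicUsize` index, with the calling thread acting as one of the workers. A minimal sketch of that scheme with `std::thread::scope` follows. The generic `process_all` function, the `String` error type, and the `max_threads` parameter are hypothetical; the real code calls `fsync_path` and returns `io::Result`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Apply `op` to every item, sharing the work among up to `max_threads`
/// threads that each claim the next unprocessed index.
fn process_all<T, F>(items: &[T], max_threads: usize, op: F) -> Result<(), String>
where
    T: Sync,
    F: Fn(&T) -> Result<(), String> + Sync,
{
    // Tiny inputs are handled inline, without spawning threads.
    if items.len() <= 1 {
        for item in items {
            op(item)?;
        }
        return Ok(());
    }
    let num_threads = items.len().min(max_threads.max(1));
    let next_idx = AtomicUsize::new(0);
    // Each worker claims indices until the slice is exhausted or `op` fails.
    let worker = || -> Result<(), String> {
        loop {
            let idx = next_idx.fetch_add(1, Ordering::Relaxed);
            match items.get(idx) {
                Some(item) => op(item)?,
                None => return Ok(()),
            }
        }
    };
    thread::scope(|s| -> Result<(), String> {
        // Spawn `num_threads - 1`, as the current thread is also a worker.
        let handles: Vec<_> = (1..num_threads).map(|_| s.spawn(&worker)).collect();
        worker()?;
        for handle in handles {
            handle.join().expect("worker panicked")?;
        }
        Ok(())
    })
}
```

Because every index is claimed with a single `fetch_add`, each item is processed exactly once regardless of how the threads interleave. A worker that hits an error stops early while the others drain the remaining items.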

@@ -1264,7 +1264,12 @@ mod tests {
let harness = TenantHarness::create(test_name)?;
let (tenant, ctx) = runtime.block_on(harness.load());
// create an empty timeline directory
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let _ = runtime.block_on(tenant.create_test_timeline(
TIMELINE_ID,
Lsn(0),
DEFAULT_PG_VERSION,
&ctx,
))?;
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;

@@ -136,7 +136,7 @@ pub(super) async fn gather_inputs(
.context("Failed to refresh gc_info before gathering inputs")?;
// Collect information about all the timelines
let mut timelines = tenant.list_timelines();
let mut timelines = tenant.list_timelines().await;
if timelines.is_empty() {
// perhaps the tenant has just been created, and as such doesn't have any data yet

@@ -37,7 +37,7 @@ pub use delta_layer::{DeltaLayer, DeltaLayerWriter};
pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer;
pub use remote_layer::RemoteLayerDesc;
pub use remote_layer::RemoteLayer;
use super::layer_map::BatchedUpdates;
@@ -431,6 +431,14 @@ pub trait PersistentLayer: Layer {
/// Permanently remove this layer from disk.
fn delete_resident_layer_file(&self) -> Result<()>;
fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
None
}
fn is_remote_layer(&self) -> bool {
false
}
/// Returns None if the layer file size is not known.
///
/// Should not change over the lifetime of the layer object because
@@ -442,6 +450,16 @@ pub trait PersistentLayer: Layer {
fn access_stats(&self) -> &LayerAccessStats;
}
pub fn downcast_remote_layer(
layer: &Arc<dyn PersistentLayer>,
) -> Option<std::sync::Arc<RemoteLayer>> {
if layer.is_remote_layer() {
Arc::clone(layer).downcast_remote_layer()
} else {
None
}
}
/// Holds metadata about a layer without any content. Used mostly for testing.
///
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
@@ -524,7 +542,7 @@ impl From<LayerFileName> for LayerDescriptor {
///
/// This is used by DeltaLayer and ImageLayer. Normally, this holds a reference to the
/// global config, and paths to layer files are constructed using the tenant/timeline
/// path from the config. But in the 'pagectl' binary, we need to construct a Layer
/// path from the config. But in the 'pageserver_binutils' binary, we need to construct a Layer
/// struct for a file on disk, without having a page server running, so that we have no
/// config. In that case, we use the Path variant to hold the full path to the file on
/// disk.
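The `downcast_remote_layer` additions in this file's diff let callers recover a concrete `Arc<RemoteLayer>` from an `Arc<dyn PersistentLayer>` without `Any`: the trait provides a default that returns `None`, and only the remote type overrides it. A minimal sketch of that pattern follows; the `LayerLike` trait and the `RemoteStub` and `ResidentStub` types are hypothetical stand-ins.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for RemoteLayer.
struct RemoteStub {
    file_name: String,
}

/// Hypothetical stand-in for a resident (already downloaded) layer.
struct ResidentStub;

trait LayerLike {
    /// Default: not a remote layer, so there is nothing to downcast to.
    fn downcast_remote(self: Arc<Self>) -> Option<Arc<RemoteStub>> {
        None
    }
    fn is_remote(&self) -> bool {
        false
    }
}

impl LayerLike for ResidentStub {}

impl LayerLike for RemoteStub {
    /// Here `Self` is the concrete type, so the Arc can be returned as-is.
    fn downcast_remote(self: Arc<Self>) -> Option<Arc<RemoteStub>> {
        Some(self)
    }
    fn is_remote(&self) -> bool {
        true
    }
}

/// Free helper mirroring the one in the diff: check cheaply first, and
/// only clone the Arc when the layer really is remote.
fn downcast_remote(layer: &Arc<dyn LayerLike>) -> Option<Arc<RemoteStub>> {
    if layer.is_remote() {
        Arc::clone(layer).downcast_remote()
    } else {
        None
    }
}
```

The `is_remote` check exists because the `self: Arc<Self>` receiver consumes an `Arc`; testing first avoids a reference-count bump for layers that would return `None` anyway.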

@@ -30,7 +30,6 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{
PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
@@ -58,7 +57,7 @@ use utils::{
use super::{
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
LayerKeyIter, PathOrConf, RemoteLayerDesc,
LayerKeyIter, PathOrConf,
};
///
@@ -111,7 +110,7 @@ const WILL_INIT: u64 = 1;
/// reading/deserializing records themselves.
///
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub struct BlobRef(pub u64);
struct BlobRef(u64);
impl BlobRef {
pub fn will_init(&self) -> bool {
@@ -620,7 +619,7 @@ impl DeltaLayer {
/// Create a DeltaLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
/// This variant is only used for debugging purposes, by the 'pageserver_binutils' binary.
pub fn new_for_path(path: &Path, file: File) -> Result<Self> {
let mut summary_buf = Vec::new();
summary_buf.resize(PAGE_SZ, 0);
@@ -664,17 +663,6 @@ impl DeltaLayer {
&self.layer_name(),
)
}
/// Create layer descriptor for this image layer
pub fn layer_desc(&self) -> RemoteLayerDesc {
RemoteLayerDesc::new_delta(
self.tenant_id,
self.timeline_id,
&self.layer_name(),
&LayerFileMetadata::new(self.file_size()),
LayerAccessStats::empty_will_record_residence_event_later(),
)
}
}
/// A builder object for constructing a new delta layer.

@@ -26,7 +26,6 @@ use crate::repository::{Key, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
@@ -54,7 +53,7 @@ use utils::{
};
use super::filename::{ImageFileName, LayerFileName};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf, RemoteLayerDesc};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
///
/// Header stored in the beginning of the file
@@ -423,7 +422,7 @@ impl ImageLayer {
/// Create an ImageLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
/// This variant is only used for debugging purposes, by the 'pageserver_binutils' binary.
pub fn new_for_path(path: &Path, file: File) -> Result<ImageLayer> {
let mut summary_buf = Vec::new();
summary_buf.resize(PAGE_SZ, 0);
@@ -465,17 +464,6 @@ impl ImageLayer {
&self.layer_name(),
)
}
/// Create layer descriptor for this image layer
pub fn layer_desc(&self) -> RemoteLayerDesc {
RemoteLayerDesc::new_img(
self.tenant_id,
self.timeline_id,
&self.layer_name(),
&LayerFileMetadata::new(self.file_size()),
LayerAccessStats::empty_will_record_residence_event_later(),
)
}
}
/// A builder object for constructing a new image layer.

@@ -1,4 +1,4 @@
//! A RemoteLayerDesc is an in-memory placeholder for a layer file that exists
//! A RemoteLayer is an in-memory placeholder for a layer file that exists
//! in remote storage.
//!
use crate::config::PageServerConf;
@@ -25,19 +25,19 @@ use super::{
LayerResidenceStatus, PersistentLayer,
};
/// RemoteLayerDesc is a not yet downloaded [`ImageLayer`] or
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
/// [`crate::storage_layer::DeltaLayer`].
///
/// RemoteLayerDesc might be downloaded on-demand during operations which are
/// RemoteLayer might be downloaded on-demand during operations which are
/// allowed download remote layers and during which, it gets replaced with a
/// concrete `DeltaLayer` or `ImageLayer`.
///
/// See: [`crate::context::RequestContext`] for authorization to download
pub struct RemoteLayerDesc {
pub(crate) tenantid: TenantId,
pub(crate) timelineid: TimelineId,
pub(crate) key_range: Range<Key>,
pub(crate) lsn_range: Range<Lsn>,
pub struct RemoteLayer {
tenantid: TenantId,
timelineid: TimelineId,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
pub file_name: LayerFileName,
@@ -54,7 +54,7 @@ pub struct RemoteLayerDesc {
/// Has `LayerMap::replace` failed for this (true) or not (false).
///
/// Used together with [`ongoing_download`] semaphore in `Timeline::download_remote_layer`.
/// The field is used to mark a RemoteLayerDesc permanently (until restart or ignore+load)
/// The field is used to mark a RemoteLayer permanently (until restart or ignore+load)
/// unprocessable, because a LayerMap::replace failed.
///
/// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids
@@ -63,9 +63,9 @@ pub struct RemoteLayerDesc {
pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool,
}
impl std::fmt::Debug for RemoteLayerDesc {
impl std::fmt::Debug for RemoteLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RemoteLayerDesc")
f.debug_struct("RemoteLayer")
.field("file_name", &self.file_name)
.field("layer_metadata", &self.layer_metadata)
.field("is_incremental", &self.is_incremental)
@@ -73,7 +73,7 @@ impl std::fmt::Debug for RemoteLayerDesc {
}
}
impl Layer for RemoteLayerDesc {
impl Layer for RemoteLayer {
fn get_key_range(&self) -> Range<Key> {
self.key_range.clone()
}
@@ -119,7 +119,7 @@ impl Layer for RemoteLayerDesc {
}
}
impl PersistentLayer for RemoteLayerDesc {
impl PersistentLayer for RemoteLayer {
fn get_tenant_id(&self) -> TenantId {
self.tenantid
}
@@ -160,6 +160,14 @@ impl PersistentLayer for RemoteLayerDesc {
bail!("remote layer has no layer file");
}
fn downcast_remote_layer<'a>(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
Some(self)
}
fn is_remote_layer(&self) -> bool {
true
}
fn file_size(&self) -> u64 {
self.layer_metadata.file_size()
}
@@ -193,15 +201,15 @@ impl PersistentLayer for RemoteLayerDesc {
}
}
impl RemoteLayerDesc {
impl RemoteLayer {
pub fn new_img(
tenantid: TenantId,
timelineid: TimelineId,
fname: &ImageFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayerDesc {
RemoteLayerDesc {
) -> RemoteLayer {
RemoteLayer {
tenantid,
timelineid,
key_range: fname.key_range.clone(),
@@ -222,8 +230,8 @@ impl RemoteLayerDesc {
fname: &DeltaFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayerDesc {
RemoteLayerDesc {
) -> RemoteLayer {
RemoteLayer {
tenantid,
timelineid,
key_range: fname.key_range.clone(),

@@ -9,13 +9,13 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::mgr;
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::completion;
use utils::id::TenantId;
pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completion::Barrier>) {
let tenant_id = tenant.tenant_id;
pub fn start_background_loops(tenant_id: TenantId) {
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Compaction,
@@ -23,16 +23,11 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
None,
&format!("compactor for tenant {tenant_id}"),
false,
{
let tenant = Arc::clone(tenant);
let init_done = init_done.cloned();
async move {
completion::Barrier::maybe_wait(init_done).await;
compaction_loop(tenant)
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
.await;
Ok(())
}
async move {
compaction_loop(tenant_id)
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
.await;
Ok(())
},
);
task_mgr::spawn(
@@ -42,16 +37,11 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
None,
&format!("garbage collector for tenant {tenant_id}"),
false,
{
let tenant = Arc::clone(tenant);
let init_done = init_done.cloned();
async move {
completion::Barrier::maybe_wait(init_done).await;
gc_loop(tenant)
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
.await;
Ok(())
}
async move {
gc_loop(tenant_id)
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
.await;
Ok(())
},
);
}
@@ -59,7 +49,7 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
///
/// Compaction task's main loop
///
async fn compaction_loop(tenant: Arc<Tenant>) {
async fn compaction_loop(tenant_id: TenantId) {
let wait_duration = Duration::from_secs(2);
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
@@ -70,16 +60,16 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
loop {
trace!("waking up");
tokio::select! {
let tenant = tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request");
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
ControlFlow::Continue(tenant) => tenant,
},
}
};
let period = tenant.get_compaction_period();
@@ -129,7 +119,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
///
/// GC task's main loop
///
async fn gc_loop(tenant: Arc<Tenant>) {
async fn gc_loop(tenant_id: TenantId) {
let wait_duration = Duration::from_secs(2);
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
@@ -137,22 +127,21 @@ async fn gc_loop(tenant: Arc<Tenant>) {
let cancel = task_mgr::shutdown_token();
// GC might require downloading, to find the cutoff LSN that corresponds to the
// cutoff specified as time.
let ctx =
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let mut first = true;
loop {
trace!("waking up");
tokio::select! {
let tenant = tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request");
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
ControlFlow::Continue(tenant) => tenant,
},
}
};
let period = tenant.get_gc_period();
@@ -172,9 +161,7 @@ async fn gc_loop(tenant: Arc<Tenant>) {
Duration::from_secs(10)
} else {
// Run gc
let res = tenant
.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx)
.await;
let res = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx).await;
if let Err(e) = res {
error!("Gc failed, retrying in {:?}: {e:?}", wait_duration);
wait_duration
@@ -200,10 +187,23 @@ async fn gc_loop(tenant: Arc<Tenant>) {
trace!("GC loop stopped.");
}
async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
async fn wait_for_active_tenant(
tenant_id: TenantId,
wait: Duration,
) -> ControlFlow<(), Arc<Tenant>> {
let tenant = loop {
match mgr::get_tenant(tenant_id, false).await {
Ok(tenant) => break tenant,
Err(e) => {
error!("Failed to get a tenant {tenant_id}: {e:#}");
tokio::time::sleep(wait).await;
}
}
};
// if the tenant has a proper status already, no need to wait for anything
if tenant.current_state() == TenantState::Active {
ControlFlow::Continue(())
ControlFlow::Continue(tenant)
} else {
let mut tenant_state_updates = tenant.subscribe_for_state_updates();
loop {
@@ -213,7 +213,7 @@ async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
match new_state {
TenantState::Active => {
debug!("Tenant state changed to active, continuing the task loop");
return ControlFlow::Continue(());
return ControlFlow::Continue(tenant);
}
state => {
debug!("Not running the task loop, tenant is not active: {state:?}");
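
The signature change to `ControlFlow<(), Arc<Tenant>>` lets the wait helper hand the resolved tenant back to the task loop through `Continue`. Below is a simplified, synchronous sketch of that shape. It assumes a reduced `TenantState` and `Tenant`, replaces the watch channel with a plain iterator of observed states, and treats an exhausted iterator as a closed channel; `wait_for_active` and `run_one_iteration` are hypothetical names.

```rust
use std::ops::ControlFlow;
use std::sync::Arc;

/// Reduced, hypothetical version of the tenant state enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TenantState {
    Loading,
    Active,
}

/// Hypothetical tenant holding only the state it was looked up with.
struct Tenant {
    state: TenantState,
}

/// Synchronous stand-in for the async wait: `updates` plays the role of the
/// state-update channel, and running out of updates models a closed channel.
fn wait_for_active(
    tenant: Arc<Tenant>,
    updates: impl IntoIterator<Item = TenantState>,
) -> ControlFlow<(), Arc<Tenant>> {
    // If the tenant is already active there is nothing to wait for.
    if tenant.state == TenantState::Active {
        return ControlFlow::Continue(tenant);
    }
    for new_state in updates {
        if new_state == TenantState::Active {
            return ControlFlow::Continue(tenant);
        }
        // Any other state: keep waiting for the next update.
    }
    ControlFlow::Break(())
}

/// One iteration of a task loop: either obtain the tenant or stop the loop.
fn run_one_iteration(tenant: Arc<Tenant>, updates: Vec<TenantState>) -> Option<Arc<Tenant>> {
    let tenant = match wait_for_active(tenant, updates) {
        ControlFlow::Break(()) => return None,
        ControlFlow::Continue(tenant) => tenant,
    };
    Some(tenant)
}
```

Carrying the value in `Continue` means the caller cannot forget to re-resolve the tenant: the only way to get one for this iteration is to go through the wait.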


@@ -35,7 +35,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
LayerAccessStats, LayerFileName, RemoteLayerDesc,
LayerAccessStats, LayerFileName, RemoteLayer,
};
use crate::tenant::{
ephemeral_file::is_ephemeral_file,
@@ -77,7 +77,6 @@ use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::layer_cache::LayerCache;
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
@@ -120,7 +119,7 @@ pub struct Timeline {
pub pg_version: u32,
pub(super) layers: RwLock<LayerMap<RemoteLayerDesc>>,
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -196,9 +195,8 @@ pub struct Timeline {
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
/// and [`Tenant::delete_timeline`]. This is an `Arc<Mutex>` lock because we need an owned
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
pub(super) layer_removal_cs: Arc<tokio::sync::Mutex<()>>,
/// and [`Tenant::delete_timeline`].
pub(super) layer_removal_cs: tokio::sync::Mutex<()>,
// Needed to ensure that we can't create a branch at a point that was already garbage collected
pub latest_gc_cutoff_lsn: Rcu<Lsn>,
@@ -237,13 +235,7 @@ pub struct Timeline {
state: watch::Sender<TimelineState>,
/// Prevent two tasks from deleting the timeline at the same time. If held, the
/// timeline is being deleted. If 'true', the timeline has already been deleted.
pub delete_lock: tokio::sync::Mutex<bool>,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
layer_cache: Arc<LayerCache>,
}
/// Internal structure to hold all data needed for logical size calculation.
@@ -630,7 +622,7 @@ impl Timeline {
{
Ok(()) => Ok(()),
Err(e) => {
// don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
// walreceiver.status() locks internally, don't count that towards the wait_lsn_time_histo
drop(_timer);
let walreceiver_status = {
match &*self.walreceiver.lock().unwrap() {
@@ -677,7 +669,7 @@ impl Timeline {
}
/// Outermost timeline compaction operation; downloads needed layers.
pub async fn compact(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
pub async fn compact(&self, ctx: &RequestContext) -> anyhow::Result<()> {
const ROUNDS: usize = 2;
let last_record_lsn = self.get_last_record_lsn();
@@ -766,7 +758,7 @@ impl Timeline {
}
/// Compaction which might need to be retried after downloading remote layers.
async fn compact_inner(self: &Arc<Self>, ctx: &RequestContext) -> Result<(), CompactionError> {
async fn compact_inner(&self, ctx: &RequestContext) -> Result<(), CompactionError> {
//
// High level strategy for compaction / image creation:
//
@@ -801,7 +793,7 @@ impl Timeline {
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
let layer_removal_cs = self.layer_removal_cs.lock().await;
// Is the timeline being deleted?
let state = *self.state.borrow();
if state == TimelineState::Stopping {
@@ -835,7 +827,7 @@ impl Timeline {
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx)
self.compact_level0(&layer_removal_cs, target_file_size, ctx)
.await?;
timer.stop_and_record();
}
@@ -1010,22 +1002,20 @@ impl Timeline {
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(remote_layer_desc) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
if self.layer_cache.contains(&remote_layer_desc.filename()) {
return Ok(Some(false));
}
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
if self.remote_client.is_none() {
return Ok(Some(false));
}
self.download_remote_layer(remote_layer_desc).await?;
self.download_remote_layer(remote_layer).await?;
Ok(Some(true))
}
/// Like [`evict_layer_batch`], but for just one layer.
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(local_layer) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let remote_client = self
.remote_client
.as_ref()
@@ -1052,7 +1042,7 @@ impl Timeline {
pub async fn evict_layers(
&self,
_: &GenericRemoteStorage,
layers_to_evict: &[Arc<RemoteLayerDesc>],
layers_to_evict: &[Arc<dyn PersistentLayer>],
cancel: CancellationToken,
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
let remote_client = self.remote_client.clone().expect(
@@ -1087,7 +1077,7 @@ impl Timeline {
async fn evict_layer_batch(
&self,
remote_client: &Arc<RemoteTimelineClient>,
layers_to_evict: &[Arc<RemoteLayerDesc>],
layers_to_evict: &[Arc<dyn PersistentLayer>],
cancel: CancellationToken,
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
// ensure that the layers have finished uploading
@@ -1136,12 +1126,12 @@ impl Timeline {
fn evict_layer_batch_impl(
&self,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
local_layer: &Arc<RemoteLayerDesc>,
batch_updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
local_layer: &Arc<dyn PersistentLayer>,
batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
) -> anyhow::Result<bool> {
use super::layer_map::Replacement;
if !self.layer_cache.contains(&local_layer.filename()) {
if local_layer.is_remote_layer() {
// TODO(issue #3851): consider returning an err here instead of false,
// which is the same outcome as the match later
return Ok(false);
@@ -1168,7 +1158,7 @@ impl Timeline {
let layer_metadata = LayerFileMetadata::new(layer_file_size);
let new_remote_layer = Arc::new(match local_layer.filename() {
LayerFileName::Image(image_name) => RemoteLayerDesc::new_img(
LayerFileName::Image(image_name) => RemoteLayer::new_img(
self.tenant_id,
self.timeline_id,
&image_name,
@@ -1177,7 +1167,7 @@ impl Timeline {
.access_stats()
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
),
LayerFileName::Delta(delta_name) => RemoteLayerDesc::new_delta(
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
self.tenant_id,
self.timeline_id,
&delta_name,
@@ -1188,7 +1178,6 @@ impl Timeline {
),
});
/*
let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? {
Replacement::Replaced { .. } => {
if let Err(e) = local_layer.delete_resident_layer_file() {
@@ -1239,10 +1228,8 @@ impl Timeline {
false
}
};
*/
// Ok(replaced)
Ok(true)
Ok(replaced)
}
}
@@ -1426,9 +1413,6 @@ impl Timeline {
eviction_task_timeline_state: tokio::sync::Mutex::new(
EvictionTaskTimelineState::default(),
),
delete_lock: tokio::sync::Mutex::new(false),
layer_cache: Arc::new(LayerCache::new()),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
@@ -1575,12 +1559,9 @@ impl Timeline {
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
);
let remote_desc = layer.layer_desc();
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
self.layer_cache.insert(layer.filename(), Arc::new(layer));
updates.insert_historic(Arc::new(remote_desc));
updates.insert_historic(Arc::new(layer));
num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file.
@@ -1612,9 +1593,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
let remote_desc = layer.layer_desc();
self.layer_cache.insert(layer.filename(), Arc::new(layer));
updates.insert_historic(Arc::new(remote_desc));
updates.insert_historic(Arc::new(layer));
num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
@@ -1659,9 +1638,9 @@ impl Timeline {
async fn create_remote_layers(
&self,
index_part: &IndexPart,
local_layers: HashMap<LayerFileName, Arc<RemoteLayerDesc>>,
local_layers: HashMap<LayerFileName, Arc<dyn PersistentLayer>>,
up_to_date_disk_consistent_lsn: Lsn,
) -> anyhow::Result<HashMap<LayerFileName, Arc<RemoteLayerDesc>>> {
) -> anyhow::Result<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> {
// Are we missing some files that are present in remote storage?
// Create RemoteLayer instances for them.
let mut local_only_layers = local_layers;
@@ -1740,7 +1719,7 @@ impl Timeline {
continue;
}
let remote_layer = RemoteLayerDesc::new_img(
let remote_layer = RemoteLayer::new_img(
self.tenant_id,
self.timeline_id,
imgfilename,
@@ -1768,7 +1747,7 @@ impl Timeline {
);
continue;
}
let remote_layer = RemoteLayerDesc::new_delta(
let remote_layer = RemoteLayer::new_delta(
self.tenant_id,
self.timeline_id,
deltafilename,
@@ -1925,7 +1904,6 @@ impl Timeline {
// no cancellation here, because nothing really waits for this to complete compared
// to spawn_ondemand_logical_size_calculation.
let cancel = CancellationToken::new();
let calculated_size = match self_clone
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
.await
@@ -2174,7 +2152,7 @@ impl Timeline {
}
}
fn find_layer_desc(&self, layer_file_name: &str) -> Option<Arc<RemoteLayerDesc>> {
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
@@ -2190,11 +2168,11 @@ impl Timeline {
fn delete_historic_layer(
&self,
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
layer: Arc<RemoteLayerDesc>,
updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
layer: Arc<dyn PersistentLayer>,
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
) -> anyhow::Result<()> {
if self.layer_cache.contains(&layer.filename()) {
if !layer.is_remote_layer() {
layer.delete_resident_layer_file()?;
let layer_file_size = layer.file_size();
self.metrics
@@ -2443,7 +2421,13 @@ impl Timeline {
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
// If it's a remote layer, download it and retry.
if let Some(layer) = self.layer_cache.get(&layer.filename()) {
if let Some(remote_layer) =
super::storage_layer::downcast_remote_layer(&layer)
{
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
// we downloaded / would need to download this layer.
remote_layer // download happens outside the scope of `layers` guard object
} else {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
@@ -2466,10 +2450,6 @@ impl Timeline {
}),
));
continue 'outer;
} else {
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
// we downloaded / would need to download this layer.
layer // download happens outside the scope of `layers` guard object
}
} else if timeline.ancestor_timeline.is_some() {
// Nothing on this timeline. Traverse to parent
@@ -2652,7 +2632,7 @@ impl Timeline {
/// Layer flusher task's main loop.
async fn flush_loop(
self: &Arc<Self>,
&self,
mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
ctx: &RequestContext,
) {
@@ -2741,9 +2721,9 @@ impl Timeline {
}
/// Flush one frozen in-memory layer to disk, as a new delta layer.
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
#[instrument(skip(self, frozen_layer, ctx), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
async fn flush_frozen_layer(
self: &Arc<Self>,
&self,
frozen_layer: Arc<InMemoryLayer>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -2763,16 +2743,7 @@ impl Timeline {
.await?
} else {
// normal case, write out a L0 delta layer file.
let this = self.clone();
let frozen_layer = frozen_layer.clone();
let span = tracing::info_span!("blocking");
let (delta_path, metadata) = tokio::task::spawn_blocking(move || {
let _g = span.entered();
this.create_delta_layer(&frozen_layer)
})
.await
.context("create_delta_layer spawn_blocking")
.and_then(|res| res)?;
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?;
HashMap::from([(delta_path, metadata)])
};
@@ -2876,7 +2847,7 @@ impl Timeline {
// Write out the given frozen in-memory layer as a new L0 delta file
fn create_delta_layer(
self: &Arc<Self>,
&self,
frozen_layer: &InMemoryLayer,
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
// Write it out
@@ -2892,13 +2863,10 @@ impl Timeline {
// TODO: If we're running inside 'flush_frozen_layers' and there are multiple
// files to flush, it might be better to first write them all, and then fsync
// them all in parallel.
// First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace
// this with a single fsync in future refactors.
par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?;
// Then sync the parent directory.
par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
.context("fsync of timeline dir")?;
par_fsync::par_fsync(&[
new_delta_path.clone(),
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
])?;
// Add it to the layer map
let l = Arc::new(new_delta);
@@ -2909,8 +2877,7 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
batch_updates.insert_historic(Arc::new(l.layer_desc()));
self.layer_cache.insert(l.filename(), l);
batch_updates.insert_historic(l);
batch_updates.flush();
// update the timeline's physical size
@@ -3123,15 +3090,11 @@ impl Timeline {
let all_paths = image_layers
.iter()
.map(|layer| layer.path())
.chain(std::iter::once(
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
))
.collect::<Vec<_>>();
par_fsync::par_fsync_async(&all_paths)
.await
.context("fsync of newly created layer files")?;
par_fsync::par_fsync_async(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
.await
.context("fsync of timeline dir")?;
par_fsync::par_fsync(&all_paths).context("fsync of newly created layer files")?;
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
@@ -3156,9 +3119,7 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(Arc::new(l.layer_desc()));
let x: Arc<dyn PersistentLayer> = l;
self.layer_cache.insert(x.filename(), x)
updates.insert_historic(l);
}
updates.flush();
drop(layers);
@@ -3171,7 +3132,7 @@ impl Timeline {
#[derive(Default)]
struct CompactLevel0Phase1Result {
new_layers: Vec<DeltaLayer>,
deltas_to_compact: Vec<Arc<RemoteLayerDesc>>,
deltas_to_compact: Vec<Arc<dyn PersistentLayer>>,
}
/// Top-level failure to compact.
@@ -3181,7 +3142,7 @@ enum CompactionError {
///
/// This should not happen repeatedly, but will be retried once by top-level
/// `Timeline::compact`.
DownloadRequired(Vec<Arc<RemoteLayerDesc>>),
DownloadRequired(Vec<Arc<RemoteLayer>>),
/// Compaction cannot be done right now; page reconstruction and so on.
Other(anyhow::Error),
}
@@ -3198,9 +3159,9 @@ impl Timeline {
/// This method takes the `_layer_removal_cs` guard to highlight that required downloads are
/// returned as an error. If the `layer_removal_cs` boundary is changed so that it is no longer taken at the
/// start of level0 files compaction, the on-demand download should be revisited as well.
fn compact_level0_phase1(
async fn compact_level0_phase1(
&self,
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
@@ -3253,9 +3214,13 @@ impl Timeline {
let remotes = deltas_to_compact
.iter()
.filter(|l| !self.layer_cache.contains(&l.filename()))
.filter(|l| l.is_remote_layer())
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
.cloned()
.map(|l| {
l.clone()
.downcast_remote_layer()
.expect("just checked it is remote layer")
})
.collect::<Vec<_>>();
if !remotes.is_empty() {
@@ -3509,13 +3474,13 @@ impl Timeline {
if !new_layers.is_empty() {
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
// also sync the directory
layer_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id));
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
.context("fsync of timeline dir")?;
layer_paths.pop().unwrap();
}
@@ -3532,26 +3497,17 @@ impl Timeline {
/// as Level 1 files.
///
async fn compact_level0(
self: &Arc<Self>,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
&self,
layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
let this = self.clone();
let ctx_inner = ctx.clone();
let layer_removal_cs_inner = layer_removal_cs.clone();
let span = tracing::info_span!("blocking");
let CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
} = tokio::task::spawn_blocking(move || {
let _g = span.entered();
this.compact_level0_phase1(layer_removal_cs_inner, target_file_size, &ctx_inner)
})
.await
.context("compact_level0_phase1 spawn_blocking")
.map_err(CompactionError::Other)
.and_then(|res| res)?;
} = self
.compact_level0_phase1(layer_removal_cs, target_file_size, ctx)
.await?;
if new_layers.is_empty() && deltas_to_compact.is_empty() {
// nothing to do
@@ -3595,15 +3551,13 @@ impl Timeline {
.add(metadata.len());
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
let remote_desc = l.layer_desc();
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
x.access_stats().record_residence_event(
&updates,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(Arc::new(remote_desc));
self.layer_cache.insert(x.filename(), x)
updates.insert_historic(x);
}
// Now that we have reshuffled the data to set of new delta layers, we can
@@ -3611,7 +3565,7 @@ impl Timeline {
let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact {
layer_names_to_delete.push(l.filename());
self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?;
self.delete_historic_layer(layer_removal_cs, l, &mut updates)?;
}
updates.flush();
drop(layers);
@@ -3731,7 +3685,7 @@ impl Timeline {
fail_point!("before-timeline-gc");
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
let layer_removal_cs = self.layer_removal_cs.lock().await;
// Is the timeline being deleted?
let state = *self.state.borrow();
if state == TimelineState::Stopping {
@@ -3751,7 +3705,7 @@ impl Timeline {
let res = self
.gc_timeline(
layer_removal_cs.clone(),
&layer_removal_cs,
horizon_cutoff,
pitr_cutoff,
retain_lsns,
@@ -3770,7 +3724,7 @@ impl Timeline {
async fn gc_timeline(
&self,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
horizon_cutoff: Lsn,
pitr_cutoff: Lsn,
retain_lsns: Vec<Lsn>,
@@ -3943,11 +3897,7 @@ impl Timeline {
{
for doomed_layer in layers_to_remove {
layer_names_to_delete.push(doomed_layer.filename());
self.delete_historic_layer(
layer_removal_cs.clone(),
doomed_layer,
&mut updates,
)?; // FIXME: schedule succeeded deletions before returning?
self.delete_historic_layer(layer_removal_cs, doomed_layer, &mut updates)?; // FIXME: schedule succeeded deletions before returning?
result.layers_removed += 1;
}
}
@@ -4076,7 +4026,7 @@ impl Timeline {
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
pub async fn download_remote_layer(
&self,
remote_layer: Arc<RemoteLayerDesc>,
remote_layer: Arc<RemoteLayer>,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -4133,12 +4083,10 @@ impl Timeline {
// Delta- or ImageLayer in the layer map.
let mut layers = self_clone.layers.write().unwrap();
let mut updates = layers.batch_update();
let new_layer =
remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{
use crate::tenant::layer_map::Replacement;
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
/*
let failure = match updates.replace_historic(&l, new_layer) {
Ok(Replacement::Replaced { .. }) => false,
Ok(Replacement::NotFound) => {
@@ -4193,9 +4141,8 @@ impl Timeline {
remote_layer
.download_replacement_failure
.store(true, Relaxed);
} */
}
}
updates.flush();
drop(layers);
@@ -4208,10 +4155,7 @@ impl Timeline {
remote_layer.ongoing_download.close();
} else {
// Keep semaphore open. We'll drop the permit at the end of the function.
error!(
"layer file download failed: {:?}",
result.as_ref().unwrap_err()
);
error!("layer file download failed: {:?}", result.as_ref().unwrap_err());
}
// Don't treat it as an error if the task that triggered the download
@@ -4225,8 +4169,7 @@ impl Timeline {
drop(permit);
Ok(())
}
.in_current_span(),
}.in_current_span(),
);
receiver.await.context("download task cancelled")?
@@ -4299,7 +4242,7 @@ impl Timeline {
let layers = self.layers.read().unwrap();
layers
.iter_historic_layers()
.filter(|l| !self.layer_cache.contains(&l.filename()))
.filter_map(|l| l.downcast_remote_layer())
.map(|l| self.download_remote_layer(l))
.for_each(|dl| downloads.push(dl))
}
@@ -4374,7 +4317,7 @@ pub struct DiskUsageEvictionInfo {
}
pub struct LocalLayerInfoForDiskUsageEviction {
pub layer: Arc<RemoteLayerDesc>,
pub layer: Arc<dyn PersistentLayer>,
pub last_activity_ts: SystemTime,
}
@@ -4408,7 +4351,7 @@ impl Timeline {
let file_size = l.file_size();
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
if !self.layer_cache.contains(&l.filename()) {
if l.is_remote_layer() {
continue;
}
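
Several functions in this file take `_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>` purely as proof that the caller holds the layer removal lock. The sketch below shows that guard-as-witness idea under stated assumptions: it swaps in `std::sync::Mutex` for `tokio::sync::Mutex` to stay dependency-free, and the `Timeline` struct, its `layers` field, and the `gc` method are hypothetical simplifications.

```rust
use std::sync::{Mutex, MutexGuard};

/// Hypothetical, heavily reduced timeline.
struct Timeline {
    /// Held by any task that may remove layers.
    layer_removal_cs: Mutex<()>,
    /// Stand-in for the layer map: just a list of layer names.
    layers: Mutex<Vec<String>>,
}

impl Timeline {
    /// The unused guard parameter is the point: this function cannot be
    /// called without a reference to a live lock guard.
    fn delete_historic_layer(&self, _layer_removal_cs: &MutexGuard<'_, ()>, name: &str) -> bool {
        let mut layers = self.layers.lock().unwrap();
        let before = layers.len();
        layers.retain(|l| l.as_str() != name);
        layers.len() != before
    }

    /// Acquire the lock once, then lend the guard to every deletion.
    fn gc(&self, doomed: &[&str]) -> usize {
        let guard = self.layer_removal_cs.lock().unwrap();
        let mut removed = 0;
        for name in doomed {
            if self.delete_historic_layer(&guard, name) {
                removed += 1;
            }
        }
        removed
    }
}
```

Passing the guard by reference avoids the `Arc` and `lock_owned()` on the other side of this diff, at the cost that the guard cannot be moved into a `'static` task such as `spawn_blocking`.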


@@ -29,7 +29,7 @@ use crate::{
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
storage_layer::{PersistentLayer, RemoteLayerDesc},
storage_layer::PersistentLayer,
LogicalSizeCalculationCause, Tenant,
},
};
@@ -184,11 +184,11 @@ impl Timeline {
// NB: all the checks can be invalidated as soon as we release the layer map lock.
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<RemoteLayerDesc>> = {
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let layers = self.layers.read().unwrap();
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
if !self.layer_cache.contains(&hist_layer.filename()) {
if hist_layer.is_remote_layer() {
continue;
}


@@ -1309,6 +1309,7 @@ mod tests {
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
.await
.expect("Failed to create an empty timeline for dummy wal connection manager");
ConnectionManagerState {


@@ -1209,7 +1209,7 @@ mod tests {
#[tokio::test]
async fn test_relsize() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
let mut m = tline.begin_modification(Lsn(0x20));
@@ -1428,7 +1428,7 @@ mod tests {
#[tokio::test]
async fn test_drop_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
let mut m = tline.begin_modification(Lsn(0x20));
@@ -1497,7 +1497,7 @@ mod tests {
#[tokio::test]
async fn test_truncate_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
// Create a 20 MB relation (the size is arbitrary)
@@ -1637,7 +1637,7 @@ mod tests {
#[tokio::test]
async fn test_large_rel() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
let mut lsn = 0x10;


@@ -11,12 +11,10 @@ OBJS = \
pagestore_smgr.o \
relsize_cache.o \
walproposer.o \
walproposer_utils.o \
control_plane_connector.o
walproposer_utils.o
PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
EXTENSION = neon
DATA = neon--1.0.sql


@@ -1,830 +0,0 @@
/*-------------------------------------------------------------------------
*
* control_plane_connector.c
* Captures updates to roles/databases using ProcessUtility_hook and
* sends them to the control plane. The changes are sent
* via HTTP to the URL specified by the GUC neon.console_url when the
* transaction commits. Forwarding may be disabled temporarily by
* setting neon.forward_ddl to false.
*
* Currently, the transaction may abort AFTER
* changes have already been forwarded, and that case is not handled.
* Subtransactions are handled using a stack of hash tables, which
* accumulate changes. On subtransaction commit, the top of the stack
* is merged with the table below it.
*
* IDENTIFICATION
* contrib/neon/control_plane_connector.c
*
*-------------------------------------------------------------------------
*/
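
The header comment above describes a stack of hash tables, one per subtransaction, where a committing subtransaction merges its table into the one below. A language-neutral illustration of that bookkeeping, written in Rust rather than the file's C, might look like the sketch below. `DdlStack`, `Op`, and every method name are hypothetical, and the abort behaviour (discard the top table) is an assumption that this excerpt does not confirm.

```rust
use std::collections::HashMap;

/// Hypothetical operation kind, mirroring the idea of Op_Set / Op_Delete.
#[derive(Debug, Clone, PartialEq)]
enum Op {
    Set,
    Delete,
}

/// Stack of per-(sub)transaction change tables; index 0 is the root table.
struct DdlStack {
    tables: Vec<HashMap<String, Op>>,
}

impl DdlStack {
    fn new() -> Self {
        Self { tables: vec![HashMap::new()] }
    }

    /// Entering a subtransaction pushes a fresh, empty table.
    fn begin_subxact(&mut self) {
        self.tables.push(HashMap::new());
    }

    /// Changes are always recorded in the table on top of the stack.
    fn record(&mut self, name: &str, op: Op) {
        self.tables
            .last_mut()
            .expect("root table is always present")
            .insert(name.to_string(), op);
    }

    /// On subtransaction commit, merge the top table into the one below it;
    /// entries from the committing subtransaction win on conflict.
    fn commit_subxact(&mut self) {
        if self.tables.len() > 1 {
            let top = self.tables.pop().unwrap();
            self.tables.last_mut().unwrap().extend(top);
        }
    }

    /// Assumption: on abort the subtransaction's changes are simply dropped.
    fn abort_subxact(&mut self) {
        if self.tables.len() > 1 {
            self.tables.pop();
        }
    }

    /// What would be forwarded at top-level commit.
    fn pending(&self) -> &HashMap<String, Op> {
        &self.tables[0]
    }
}
```

With this layout, only the root table needs to be serialized when the outermost transaction commits.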
#include "postgres.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "access/xact.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "commands/defrem.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "fmgr.h"
#include "utils/guc.h"
#include "port.h"
#include <curl/curl.h>
#include "utils/jsonb.h"
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
/* GUCs */
static char *ConsoleURL = NULL;
static bool ForwardDDL = true;
/* Curl structures for sending the HTTP requests */
static CURL * CurlHandle;
static struct curl_slist *ContentHeader = NULL;
/*
* CURL docs say that this buffer must exist until we call curl_easy_cleanup
* (which we never do), so we make this a static
*/
static char CurlErrorBuf[CURL_ERROR_SIZE];
typedef enum
{
Op_Set, /* An upsert: Either a creation or an alter */
Op_Delete,
} OpType;
typedef struct
{
char name[NAMEDATALEN];
Oid owner;
char old_name[NAMEDATALEN];
OpType type;
} DbEntry;
typedef struct
{
char name[NAMEDATALEN];
char old_name[NAMEDATALEN];
const char *password;
OpType type;
} RoleEntry;
/*
* We keep one of these for each subtransaction in a stack. When a subtransaction
* commits, we merge the top of the stack into the table below it. It is allocated in the
* subtransaction's context.
*/
typedef struct DdlHashTable
{
struct DdlHashTable *prev_table;
HTAB *db_table;
HTAB *role_table;
} DdlHashTable;
static DdlHashTable RootTable;
static DdlHashTable * CurrentDdlTable = &RootTable;
static void
PushKeyValue(JsonbParseState **state, char *key, char *value)
{
JsonbValue k,
v;
k.type = jbvString;
k.val.string.len = strlen(key);
k.val.string.val = key;
v.type = jbvString;
v.val.string.len = strlen(value);
v.val.string.val = value;
pushJsonbValue(state, WJB_KEY, &k);
pushJsonbValue(state, WJB_VALUE, &v);
}
static char *
ConstructDeltaMessage()
{
JsonbParseState *state = NULL;
pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
if (RootTable.db_table)
{
JsonbValue dbs;
dbs.type = jbvString;
dbs.val.string.val = "dbs";
dbs.val.string.len = strlen(dbs.val.string.val);
pushJsonbValue(&state, WJB_KEY, &dbs);
pushJsonbValue(&state, WJB_BEGIN_ARRAY, NULL);
HASH_SEQ_STATUS status;
DbEntry *entry;
hash_seq_init(&status, RootTable.db_table);
while ((entry = hash_seq_search(&status)) != NULL)
{
pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
PushKeyValue(&state, "op", entry->type == Op_Set ? "set" : "del");
PushKeyValue(&state, "name", entry->name);
if (entry->owner != InvalidOid)
{
PushKeyValue(&state, "owner", GetUserNameFromId(entry->owner, false));
}
if (entry->old_name[0] != '\0')
{
PushKeyValue(&state, "old_name", entry->old_name);
}
pushJsonbValue(&state, WJB_END_OBJECT, NULL);
}
pushJsonbValue(&state, WJB_END_ARRAY, NULL);
}
if (RootTable.role_table)
{
JsonbValue roles;
roles.type = jbvString;
roles.val.string.val = "roles";
roles.val.string.len = strlen(roles.val.string.val);
pushJsonbValue(&state, WJB_KEY, &roles);
pushJsonbValue(&state, WJB_BEGIN_ARRAY, NULL);
HASH_SEQ_STATUS status;
RoleEntry *entry;
hash_seq_init(&status, RootTable.role_table);
while ((entry = hash_seq_search(&status)) != NULL)
{
pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
PushKeyValue(&state, "op", entry->type == Op_Set ? "set" : "del");
PushKeyValue(&state, "name", entry->name);
if (entry->password)
{
PushKeyValue(&state, "password", (char *) entry->password);
}
if (entry->old_name[0] != '\0')
{
PushKeyValue(&state, "old_name", entry->old_name);
}
pushJsonbValue(&state, WJB_END_OBJECT, NULL);
}
pushJsonbValue(&state, WJB_END_ARRAY, NULL);
}
JsonbValue *result = pushJsonbValue(&state, WJB_END_OBJECT, NULL);
Jsonb *jsonb = JsonbValueToJsonb(result);
return JsonbToCString(NULL, &jsonb->root, 0 /* estimated_len */ );
}
#define ERROR_SIZE 1024
typedef struct
{
char str[ERROR_SIZE];
size_t size;
} ErrorString;
static size_t
ErrorWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
{
/* Docs say size is always 1 */
ErrorString *str = userdata;
size_t to_write = nmemb;
/* +1 for null terminator */
if (str->size + nmemb + 1 >= ERROR_SIZE)
to_write = ERROR_SIZE - str->size - 1;
/* Ignore everything past the first ERROR_SIZE bytes */
if (to_write == 0)
return nmemb;
memcpy(str->str + str->size, ptr, to_write);
str->size += to_write;
str->str[str->size] = '\0';
return nmemb;
}
static void
SendDeltasToControlPlane()
{
if (!RootTable.db_table && !RootTable.role_table)
return;
if (!ConsoleURL)
{
elog(LOG, "ConsoleURL not set, skipping forwarding");
return;
}
if (!ForwardDDL)
return;
char *message = ConstructDeltaMessage();
ErrorString str = {};
curl_easy_setopt(CurlHandle, CURLOPT_CUSTOMREQUEST, "PATCH");
curl_easy_setopt(CurlHandle, CURLOPT_HTTPHEADER, ContentHeader);
curl_easy_setopt(CurlHandle, CURLOPT_POSTFIELDS, message);
curl_easy_setopt(CurlHandle, CURLOPT_URL, ConsoleURL);
curl_easy_setopt(CurlHandle, CURLOPT_ERRORBUFFER, CurlErrorBuf);
curl_easy_setopt(CurlHandle, CURLOPT_TIMEOUT, 3L /* seconds */ );
curl_easy_setopt(CurlHandle, CURLOPT_WRITEDATA, &str);
curl_easy_setopt(CurlHandle, CURLOPT_WRITEFUNCTION, ErrorWriteCallback);
const int num_retries = 5;
int curl_status;
for (int i = 0; i < num_retries; i++)
{
if ((curl_status = curl_easy_perform(CurlHandle)) == 0)
break;
elog(LOG, "Curl request failed on attempt %d: %s", i, CurlErrorBuf);
pg_usleep(1000 * 1000);
}
if (curl_status != 0)
{
elog(ERROR, "Failed to perform curl request: %s", CurlErrorBuf);
}
else
{
long response_code;
if (curl_easy_getinfo(CurlHandle, CURLINFO_RESPONSE_CODE, &response_code) != CURLE_UNKNOWN_OPTION)
{
bool error_exists = str.size != 0;
if (response_code != 200)
{
if (error_exists)
{
elog(ERROR,
"Received HTTP code %ld from control plane: %s",
response_code,
str.str);
}
else
{
elog(ERROR,
"Received HTTP code %ld from control plane",
response_code);
}
}
}
}
}
static void
InitDbTableIfNeeded()
{
if (!CurrentDdlTable->db_table)
{
HASHCTL db_ctl = {};
db_ctl.keysize = NAMEDATALEN;
db_ctl.entrysize = sizeof(DbEntry);
db_ctl.hcxt = CurTransactionContext;
CurrentDdlTable->db_table = hash_create(
"Dbs Created",
4,
&db_ctl,
HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
}
}
static void
InitRoleTableIfNeeded()
{
if (!CurrentDdlTable->role_table)
{
HASHCTL role_ctl = {};
role_ctl.keysize = NAMEDATALEN;
role_ctl.entrysize = sizeof(RoleEntry);
role_ctl.hcxt = CurTransactionContext;
CurrentDdlTable->role_table = hash_create(
"Roles Created",
4,
&role_ctl,
HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
}
}
static void
PushTable()
{
DdlHashTable *new_table = MemoryContextAlloc(CurTransactionContext, sizeof(DdlHashTable));
new_table->prev_table = CurrentDdlTable;
new_table->role_table = NULL;
new_table->db_table = NULL;
CurrentDdlTable = new_table;
}
static void
MergeTable()
{
DdlHashTable *old_table = CurrentDdlTable;
CurrentDdlTable = old_table->prev_table;
if (old_table->db_table)
{
InitDbTableIfNeeded();
DbEntry *entry;
HASH_SEQ_STATUS status;
hash_seq_init(&status, old_table->db_table);
while ((entry = hash_seq_search(&status)) != NULL)
{
DbEntry *to_write = hash_search(
CurrentDdlTable->db_table,
entry->name,
HASH_ENTER,
NULL);
to_write->type = entry->type;
if (entry->owner != InvalidOid)
to_write->owner = entry->owner;
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
if (entry->old_name[0] != '\0')
{
bool found_old = false;
DbEntry *old = hash_search(
CurrentDdlTable->db_table,
entry->old_name,
HASH_FIND,
&found_old);
if (found_old)
{
if (old->old_name[0] != '\0')
strlcpy(to_write->old_name, old->old_name, NAMEDATALEN);
else
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
hash_search(
CurrentDdlTable->db_table,
entry->old_name,
HASH_REMOVE,
NULL);
}
}
}
hash_destroy(old_table->db_table);
}
if (old_table->role_table)
{
InitRoleTableIfNeeded();
RoleEntry *entry;
HASH_SEQ_STATUS status;
hash_seq_init(&status, old_table->role_table);
while ((entry = hash_seq_search(&status)) != NULL)
{
RoleEntry *to_write = hash_search(
CurrentDdlTable->role_table,
entry->name,
HASH_ENTER,
NULL);
to_write->type = entry->type;
if (entry->password)
to_write->password = entry->password;
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
if (entry->old_name[0] != '\0')
{
bool found_old = false;
RoleEntry *old = hash_search(
CurrentDdlTable->role_table,
entry->old_name,
HASH_FIND,
&found_old);
if (found_old)
{
if (old->old_name[0] != '\0')
strlcpy(to_write->old_name, old->old_name, NAMEDATALEN);
else
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
hash_search(CurrentDdlTable->role_table,
entry->old_name,
HASH_REMOVE,
NULL);
}
}
}
hash_destroy(old_table->role_table);
}
}
static void
PopTable()
{
/*
* Current table gets freed because it is allocated in aborted
* subtransaction's memory context.
*/
CurrentDdlTable = CurrentDdlTable->prev_table;
}
static void
NeonSubXactCallback(
SubXactEvent event,
SubTransactionId mySubid,
SubTransactionId parentSubid,
void *arg)
{
switch (event)
{
case SUBXACT_EVENT_START_SUB:
return PushTable();
case SUBXACT_EVENT_COMMIT_SUB:
return MergeTable();
case SUBXACT_EVENT_ABORT_SUB:
return PopTable();
default:
return;
}
}
static void
NeonXactCallback(XactEvent event, void *arg)
{
if (event == XACT_EVENT_PRE_COMMIT || event == XACT_EVENT_PARALLEL_PRE_COMMIT)
{
SendDeltasToControlPlane();
}
RootTable.role_table = NULL;
RootTable.db_table = NULL;
Assert(CurrentDdlTable == &RootTable);
}
static void
HandleCreateDb(CreatedbStmt *stmt)
{
InitDbTableIfNeeded();
DefElem *downer = NULL;
ListCell *option;
foreach(option, stmt->options)
{
DefElem *defel = lfirst(option);
if (strcmp(defel->defname, "owner") == 0)
downer = defel;
}
bool found = false;
DbEntry *entry = hash_search(
CurrentDdlTable->db_table,
stmt->dbname,
HASH_ENTER,
&found);
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
entry->type = Op_Set;
if (downer && downer->arg)
entry->owner = get_role_oid(defGetString(downer), false);
else
entry->owner = GetUserId();
}
static void
HandleAlterOwner(AlterOwnerStmt *stmt)
{
if (stmt->objectType != OBJECT_DATABASE)
return;
InitDbTableIfNeeded();
const char *name = strVal(stmt->object);
bool found = false;
DbEntry *entry = hash_search(
CurrentDdlTable->db_table,
name,
HASH_ENTER,
&found);
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
entry->owner = get_role_oid(get_rolespec_name(stmt->newowner), false);
entry->type = Op_Set;
}
static void
HandleDbRename(RenameStmt *stmt)
{
Assert(stmt->renameType == OBJECT_DATABASE);
InitDbTableIfNeeded();
bool found = false;
DbEntry *entry = hash_search(
CurrentDdlTable->db_table,
stmt->subname,
HASH_FIND,
&found);
DbEntry *entry_for_new_name = hash_search(
CurrentDdlTable->db_table,
stmt->newname,
HASH_ENTER,
NULL);
entry_for_new_name->type = Op_Set;
if (found)
{
if (entry->old_name[0] != '\0')
strlcpy(entry_for_new_name->old_name, entry->old_name, NAMEDATALEN);
else
strlcpy(entry_for_new_name->old_name, entry->name, NAMEDATALEN);
entry_for_new_name->owner = entry->owner;
hash_search(
CurrentDdlTable->db_table,
stmt->subname,
HASH_REMOVE,
NULL);
}
else
{
strlcpy(entry_for_new_name->old_name, stmt->subname, NAMEDATALEN);
entry_for_new_name->owner = InvalidOid;
}
}
static void
HandleDropDb(DropdbStmt *stmt)
{
InitDbTableIfNeeded();
bool found = false;
DbEntry *entry = hash_search(
CurrentDdlTable->db_table,
stmt->dbname,
HASH_ENTER,
&found);
entry->type = Op_Delete;
entry->owner = InvalidOid;
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
}
static void
HandleCreateRole(CreateRoleStmt *stmt)
{
InitRoleTableIfNeeded();
bool found = false;
RoleEntry *entry = hash_search(
CurrentDdlTable->role_table,
stmt->role,
HASH_ENTER,
&found);
DefElem *dpass = NULL;
ListCell *option;
foreach(option, stmt->options)
{
DefElem *defel = lfirst(option);
if (strcmp(defel->defname, "password") == 0)
dpass = defel;
}
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
if (dpass && dpass->arg)
entry->password = MemoryContextStrdup(CurTransactionContext, strVal(dpass->arg));
else
entry->password = NULL;
entry->type = Op_Set;
}
static void
HandleAlterRole(AlterRoleStmt *stmt)
{
InitRoleTableIfNeeded();
DefElem *dpass = NULL;
ListCell *option;
foreach(option, stmt->options)
{
DefElem *defel = lfirst(option);
if (strcmp(defel->defname, "password") == 0)
dpass = defel;
}
/* We only care about updates to the password */
if (!dpass)
return;
bool found = false;
RoleEntry *entry = hash_search(
CurrentDdlTable->role_table,
stmt->role->rolename,
HASH_ENTER,
&found);
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
if (dpass->arg)
entry->password = MemoryContextStrdup(CurTransactionContext, strVal(dpass->arg));
else
entry->password = NULL;
entry->type = Op_Set;
}
static void
HandleRoleRename(RenameStmt *stmt)
{
InitRoleTableIfNeeded();
Assert(stmt->renameType == OBJECT_ROLE);
bool found = false;
RoleEntry *entry = hash_search(
CurrentDdlTable->role_table,
stmt->subname,
HASH_FIND,
&found);
RoleEntry *entry_for_new_name = hash_search(
CurrentDdlTable->role_table,
stmt->newname,
HASH_ENTER,
NULL);
entry_for_new_name->type = Op_Set;
if (found)
{
if (entry->old_name[0] != '\0')
strlcpy(entry_for_new_name->old_name, entry->old_name, NAMEDATALEN);
else
strlcpy(entry_for_new_name->old_name, entry->name, NAMEDATALEN);
entry_for_new_name->password = entry->password;
hash_search(
CurrentDdlTable->role_table,
entry->name,
HASH_REMOVE,
NULL);
}
else
{
strlcpy(entry_for_new_name->old_name, stmt->subname, NAMEDATALEN);
entry_for_new_name->password = NULL;
}
}
static void
HandleDropRole(DropRoleStmt *stmt)
{
InitRoleTableIfNeeded();
ListCell *item;
foreach(item, stmt->roles)
{
RoleSpec *spec = lfirst(item);
bool found = false;
RoleEntry *entry = hash_search(
CurrentDdlTable->role_table,
spec->rolename,
HASH_ENTER,
&found);
entry->type = Op_Delete;
entry->password = NULL;
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
}
}
static void
HandleRename(RenameStmt *stmt)
{
if (stmt->renameType == OBJECT_DATABASE)
return HandleDbRename(stmt);
else if (stmt->renameType == OBJECT_ROLE)
return HandleRoleRename(stmt);
}
static void
NeonProcessUtility(
PlannedStmt *pstmt,
const char *queryString,
bool readOnlyTree,
ProcessUtilityContext context,
ParamListInfo params,
QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *qc)
{
Node *parseTree = pstmt->utilityStmt;
switch (nodeTag(parseTree))
{
case T_CreatedbStmt:
HandleCreateDb(castNode(CreatedbStmt, parseTree));
break;
case T_AlterOwnerStmt:
HandleAlterOwner(castNode(AlterOwnerStmt, parseTree));
break;
case T_RenameStmt:
HandleRename(castNode(RenameStmt, parseTree));
break;
case T_DropdbStmt:
HandleDropDb(castNode(DropdbStmt, parseTree));
break;
case T_CreateRoleStmt:
HandleCreateRole(castNode(CreateRoleStmt, parseTree));
break;
case T_AlterRoleStmt:
HandleAlterRole(castNode(AlterRoleStmt, parseTree));
break;
case T_DropRoleStmt:
HandleDropRole(castNode(DropRoleStmt, parseTree));
break;
default:
break;
}
if (PreviousProcessUtilityHook)
{
PreviousProcessUtilityHook(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
else
{
standard_ProcessUtility(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
}
extern void
InitControlPlaneConnector()
{
PreviousProcessUtilityHook = ProcessUtility_hook;
ProcessUtility_hook = NeonProcessUtility;
RegisterXactCallback(NeonXactCallback, NULL);
RegisterSubXactCallback(NeonSubXactCallback, NULL);
DefineCustomStringVariable(
"neon.console_url",
"URL of the Neon Console, to which changes to dbs and roles will be forwarded",
NULL,
&ConsoleURL,
NULL,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
DefineCustomBoolVariable(
"neon.forward_ddl",
"Controls whether to forward DDL to the control plane",
NULL,
&ForwardDDL,
true,
PGC_SUSET,
0,
NULL,
NULL,
NULL);
const char *jwt_token = getenv("NEON_CONTROL_PLANE_TOKEN");
if (!jwt_token)
{
elog(LOG, "Missing NEON_CONTROL_PLANE_TOKEN environment variable, forwarding will not be authenticated");
}
if (curl_global_init(CURL_GLOBAL_DEFAULT))
{
elog(ERROR, "Failed to initialize curl");
}
if ((CurlHandle = curl_easy_init()) == NULL)
{
elog(ERROR, "Failed to initialize curl handle");
}
if ((ContentHeader = curl_slist_append(ContentHeader, "Content-Type: application/json")) == NULL)
{
elog(ERROR, "Failed to initialize content header");
}
if (jwt_token)
{
char auth_header[8192];
snprintf(auth_header, sizeof(auth_header), "Authorization: Bearer %s", jwt_token);
if ((ContentHeader = curl_slist_append(ContentHeader, auth_header)) == NULL)
{
elog(ERROR, "Failed to initialize authorization header");
}
}
}
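
Review note: the header comment of control_plane_connector.c describes subtransactions as "a stack of hash tables" whose top is merged into the table below on subtransaction commit. The following is a minimal Python sketch of that push/merge/pop behaviour, not the extension's code. The names `Entry` and `DdlStack` are hypothetical, and owner and password handling is deliberately left out; only the rename chaining from `MergeTable` is modelled.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Entry:
    op: str             # "set" or "del", mirroring Op_Set / Op_Delete
    old_name: str = ""  # empty string means "not renamed"


class DdlStack:
    """Hypothetical stand-in for the DdlHashTable linked list."""

    def __init__(self) -> None:
        # tables[0] plays the role of RootTable.
        self.tables: List[Dict[str, Entry]] = [{}]

    def push(self) -> None:
        # Subtransaction start: begin accumulating into a fresh table.
        self.tables.append({})

    def pop(self) -> None:
        # Subtransaction abort: discard everything it recorded.
        self.tables.pop()

    def merge(self) -> None:
        # Subtransaction commit: fold the top table into the one below.
        child = self.tables.pop()
        parent = self.tables[-1]
        for name, entry in child.items():
            merged = Entry(op=entry.op, old_name=entry.old_name)
            if entry.old_name and entry.old_name in parent:
                # The parent already tracked the object under its previous
                # name: drop that entry and keep the earliest known name.
                old = parent.pop(entry.old_name)
                if old.old_name:
                    merged.old_name = old.old_name
            parent[name] = merged
```

With this sketch, a rename a -> b recorded in the outer transaction followed by b -> c in a committed subtransaction leaves a single root entry for c whose `old_name` is a, which is the shape the control plane needs to apply the change in one step.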

View File

@@ -1,6 +0,0 @@
#ifndef CONTROL_PLANE_CONNECTOR_H
#define CONTROL_PLANE_CONNECTOR_H
void InitControlPlaneConnector();
#endif

View File

@@ -25,7 +25,6 @@
#include "neon.h"
#include "walproposer.h"
#include "pagestore_client.h"
#include "control_plane_connector.h"
PG_MODULE_MAGIC;
void _PG_init(void);
@@ -35,11 +34,7 @@ _PG_init(void)
{
pg_init_libpagestore();
pg_init_walproposer();
InitControlPlaneConnector();
// Important: This must happen after other parts of the extension
// are loaded, otherwise any settings to GUCs that were set before
// the extension was loaded will be removed.
EmitWarningsOnPlaceholders("neon");
}

View File

@@ -19,10 +19,8 @@ use tokio::task::JoinHandle;
use tokio::{runtime, time::sleep};
use tracing::*;
use crate::metrics::BROKER_ITERATION_TIMELINES;
use crate::metrics::BROKER_PULLED_UPDATES;
use crate::metrics::BROKER_PUSHED_UPDATES;
use crate::metrics::BROKER_PUSH_ALL_UPDATES_SECONDS;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
@@ -63,14 +61,8 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
BROKER_PUSHED_UPDATES.inc();
}
let elapsed = now.elapsed();
BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
BROKER_ITERATION_TIMELINES.observe(active_tlis.len() as f64);
if elapsed > push_interval / 2 {
info!("broker push is too long, pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed);
}
// Log duration every second. Should be about 10MB of logs per day.
info!("pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed);
sleep(push_interval).await;
}
};

View File

@@ -125,25 +125,6 @@ pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_backup_errors_total counter")
});
pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_broker_push_update_seconds",
"Seconds to push all timeline updates to the broker",
DISK_WRITE_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
});
pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
];
pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_broker_iteration_timelines",
"Count of timelines pushed to the broker in a single iteration",
TIMELINES_COUNT_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
});
pub const LABEL_UNKNOWN: &str = "unknown";

View File

@@ -156,9 +156,7 @@ class LLVM:
profdata: Path,
objects: List[str],
sources: List[str],
demangler: Optional[Path] = None,
output_file: Optional[Path] = None,
) -> None:
demangler: Optional[Path] = None) -> None:
cwd = self.cargo.cwd
objects = list(intersperse('-object', objects))
@@ -182,18 +180,14 @@ class LLVM:
*objects,
*sources,
]
if output_file is not None:
with output_file.open('w') as outfile:
subprocess.check_call(cmd, cwd=cwd, stdout=outfile)
else:
subprocess.check_call(cmd, cwd=cwd)
subprocess.check_call(cmd, cwd=cwd)
def cov_report(self, **kwargs) -> None:
self._cov(subcommand='report', **kwargs)
def cov_export(self, *, kind: str, output_file: Optional[Path], **kwargs) -> None:
def cov_export(self, *, kind: str, **kwargs) -> None:
extras = (f'-format={kind}', )
self._cov(subcommand='export', *extras, output_file=output_file, **kwargs)
self._cov(subcommand='export', *extras, **kwargs)
def cov_show(self, *, kind: str, output_dir: Optional[Path] = None, **kwargs) -> None:
extras = [f'-format={kind}']
@@ -289,12 +283,9 @@ class TextReport(Report):
self.llvm.cov_show(kind='text', **self._common_kwargs())
@dataclass
class LcovReport(Report):
output_file: Path
def generate(self) -> None:
self.llvm.cov_export(kind='lcov', output_file=self.output_file, **self._common_kwargs())
self.llvm.cov_export(kind='lcov', **self._common_kwargs())
@dataclass
@@ -484,7 +475,7 @@ class State:
'text':
lambda: TextReport(**params),
'lcov':
lambda: LcovReport(**params, output_file=self.report_dir / 'lcov.info'),
lambda: LcovReport(**params),
'summary':
lambda: SummaryReport(**params),
'github':

View File

@@ -535,8 +535,8 @@ def export_timeline(
def main(args: argparse.Namespace):
# any psql version will do here. use current DEFAULT_PG_VERSION = 15
psql_path = str(Path(args.pg_distrib_dir) / "v15" / "bin" / "psql")
# any psql version will do here. use current DEFAULT_PG_VERSION = 14
psql_path = str(Path(args.pg_distrib_dir) / "v14" / "bin" / "psql")
old_pageserver_host = args.old_pageserver_host
new_pageserver_host = args.new_pageserver_host

View File

@@ -1,5 +1,5 @@
//
// The script parses Allure reports and posts a comment with a summary of the test results to the PR or to the latest commit in the branch.
// The script parses Allure reports and posts a comment with a summary of the test results to the PR.
//
// The comment is updated on each run with the latest results.
//
@@ -7,7 +7,7 @@
// - uses: actions/github-script@v6
// with:
// script: |
// const script = require("./scripts/comment-test-report.js")
// const script = require("./scripts/pr-comment-test-report.js")
// await script({
// github,
// context,
@@ -35,12 +35,8 @@ class DefaultMap extends Map {
module.exports = async ({ github, context, fetch, report }) => {
// Marker to find the comment in the subsequent runs
const startMarker = `<!--AUTOMATIC COMMENT START #${context.payload.number}-->`
// If we run the script in the PR or in the branch (main/release/...)
const isPullRequest = !!context.payload.pull_request
// Latest commit in PR or in the branch
const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha
// Let users know that the comment is updated automatically
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${commitSha} at ${new Date().toISOString()} :recycle:</sub></div>`
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${context.payload.pull_request.head.sha} at ${new Date().toISOString()} :recycle:</sub></div>`
// GitHub bot id taken from (https://api.github.com/users/github-actions[bot])
const githubActionsBotId = 41898282
// Comment body itself
@@ -170,39 +166,22 @@ module.exports = async ({ github, context, fetch, report }) => {
commentBody += autoupdateNotice
let createCommentFn, listCommentsFn, updateCommentFn, issueNumberOrSha
if (isPullRequest) {
createCommentFn = github.rest.issues.createComment
listCommentsFn = github.rest.issues.listComments
updateCommentFn = github.rest.issues.updateComment
issueNumberOrSha = {
issue_number: context.payload.number,
}
} else {
updateCommentFn = github.rest.repos.updateCommitComment
listCommentsFn = github.rest.repos.listCommentsForCommit
createCommentFn = github.rest.repos.createCommitComment
issueNumberOrSha = {
commit_sha: commitSha,
}
}
const { data: comments } = await listCommentsFn({
...issueNumberOrSha,
const { data: comments } = await github.rest.issues.listComments({
issue_number: context.payload.number,
...ownerRepoParams,
})
const comment = comments.find(comment => comment.user.id === githubActionsBotId && comment.body.startsWith(startMarker))
if (comment) {
await updateCommentFn({
await github.rest.issues.updateComment({
comment_id: comment.id,
body: commentBody,
...ownerRepoParams,
})
} else {
await createCommentFn({
await github.rest.issues.createComment({
issue_number: context.payload.number,
body: commentBody,
...issueNumberOrSha,
...ownerRepoParams,
})
}
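
Review note: the script finds its earlier comment by author id plus a leading marker, then updates it or creates a new one. A minimal in-memory Python sketch of that find-or-update step follows. `upsert_comment` and the dictionary keys are hypothetical, no GitHub API is called, and it assumes the posted body begins with the marker.

```python
from typing import Dict, List


def upsert_comment(comments: List[Dict], bot_id: int, marker: str, body: str) -> str:
    """Update the bot's marked comment if one exists, otherwise append a new one."""
    for comment in comments:
        # Match on both author and marker so unrelated bot comments are left alone.
        if comment["user_id"] == bot_id and comment["body"].startswith(marker):
            comment["body"] = body
            return "updated"
    comments.append({"user_id": bot_id, "body": body})
    return "created"
```

Matching on the marker as well as the bot id keeps the script from overwriting other automated comments posted by the same account.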

View File

@@ -40,9 +40,6 @@ pub type BrokerClientChannel = BrokerServiceClient<Channel>;
// Create connection object configured to run TLS if scheme starts with https://
// and plain text otherwise. Connection is lazy, only endpoint sanity is
// validated here.
//
// NB: this function is not async, but still must be run on a tokio runtime thread
// because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call.
pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
where
U: std::convert::TryInto<Uri>,

View File

@@ -1603,6 +1603,8 @@ class NeonPageserver(PgProtocol):
# https://github.com/neondatabase/neon/issues/2442
".*could not remove ephemeral file.*No such file or directory.*",
# FIXME: These need investigation
".*gc_loop.*Failed to get a tenant .* Tenant .* not found.*",
".*compaction_loop.*Failed to get a tenant .* Tenant .* not found.*",
".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
".*Removing intermediate uninit mark file.*",
@@ -1619,10 +1621,6 @@ class NeonPageserver(PgProtocol):
".*task iteration took longer than the configured period.*",
# this is until #3501
".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
# these can happen anytime we do compactions from background task and shutdown pageserver
r".*ERROR.*ancestor timeline \S+ is being stopped",
# this is expected given our collaborative shutdown approach for the UploadQueue
".*Compaction failed, retrying in .*: queue is in state Stopped.*",
]
def start(

View File

@@ -155,14 +155,14 @@ class PageserverHttpClient(requests.Session):
return res_json
def tenant_create(
self, new_tenant_id: TenantId, conf: Optional[Dict[str, Any]] = None
self, new_tenant_id: Optional[TenantId] = None, conf: Optional[Dict[str, Any]] = None
) -> TenantId:
if conf is not None:
assert "new_tenant_id" not in conf.keys()
res = self.post(
f"http://localhost:{self.port}/v1/tenant",
json={
"new_tenant_id": str(new_tenant_id),
"new_tenant_id": str(new_tenant_id) if new_tenant_id else None,
**(conf or {}),
},
)
@@ -293,13 +293,13 @@ class PageserverHttpClient(requests.Session):
self,
pg_version: PgVersion,
tenant_id: TenantId,
new_timeline_id: TimelineId,
new_timeline_id: Optional[TimelineId] = None,
ancestor_timeline_id: Optional[TimelineId] = None,
ancestor_start_lsn: Optional[Lsn] = None,
**kwargs,
) -> Dict[Any, Any]:
body: Dict[str, Any] = {
"new_timeline_id": str(new_timeline_id),
"new_timeline_id": str(new_timeline_id) if new_timeline_id else None,
"ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None,
"ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None,
}

View File

@@ -27,10 +27,6 @@ class PgVersion(str, enum.Enum):
def __repr__(self) -> str:
return f"'{self.value}'"
# Make this explicit for Python 3.11 compatibility, which changes the behavior of enums
def __str__(self) -> str:
return self.value
# In GitHub workflows we use Postgres version with v-prefix (e.g. v14 instead of just 14),
# sometimes we need to do so in tests.
@property
@@ -82,11 +78,11 @@ def pytest_addoption(parser: Parser):
@pytest.fixture(scope="session")
def pg_version(request: FixtureRequest) -> Iterator[PgVersion]:
if v := request.config.getoption("--pg-version"):
version, source = v, "from --pg-version command-line argument"
version, source = v, "from --pg-version commad-line argument"
elif v := os.environ.get("DEFAULT_PG_VERSION"):
version, source = PgVersion(v), "from DEFAULT_PG_VERSION environment variable"
else:
version, source = DEFAULT_VERSION, "default version"
version, source = DEFAULT_VERSION, "default verson"
log.info(f"pg_version is {version} ({source})")
yield version

View File

@@ -3,7 +3,7 @@ from contextlib import closing
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgProtocol
from fixtures.pageserver.http import PageserverApiException
from fixtures.types import TenantId, TimelineId
from fixtures.types import TenantId
def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
@@ -25,19 +25,21 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
ps.safe_psql("set FOO", password=tenant_token)
ps.safe_psql("set FOO", password=pageserver_token)
new_timeline_id = env.neon_cli.create_branch(
"test_pageserver_auth", tenant_id=env.initial_tenant
)
# tenant can create branches
tenant_http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
ancestor_timeline_id=new_timeline_id,
)
# console can create branches for tenant
pageserver_http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
ancestor_timeline_id=new_timeline_id,
)
# fail to create branch using token with different tenant_id
@@ -47,19 +49,18 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
invalid_tenant_http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
ancestor_timeline_id=new_timeline_id,
)
# create tenant using management token
pageserver_http_client.tenant_create(TenantId.generate())
pageserver_http_client.tenant_create()
# fail to create tenant using tenant token
with pytest.raises(
PageserverApiException,
match="Forbidden: Attempt to access management api with tenant scope. Permission denied",
):
tenant_http_client.tenant_create(TenantId.generate())
tenant_http_client.tenant_create()
def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder):

View File

@@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
".*failed to load metadata.*",
".*load failed.*load local timeline.*",
".*could not load tenant.*load local timeline.*",
]
)

View File

@@ -1,210 +0,0 @@
-from types import TracebackType
-from typing import Any, Dict, List, Optional, Tuple, Type
-import psycopg2
-import pytest
-from fixtures.log_helper import log
-from fixtures.neon_fixtures import VanillaPostgres
-from pytest_httpserver import HTTPServer
-from werkzeug.wrappers.request import Request
-from werkzeug.wrappers.response import Response
-def handle_db(dbs, roles, operation):
-    if operation["op"] == "set":
-        if "old_name" in operation and operation["old_name"] in dbs:
-            dbs[operation["name"]] = dbs[operation["old_name"]]
-            dbs.pop(operation["old_name"])
-        if "owner" in operation:
-            dbs[operation["name"]] = operation["owner"]
-    elif operation["op"] == "del":
-        dbs.pop(operation["name"])
-    else:
-        raise ValueError("Invalid op")
-def handle_role(dbs, roles, operation):
-    if operation["op"] == "set":
-        if "old_name" in operation and operation["old_name"] in roles:
-            roles[operation["name"]] = roles[operation["old_name"]]
-            roles.pop(operation["old_name"])
-            for db, owner in dbs.items():
-                if owner == operation["old_name"]:
-                    dbs[db] = operation["name"]
-        if "password" in operation:
-            roles[operation["name"]] = operation["password"]
-    elif operation["op"] == "del":
-        if "old_name" in operation:
-            roles.pop(operation["old_name"])
-        roles.pop(operation["name"])
-    else:
-        raise ValueError("Invalid op")
-fail = False
-def ddl_forward_handler(request: Request, dbs: Dict[str, str], roles: Dict[str, str]) -> Response:
-    log.info(f"Received request with data {request.get_data(as_text=True)}")
-    if fail:
-        log.info("FAILING")
-        return Response(status=500, response="Failed just cuz")
-    if request.json is None:
-        log.info("Received invalid JSON")
-        return Response(status=400)
-    json = request.json
-    # Handle roles first
-    if "roles" in json:
-        for operation in json["roles"]:
-            handle_role(dbs, roles, operation)
-    if "dbs" in json:
-        for operation in json["dbs"]:
-            handle_db(dbs, roles, operation)
-    return Response(status=200)
-class DdlForwardingContext:
-    def __init__(self, httpserver: HTTPServer, vanilla_pg: VanillaPostgres, host: str, port: int):
-        self.server = httpserver
-        self.pg = vanilla_pg
-        self.host = host
-        self.port = port
-        self.dbs: Dict[str, str] = {}
-        self.roles: Dict[str, str] = {}
-        endpoint = "/management/api/v2/roles_and_databases"
-        ddl_url = f"http://{host}:{port}{endpoint}"
-        self.pg.configure(
-            [
-                f"neon.console_url={ddl_url}",
-                "shared_preload_libraries = 'neon'",
-            ]
-        )
-        log.info(f"Listening on {ddl_url}")
-        self.server.expect_request(endpoint, method="PATCH").respond_with_handler(
-            lambda request: ddl_forward_handler(request, self.dbs, self.roles)
-        )
-    def __enter__(self):
-        self.pg.start()
-        return self
-    def __exit__(
-        self,
-        exc_type: Optional[Type[BaseException]],
-        exc: Optional[BaseException],
-        tb: Optional[TracebackType],
-    ):
-        self.pg.stop()
-    def send(self, query: str) -> List[Tuple[Any, ...]]:
-        return self.pg.safe_psql(query)
-    def wait(self, timeout=3):
-        self.server.wait(timeout=timeout)
-    def send_and_wait(self, query: str, timeout=3) -> List[Tuple[Any, ...]]:
-        res = self.send(query)
-        self.wait(timeout=timeout)
-        return res
-@pytest.fixture(scope="function")
-def ddl(
-    httpserver: HTTPServer, vanilla_pg: VanillaPostgres, httpserver_listen_address: tuple[str, int]
-):
-    (host, port) = httpserver_listen_address
-    with DdlForwardingContext(httpserver, vanilla_pg, host, port) as ddl:
-        yield ddl
-def test_ddl_forwarding(ddl: DdlForwardingContext):
-    curr_user = ddl.send("SELECT current_user")[0][0]
-    log.info(f"Current user is {curr_user}")
-    ddl.send_and_wait("CREATE DATABASE bork")
-    assert ddl.dbs == {"bork": curr_user}
-    ddl.send_and_wait("CREATE ROLE volk WITH PASSWORD 'nu_zayats'")
-    ddl.send_and_wait("ALTER DATABASE bork RENAME TO nu_pogodi")
-    assert ddl.dbs == {"nu_pogodi": curr_user}
-    ddl.send_and_wait("ALTER DATABASE nu_pogodi OWNER TO volk")
-    assert ddl.dbs == {"nu_pogodi": "volk"}
-    ddl.send_and_wait("DROP DATABASE nu_pogodi")
-    assert ddl.dbs == {}
-    ddl.send_and_wait("DROP ROLE volk")
-    assert ddl.roles == {}
-    ddl.send_and_wait("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'")
-    assert ddl.roles == {"tarzan": "of_the_apes"}
-    ddl.send_and_wait("DROP ROLE tarzan")
-    assert ddl.roles == {}
-    ddl.send_and_wait("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'")
-    assert ddl.roles == {"tarzan": "of_the_apes"}
-    ddl.send_and_wait("ALTER ROLE tarzan WITH PASSWORD 'jungle_man'")
-    assert ddl.roles == {"tarzan": "jungle_man"}
-    ddl.send_and_wait("ALTER ROLE tarzan RENAME TO mowgli")
-    assert ddl.roles == {"mowgli": "jungle_man"}
-    ddl.send_and_wait("DROP ROLE mowgli")
-    assert ddl.roles == {}
-    conn = ddl.pg.connect()
-    cur = conn.cursor()
-    cur.execute("BEGIN")
-    cur.execute("CREATE ROLE bork WITH PASSWORD 'cork'")
-    cur.execute("COMMIT")
-    ddl.wait()
-    assert ddl.roles == {"bork": "cork"}
-    cur.execute("BEGIN")
-    cur.execute("CREATE ROLE stork WITH PASSWORD 'pork'")
-    cur.execute("ABORT")
-    ddl.wait()
-    assert ("stork", "pork") not in ddl.roles.items()
-    cur.execute("BEGIN")
-    cur.execute("ALTER ROLE bork WITH PASSWORD 'pork'")
-    cur.execute("ALTER ROLE bork RENAME TO stork")
-    cur.execute("COMMIT")
-    ddl.wait()
-    assert ddl.roles == {"stork": "pork"}
-    cur.execute("BEGIN")
-    cur.execute("CREATE ROLE dork WITH PASSWORD 'york'")
-    cur.execute("SAVEPOINT point")
-    cur.execute("ALTER ROLE dork WITH PASSWORD 'zork'")
-    cur.execute("ALTER ROLE dork RENAME TO fork")
-    cur.execute("ROLLBACK TO SAVEPOINT point")
-    cur.execute("ALTER ROLE dork WITH PASSWORD 'fork'")
-    cur.execute("ALTER ROLE dork RENAME TO zork")
-    cur.execute("RELEASE SAVEPOINT point")
-    cur.execute("COMMIT")
-    ddl.wait()
-    assert ddl.roles == {"stork": "pork", "zork": "fork"}
-    cur.execute("DROP ROLE stork")
-    cur.execute("DROP ROLE zork")
-    ddl.wait()
-    assert ddl.roles == {}
-    cur.execute("CREATE ROLE bork WITH PASSWORD 'dork'")
-    cur.execute("CREATE ROLE stork WITH PASSWORD 'cork'")
-    cur.execute("BEGIN")
-    cur.execute("DROP ROLE bork")
-    cur.execute("ALTER ROLE stork RENAME TO bork")
-    cur.execute("COMMIT")
-    ddl.wait()
-    assert ddl.roles == {"bork": "cork"}
-    cur.execute("DROP ROLE bork")
-    ddl.wait()
-    assert ddl.roles == {}
-    cur.execute("CREATE ROLE bork WITH PASSWORD 'dork'")
-    cur.execute("CREATE DATABASE stork WITH OWNER=bork")
-    cur.execute("ALTER ROLE bork RENAME TO cork")
-    ddl.wait()
-    assert ddl.dbs == {"stork": "cork"}
-    with pytest.raises(psycopg2.InternalError):
-        global fail
-        fail = True
-        cur.execute("CREATE DATABASE failure WITH OWNER=cork")
-        ddl.wait()
-    conn.close()
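
Reviewer note on the removed test above: the mock console keeps two dictionaries (`dbs` maps database name to owner, `roles` maps role name to password) and applies each `set`/`del` operation from the PATCH body to them. The sketch below is a simplified, standalone restatement of the role-rename path only, to show why the removed test expects `ddl.dbs == {"stork": "cork"}` after `ALTER ROLE bork RENAME TO cork`. The function name `apply_role_op` and the sample payload are hypothetical; only the field names (`op`, `name`, `old_name`, `password`) come from the removed code.

```python
from typing import Dict


def apply_role_op(dbs: Dict[str, str], roles: Dict[str, str], op: dict) -> None:
    """Apply one role operation to the mock state (simplified from handle_role)."""
    if op["op"] == "set":
        old = op.get("old_name")
        if old is not None and old in roles:
            # Rename: carry the password over and re-point databases owned by the old name.
            roles[op["name"]] = roles.pop(old)
            for db, owner in dbs.items():
                if owner == old:
                    dbs[db] = op["name"]
        if "password" in op:
            roles[op["name"]] = op["password"]
    elif op["op"] == "del":
        roles.pop(op["name"])
    else:
        raise ValueError("Invalid op")


# Hypothetical state and payload, shaped like what the handler reads from request.json.
dbs = {"stork": "bork"}
roles = {"bork": "dork"}
payload = {"roles": [{"op": "set", "name": "cork", "old_name": "bork"}]}

for op in payload["roles"]:
    apply_role_op(dbs, roles, op)

print(roles)  # {'cork': 'dork'}
print(dbs)    # {'stork': 'cork'}
```

Roles are processed before databases in the removed handler, so a rename is visible to any database operation arriving in the same request.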

View File

@@ -228,6 +228,7 @@ def proxy_with_metric_collector(
@pytest.mark.asyncio
async def test_proxy_metric_collection(
httpserver: HTTPServer,
httpserver_listen_address,
proxy_with_metric_collector: NeonProxy,
vanilla_pg: VanillaPostgres,
):

View File

@@ -140,7 +140,7 @@ def test_remote_storage_backup_and_restore(
     # This is before the failures injected by test_remote_failures, so it's a permanent error.
     pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return"))
     env.pageserver.allowed_errors.append(
-        ".*attach failed.*: storage-sync-list-remote-timelines",
+        ".*error attaching tenant: storage-sync-list-remote-timelines",
     )
     # Attach it. This HTTP request will succeed and launch a
     # background task to load the tenant. In that background task,

View File

@@ -647,9 +647,7 @@ def test_ignored_tenant_stays_broken_without_metadata(
             metadata_removed = True
     assert metadata_removed, f"Failed to find metadata file in {tenant_timeline_dir}"
-    env.pageserver.allowed_errors.append(
-        f".*{tenant_id}.*: load failed.*: failed to load metadata.*"
-    )
+    env.pageserver.allowed_errors.append(".*could not load tenant .*?: failed to load metadata.*")
     # now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
     pageserver_http.tenant_load(tenant_id=tenant_id)

View File

@@ -22,7 +22,6 @@ from fixtures.neon_fixtures import (
     available_remote_storages,
 )
 from fixtures.types import Lsn, TenantId, TimelineId
-from fixtures.utils import wait_until
 from prometheus_client.samples import Sample
@@ -309,26 +308,27 @@ def test_pageserver_with_empty_tenants(
     env.pageserver.allowed_errors.append(
         ".*marking .* as locally complete, while it doesnt exist in remote index.*"
     )
-    env.pageserver.allowed_errors.append(".*load failed.*list timelines directory.*")
+    env.pageserver.allowed_errors.append(
+        ".*could not load tenant.*Failed to list timelines directory.*"
+    )
     client = env.pageserver.http_client()
-    tenant_with_empty_timelines = TenantId.generate()
-    client.tenant_create(tenant_with_empty_timelines)
-    temp_timelines = client.timeline_list(tenant_with_empty_timelines)
+    tenant_with_empty_timelines_dir = client.tenant_create()
+    temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir)
     for temp_timeline in temp_timelines:
         client.timeline_delete(
-            tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
+            tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"])
         )
     files_in_timelines_dir = sum(
         1
         for _p in Path.iterdir(
-            Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines) / "timelines"
+            Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines"
         )
     )
     assert (
         files_in_timelines_dir == 0
-    ), f"Tenant {tenant_with_empty_timelines} should have an empty timelines/ directory"
+    ), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"
     # Trigger timeline re-initialization after pageserver restart
     env.endpoints.stop_all()
@@ -340,16 +340,10 @@ def test_pageserver_with_empty_tenants(
     env.pageserver.start()
     client = env.pageserver.http_client()
-    def not_loading():
-        tenants = client.tenant_list()
-        assert len(tenants) == 2
-        assert all(t["state"]["slug"] != "Loading" for t in tenants)
-    wait_until(10, 0.2, not_loading)
     tenants = client.tenant_list()
     assert len(tenants) == 2
     [broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
     assert (
         broken_tenant["state"]["slug"] == "Broken"
@@ -360,17 +354,17 @@ def test_pageserver_with_empty_tenants(
         broken_tenant_status["state"]["slug"] == "Broken"
     ), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
-    assert env.pageserver.log_contains(".*load failed, setting tenant state to Broken:.*")
+    assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*")
-    [loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
+    [loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines_dir)]
     assert (
         loaded_tenant["state"]["slug"] == "Active"
-    ), "Tenant {tenant_with_empty_timelines} with empty timelines dir should be active and ready for timeline creation"
+    ), "Tenant {tenant_with_empty_timelines_dir} with empty timelines dir should be active and ready for timeline creation"
-    loaded_tenant_status = client.tenant_status(tenant_with_empty_timelines)
+    loaded_tenant_status = client.tenant_status(tenant_with_empty_timelines_dir)
     assert (
         loaded_tenant_status["state"]["slug"] == "Active"
-    ), f"Tenant {tenant_with_empty_timelines} without timelines dir should be active"
+    ), f"Tenant {tenant_with_empty_timelines_dir} without timelines dir should be active"
     time.sleep(1)  # to allow metrics propagation
@@ -380,7 +374,7 @@ def test_pageserver_with_empty_tenants(
         "state": "Broken",
     }
     active_tenants_metric_filter = {
-        "tenant_id": str(tenant_with_empty_timelines),
+        "tenant_id": str(tenant_with_empty_timelines_dir),
         "state": "Active",
     }
@@ -392,7 +386,7 @@ def test_pageserver_with_empty_tenants(
     assert (
         tenant_active_count == 1
-    ), f"Tenant {tenant_with_empty_timelines} should have metric as active"
+    ), f"Tenant {tenant_with_empty_timelines_dir} should have metric as active"
     tenant_broken_count = int(
         ps_metrics.query_one(

View File

@@ -371,7 +371,7 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
     # make the second call and assert behavior
     log.info("second call start")
-    error_msg_re = "timeline deletion is already in progress"
+    error_msg_re = "another task is already setting the deleted_flag, started at"
     with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err:
         ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
     assert second_call_err.value.status_code == 500

View File

@@ -27,6 +27,7 @@ futures-core = { version = "0.3" }
 futures-executor = { version = "0.3" }
 futures-sink = { version = "0.3" }
 futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
+hashbrown = { version = "0.12", features = ["raw"] }
 itertools = { version = "0.10" }
 libc = { version = "0.2", features = ["extra_traits"] }
 log = { version = "0.4", default-features = false, features = ["std"] }
@@ -38,7 +39,7 @@ num-traits = { version = "0.2", features = ["i128"] }
 prost = { version = "0.11" }
 rand = { version = "0.8", features = ["small_rng"] }
 regex = { version = "1" }
-regex-syntax = { version = "0.7" }
+regex-syntax = { version = "0.6" }
 reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "multipart", "rustls-tls"] }
 ring = { version = "0.16", features = ["std"] }
 rustls = { version = "0.20", features = ["dangerous_configuration"] }
@@ -61,6 +62,7 @@ url = { version = "2", features = ["serde"] }
 anyhow = { version = "1", features = ["backtrace"] }
 bytes = { version = "1", features = ["serde"] }
 either = { version = "1" }
+hashbrown = { version = "0.12", features = ["raw"] }
 itertools = { version = "0.10" }
 libc = { version = "0.2", features = ["extra_traits"] }
 log = { version = "0.4", default-features = false, features = ["std"] }
@@ -68,7 +70,7 @@ memchr = { version = "2" }
 nom = { version = "7" }
 prost = { version = "0.11" }
 regex = { version = "1" }
-regex-syntax = { version = "0.7" }
+regex-syntax = { version = "0.6" }
 serde = { version = "1", features = ["alloc", "derive"] }
 syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] }
 syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit-mut"] }