mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 03:50:37 +00:00
Compare commits
26 Commits
skyzh/refa
...
problame/a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1ebe92bcf9 | ||
|
|
413598b19b | ||
|
|
b345f32e3f | ||
|
|
69cfa9fe61 | ||
|
|
2c424c8f4e | ||
|
|
4001f441c0 | ||
|
|
ef956c47fc | ||
|
|
8606b6abe5 | ||
|
|
732f60317b | ||
|
|
b54431bbd3 | ||
|
|
def5eb8542 | ||
|
|
07da786ed3 | ||
|
|
75c3c43b2e | ||
|
|
bdf03eab58 | ||
|
|
32c85fa87a | ||
|
|
b2e0c58a8c | ||
|
|
94f30f0660 | ||
|
|
a55d224923 | ||
|
|
4f586ac101 | ||
|
|
feb2e80b83 | ||
|
|
ee22e81583 | ||
|
|
3e604eaa39 | ||
|
|
8bcb542a3b | ||
|
|
17b081d294 | ||
|
|
d5337e6a65 | ||
|
|
cc96a5186d |
10
.github/actions/run-python-test-set/action.yml
vendored
10
.github/actions/run-python-test-set/action.yml
vendored
@@ -36,6 +36,14 @@ inputs:
|
||||
description: 'Region name for real s3 tests'
|
||||
required: false
|
||||
default: ''
|
||||
real_s3_access_key_id:
|
||||
description: 'Access key id'
|
||||
required: false
|
||||
default: ''
|
||||
real_s3_secret_access_key:
|
||||
description: 'Secret access key'
|
||||
required: false
|
||||
default: ''
|
||||
rerun_flaky:
|
||||
description: 'Whether to rerun flaky tests'
|
||||
required: false
|
||||
@@ -96,6 +104,8 @@ runs:
|
||||
COMPATIBILITY_POSTGRES_DISTRIB_DIR: /tmp/neon-previous/pg_install
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
BUILD_TYPE: ${{ inputs.build_type }}
|
||||
AWS_ACCESS_KEY_ID: ${{ inputs.real_s3_access_key_id }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ inputs.real_s3_secret_access_key }}
|
||||
COMPATIBILITY_SNAPSHOT_DIR: /tmp/compatibility_snapshot_pg${{ inputs.pg_version }}
|
||||
ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'backward compatibility breakage')
|
||||
ALLOW_FORWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'forward compatibility breakage')
|
||||
|
||||
30
.github/workflows/build_and_test.yml
vendored
30
.github/workflows/build_and_test.yml
vendored
@@ -346,8 +346,10 @@ jobs:
|
||||
test_selection: regress
|
||||
needs_postgres_source: true
|
||||
run_with_real_s3: true
|
||||
real_s3_bucket: neon-github-ci-tests
|
||||
real_s3_region: eu-central-1
|
||||
real_s3_bucket: ci-tests-s3
|
||||
real_s3_region: us-west-2
|
||||
real_s3_access_key_id: "${{ secrets.AWS_ACCESS_KEY_ID_CI_TESTS_S3 }}"
|
||||
real_s3_secret_access_key: "${{ secrets.AWS_SECRET_ACCESS_KEY_CI_TESTS_S3 }}"
|
||||
rerun_flaky: true
|
||||
pg_version: ${{ matrix.pg_version }}
|
||||
env:
|
||||
@@ -407,7 +409,9 @@ jobs:
|
||||
uses: ./.github/actions/allure-report-generate
|
||||
|
||||
- uses: actions/github-script@v6
|
||||
if: ${{ !cancelled() }}
|
||||
if: >
|
||||
!cancelled() &&
|
||||
github.event_name == 'pull_request'
|
||||
with:
|
||||
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
|
||||
retries: 5
|
||||
@@ -417,7 +421,7 @@ jobs:
|
||||
reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}",
|
||||
}
|
||||
|
||||
const script = require("./scripts/comment-test-report.js")
|
||||
const script = require("./scripts/pr-comment-test-report.js")
|
||||
await script({
|
||||
github,
|
||||
context,
|
||||
@@ -492,24 +496,19 @@ jobs:
|
||||
env:
|
||||
COMMIT_URL: ${{ github.server_url }}/${{ github.repository }}/commit/${{ github.event.pull_request.head.sha || github.sha }}
|
||||
run: |
|
||||
scripts/coverage --dir=/tmp/coverage \
|
||||
report \
|
||||
scripts/coverage \
|
||||
--dir=/tmp/coverage report \
|
||||
--input-objects=/tmp/coverage/binaries.list \
|
||||
--commit-url=${COMMIT_URL} \
|
||||
--format=github
|
||||
|
||||
scripts/coverage --dir=/tmp/coverage \
|
||||
report \
|
||||
--input-objects=/tmp/coverage/binaries.list \
|
||||
--format=lcov
|
||||
|
||||
- name: Upload coverage report
|
||||
id: upload-coverage-report
|
||||
env:
|
||||
BUCKET: neon-github-public-dev
|
||||
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
|
||||
run: |
|
||||
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://${BUCKET}/code-coverage/${COMMIT_SHA}
|
||||
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://neon-github-public-dev/code-coverage/${COMMIT_SHA}
|
||||
|
||||
REPORT_URL=https://${BUCKET}.s3.amazonaws.com/code-coverage/${COMMIT_SHA}/index.html
|
||||
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
|
||||
@@ -664,9 +663,6 @@ jobs:
|
||||
project: nrdv0s4kcs
|
||||
push: true
|
||||
tags: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:depot-${{needs.tag.outputs.build-tag}}
|
||||
build-args: |
|
||||
GIT_VERSION=${{ github.sha }}
|
||||
REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
|
||||
compute-tools-image:
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
@@ -781,7 +777,7 @@ jobs:
|
||||
run:
|
||||
shell: sh -eu {0}
|
||||
env:
|
||||
VM_BUILDER_VERSION: v0.8.0
|
||||
VM_BUILDER_VERSION: v0.7.3-alpha3
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
@@ -802,7 +798,7 @@ jobs:
|
||||
|
||||
- name: Build vm image
|
||||
run: |
|
||||
./vm-builder -enable-file-cache -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
./vm-builder -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Pushing vm-compute-node image
|
||||
run: |
|
||||
|
||||
629
Cargo.lock
generated
629
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -3,7 +3,6 @@ members = [
|
||||
"compute_tools",
|
||||
"control_plane",
|
||||
"pageserver",
|
||||
"pageserver/ctl",
|
||||
"proxy",
|
||||
"safekeeper",
|
||||
"storage_broker",
|
||||
@@ -23,7 +22,7 @@ async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
atty = "0.2.14"
|
||||
aws-config = { version = "0.55", default-features = false, features=["rustls"] }
|
||||
aws-sdk-s3 = "0.27"
|
||||
aws-sdk-s3 = "0.25"
|
||||
aws-smithy-http = "0.55"
|
||||
aws-credential-types = "0.55"
|
||||
aws-types = "0.55"
|
||||
|
||||
@@ -47,7 +47,8 @@ RUN set -e \
|
||||
&& mold -run cargo build \
|
||||
--bin pg_sni_router \
|
||||
--bin pageserver \
|
||||
--bin pagectl \
|
||||
--bin pageserver_binutils \
|
||||
--bin draw_timeline_dir \
|
||||
--bin safekeeper \
|
||||
--bin storage_broker \
|
||||
--bin proxy \
|
||||
@@ -72,7 +73,8 @@ RUN set -e \
|
||||
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pg_sni_router /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pagectl /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver_binutils /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/draw_timeline_dir /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
|
||||
|
||||
@@ -632,7 +632,6 @@ RUN apt update && \
|
||||
libxml2 \
|
||||
libxslt1.1 \
|
||||
libzstd1 \
|
||||
libcurl4-openssl-dev \
|
||||
procps && \
|
||||
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
|
||||
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
|
||||
|
||||
@@ -362,8 +362,6 @@ impl ComputeNode {
|
||||
};
|
||||
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
// Disable DDL forwarding because control plane already knows about these roles/databases.
|
||||
client.simple_query("SET neon.forward_ddl = false")?;
|
||||
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
|
||||
handle_roles(spec, &mut client)?;
|
||||
handle_databases(spec, &mut client)?;
|
||||
@@ -405,9 +403,7 @@ impl ComputeNode {
|
||||
self.pg_reload_conf(&mut client)?;
|
||||
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
// Disable DDL forwarding because control plane already knows about these roles/databases.
|
||||
if spec.mode == ComputeMode::Primary {
|
||||
client.simple_query("SET neon.forward_ddl = false")?;
|
||||
handle_roles(&spec, &mut client)?;
|
||||
handle_databases(&spec, &mut client)?;
|
||||
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
|
||||
|
||||
@@ -121,8 +121,9 @@ impl RoleExt for Role {
|
||||
/// string of arguments.
|
||||
fn to_pg_options(&self) -> String {
|
||||
// XXX: consider putting LOGIN as a default option somewhere higher, e.g. in control-plane.
|
||||
let mut params: String = self.options.as_pg_options();
|
||||
params.push_str(" LOGIN");
|
||||
// For now, we do not use generic `options` for roles. Once used, add
|
||||
// `self.options.as_pg_options()` somewhere here.
|
||||
let mut params: String = "LOGIN".to_string();
|
||||
|
||||
if let Some(pass) = &self.encrypted_password {
|
||||
// Some time ago we supported only md5 and treated all encrypted_password as md5.
|
||||
|
||||
@@ -62,7 +62,7 @@ fn do_control_plane_request(
|
||||
}
|
||||
}
|
||||
|
||||
/// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
|
||||
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
|
||||
/// env variable is set, it will be used for authorization.
|
||||
pub fn get_spec_from_control_plane(
|
||||
base_uri: &str,
|
||||
|
||||
@@ -16,7 +16,7 @@ mod pg_helpers_tests {
|
||||
);
|
||||
assert_eq!(
|
||||
spec.cluster.roles.first().unwrap().to_pg_options(),
|
||||
" LOGIN PASSWORD 'md56b1d16b78004bbd51fa06af9eda75972'"
|
||||
"LOGIN PASSWORD 'md56b1d16b78004bbd51fa06af9eda75972'"
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
|
||||
const DEFAULT_BRANCH_NAME: &str = "main";
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
const DEFAULT_PG_VERSION: &str = "15";
|
||||
const DEFAULT_PG_VERSION: &str = "14";
|
||||
|
||||
fn default_conf() -> String {
|
||||
format!(
|
||||
|
||||
@@ -24,7 +24,7 @@ use utils::{
|
||||
|
||||
use crate::safekeeper::SafekeeperNode;
|
||||
|
||||
pub const DEFAULT_PG_VERSION: u32 = 15;
|
||||
pub const DEFAULT_PG_VERSION: u32 = 14;
|
||||
|
||||
//
|
||||
// This data structures represents neon_local CLI config
|
||||
|
||||
@@ -370,10 +370,6 @@ impl PageServerNode {
|
||||
.remove("evictions_low_residence_duration_metric_threshold")
|
||||
.map(|x| x.to_string()),
|
||||
};
|
||||
|
||||
// If tenant ID was not specified, generate one
|
||||
let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate());
|
||||
|
||||
let request = models::TenantCreateRequest {
|
||||
new_tenant_id,
|
||||
config,
|
||||
@@ -499,9 +495,6 @@ impl PageServerNode {
|
||||
ancestor_timeline_id: Option<TimelineId>,
|
||||
pg_version: Option<u32>,
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
// If timeline ID was not specified, generate one
|
||||
let new_timeline_id = new_timeline_id.unwrap_or(TimelineId::generate());
|
||||
|
||||
self.http_request(
|
||||
Method::POST,
|
||||
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
|
||||
|
||||
@@ -1,14 +1,6 @@
|
||||
#!/bin/bash
|
||||
set -eux
|
||||
|
||||
# Generate a random tenant or timeline ID
|
||||
#
|
||||
# Takes a variable name as argument. The result is stored in that variable.
|
||||
generate_id() {
|
||||
local -n resvar=$1
|
||||
printf -v resvar '%08x%08x%08x%08x' $SRANDOM $SRANDOM $SRANDOM $SRANDOM
|
||||
}
|
||||
|
||||
PG_VERSION=${PG_VERSION:-14}
|
||||
|
||||
SPEC_FILE_ORG=/var/db/postgres/specs/spec.json
|
||||
@@ -21,29 +13,29 @@ done
|
||||
echo "Page server is ready."
|
||||
|
||||
echo "Create a tenant and timeline"
|
||||
generate_id tenant_id
|
||||
PARAMS=(
|
||||
-sb
|
||||
-X POST
|
||||
-H "Content-Type: application/json"
|
||||
-d "{\"new_tenant_id\": \"${tenant_id}\"}"
|
||||
-d "{}"
|
||||
http://pageserver:9898/v1/tenant/
|
||||
)
|
||||
result=$(curl "${PARAMS[@]}")
|
||||
echo $result | jq .
|
||||
tenant_id=$(curl "${PARAMS[@]}" | sed 's/"//g')
|
||||
|
||||
generate_id timeline_id
|
||||
PARAMS=(
|
||||
-sb
|
||||
-X POST
|
||||
-H "Content-Type: application/json"
|
||||
-d "{\"new_timeline_id\": \"${timeline_id}\", \"pg_version\": ${PG_VERSION}}"
|
||||
-d "{\"tenant_id\":\"${tenant_id}\", \"pg_version\": ${PG_VERSION}}"
|
||||
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline/"
|
||||
)
|
||||
result=$(curl "${PARAMS[@]}")
|
||||
echo $result | jq .
|
||||
|
||||
echo "Overwrite tenant id and timeline id in spec file"
|
||||
tenant_id=$(echo ${result} | jq -r .tenant_id)
|
||||
timeline_id=$(echo ${result} | jq -r .timeline_id)
|
||||
|
||||
sed "s/TENANT_ID/${tenant_id}/" ${SPEC_FILE_ORG} > ${SPEC_FILE}
|
||||
sed -i "s/TIMELINE_ID/${timeline_id}/" ${SPEC_FILE}
|
||||
|
||||
|
||||
@@ -18,29 +18,7 @@ use crate::reltag::RelTag;
|
||||
use anyhow::bail;
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
|
||||
/// The state of a tenant in this pageserver.
|
||||
///
|
||||
/// ```mermaid
|
||||
/// stateDiagram-v2
|
||||
///
|
||||
/// [*] --> Loading: spawn_load()
|
||||
/// [*] --> Attaching: spawn_attach()
|
||||
///
|
||||
/// Loading --> Activating: activate()
|
||||
/// Attaching --> Activating: activate()
|
||||
/// Activating --> Active: infallible
|
||||
///
|
||||
/// Loading --> Broken: load() failure
|
||||
/// Attaching --> Broken: attach() failure
|
||||
///
|
||||
/// Active --> Stopping: set_stopping(), part of shutdown & detach
|
||||
/// Stopping --> Broken: late error in remove_tenant_from_memory
|
||||
///
|
||||
/// Broken --> [*]: ignore / detach / shutdown
|
||||
/// Stopping --> [*]: remove_from_memory complete
|
||||
///
|
||||
/// Active --> Broken: cfg(testing)-only tenant break point
|
||||
/// ```
|
||||
/// A state of a tenant in pageserver's memory.
|
||||
#[derive(
|
||||
Clone,
|
||||
PartialEq,
|
||||
@@ -48,63 +26,43 @@ use bytes::{BufMut, Bytes, BytesMut};
|
||||
serde::Serialize,
|
||||
serde::Deserialize,
|
||||
strum_macros::Display,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::EnumVariantNames,
|
||||
strum_macros::AsRefStr,
|
||||
strum_macros::IntoStaticStr,
|
||||
)]
|
||||
#[serde(tag = "slug", content = "data")]
|
||||
pub enum TenantState {
|
||||
/// This tenant is being loaded from local disk.
|
||||
///
|
||||
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
|
||||
/// This tenant is being loaded from local disk
|
||||
Loading,
|
||||
/// This tenant is being attached to the pageserver.
|
||||
///
|
||||
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
|
||||
/// This tenant is being downloaded from cloud storage.
|
||||
Attaching,
|
||||
/// The tenant is transitioning from Loading/Attaching to Active.
|
||||
///
|
||||
/// While in this state, the individual timelines are being activated.
|
||||
///
|
||||
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
|
||||
Activating(ActivatingFrom),
|
||||
/// The tenant has finished activating and is open for business.
|
||||
///
|
||||
/// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
|
||||
Activating,
|
||||
/// Tenant is fully operational
|
||||
Active,
|
||||
/// The tenant is recognized by pageserver, but it is being detached or the
|
||||
/// A tenant is recognized by pageserver, but it is being detached or the
|
||||
/// system is being shut down.
|
||||
///
|
||||
/// Transitions out of this state are possible through `set_broken()`.
|
||||
Stopping,
|
||||
/// The tenant is recognized by the pageserver, but can no longer be used for
|
||||
/// any operations.
|
||||
///
|
||||
/// If the tenant fails to load or attach, it will transition to this state
|
||||
/// and it is guaranteed that no background tasks are running in its name.
|
||||
///
|
||||
/// The other way to transition into this state is from `Stopping` state
|
||||
/// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
|
||||
/// if the cleanup future executed by `remove_tenant_from_memory()` fails.
|
||||
/// A tenant is recognized by the pageserver, but can no longer be used for
|
||||
/// any operations, because it failed to be activated.
|
||||
Broken { reason: String, backtrace: String },
|
||||
}
|
||||
|
||||
impl TenantState {
|
||||
pub fn attachment_status(&self) -> TenantAttachmentStatus {
|
||||
use TenantAttachmentStatus::*;
|
||||
|
||||
// Below TenantState::Activating is used as "transient" or "transparent" state for
|
||||
// attachment_status determining.
|
||||
match self {
|
||||
// The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
|
||||
// So, technically, we can return Attached here.
|
||||
// However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
|
||||
// But, our attach task might still be fetching the remote timelines, etc.
|
||||
// So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
|
||||
Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
|
||||
Self::Attaching => Maybe,
|
||||
// tenant mgr startup distinguishes attaching from loading via marker file.
|
||||
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
|
||||
Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
|
||||
Self::Loading => Attached,
|
||||
Self::Activating => todo!(),
|
||||
// We only reach Active after successful load / attach.
|
||||
// So, call atttachment status Attached.
|
||||
Self::Active => Attached,
|
||||
@@ -143,15 +101,6 @@ impl std::fmt::Debug for TenantState {
|
||||
}
|
||||
}
|
||||
|
||||
/// The only [`TenantState`] variants we could be `TenantState::Activating` from.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub enum ActivatingFrom {
|
||||
/// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`]
|
||||
Loading,
|
||||
/// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
|
||||
Attaching,
|
||||
}
|
||||
|
||||
/// A state of a timeline in pageserver's memory.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub enum TimelineState {
|
||||
@@ -172,8 +121,9 @@ pub enum TimelineState {
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TimelineCreateRequest {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub new_timeline_id: TimelineId,
|
||||
#[serde(default)]
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub new_timeline_id: Option<TimelineId>,
|
||||
#[serde(default)]
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub ancestor_timeline_id: Option<TimelineId>,
|
||||
@@ -184,11 +134,12 @@ pub struct TimelineCreateRequest {
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct TenantCreateRequest {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub new_tenant_id: TenantId,
|
||||
#[serde(default)]
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub new_tenant_id: Option<TenantId>,
|
||||
#[serde(flatten)]
|
||||
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
|
||||
}
|
||||
@@ -236,10 +187,10 @@ pub struct StatusResponse {
|
||||
}
|
||||
|
||||
impl TenantCreateRequest {
|
||||
pub fn new(new_tenant_id: TenantId) -> TenantCreateRequest {
|
||||
pub fn new(new_tenant_id: Option<TenantId>) -> TenantCreateRequest {
|
||||
TenantCreateRequest {
|
||||
new_tenant_id,
|
||||
config: TenantConfig::default(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -883,55 +834,4 @@ mod tests {
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tenantstatus_activating_serde() {
|
||||
let states = [
|
||||
TenantState::Activating(ActivatingFrom::Loading),
|
||||
TenantState::Activating(ActivatingFrom::Attaching),
|
||||
];
|
||||
let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
|
||||
|
||||
let actual = serde_json::to_string(&states).unwrap();
|
||||
|
||||
assert_eq!(actual, expected);
|
||||
|
||||
let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();
|
||||
|
||||
assert_eq!(states.as_slice(), &parsed);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tenantstatus_activating_strum() {
|
||||
// tests added, because we use these for metrics
|
||||
let examples = [
|
||||
(line!(), TenantState::Loading, "Loading"),
|
||||
(line!(), TenantState::Attaching, "Attaching"),
|
||||
(
|
||||
line!(),
|
||||
TenantState::Activating(ActivatingFrom::Loading),
|
||||
"Activating",
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
TenantState::Activating(ActivatingFrom::Attaching),
|
||||
"Activating",
|
||||
),
|
||||
(line!(), TenantState::Active, "Active"),
|
||||
(line!(), TenantState::Stopping, "Stopping"),
|
||||
(
|
||||
line!(),
|
||||
TenantState::Broken {
|
||||
reason: "Example".into(),
|
||||
backtrace: "Looooong backtrace".into(),
|
||||
},
|
||||
"Broken",
|
||||
),
|
||||
];
|
||||
|
||||
for (line, rendered, expected) in examples {
|
||||
let actual: &'static str = rendered.into();
|
||||
assert_eq!(actual, expected, "example on {line}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,33 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
|
||||
/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
|
||||
///
|
||||
/// Can be cloned, moved and kept around in futures as "guard objects".
|
||||
#[derive(Clone)]
|
||||
pub struct Completion(mpsc::Sender<()>);
|
||||
|
||||
/// Barrier will wait until all clones of [`Completion`] have been dropped.
|
||||
#[derive(Clone)]
|
||||
pub struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);
|
||||
|
||||
impl Barrier {
|
||||
pub async fn wait(self) {
|
||||
self.0.lock().await.recv().await;
|
||||
}
|
||||
|
||||
pub async fn maybe_wait(barrier: Option<Barrier>) {
|
||||
if let Some(b) = barrier {
|
||||
b.wait().await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Create new Guard and Barrier pair.
|
||||
pub fn channel() -> (Completion, Barrier) {
|
||||
let (tx, rx) = mpsc::channel::<()>(1);
|
||||
let rx = Mutex::new(rx);
|
||||
let rx = Arc::new(rx);
|
||||
(Completion(tx), Barrier(rx))
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::auth::{Claims, JwtAuth};
|
||||
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
|
||||
use crate::http::error;
|
||||
use anyhow::{anyhow, Context};
|
||||
use hyper::header::{HeaderName, AUTHORIZATION};
|
||||
use hyper::http::HeaderValue;
|
||||
@@ -16,6 +16,8 @@ use std::future::Future;
|
||||
use std::net::TcpListener;
|
||||
use std::str::FromStr;
|
||||
|
||||
use super::error::ApiError;
|
||||
|
||||
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"libmetrics_metric_handler_requests_total",
|
||||
@@ -33,12 +35,8 @@ struct RequestId(String);
|
||||
/// Adds a tracing info_span! instrumentation around the handler events,
|
||||
/// logs the request start and end events for non-GET requests and non-200 responses.
|
||||
///
|
||||
/// Usage: Replace `my_handler` with `|r| request_span(r, my_handler)`
|
||||
///
|
||||
/// Use this to distinguish between logs of different HTTP requests: every request handler wrapped
|
||||
/// with this will get request info logged in the wrapping span, including the unique request ID.
|
||||
///
|
||||
/// This also handles errors, logging them and converting them to an HTTP error response.
|
||||
/// in this type will get request info logged in the wrapping span, including the unique request ID.
|
||||
///
|
||||
/// There could be other ways to implement similar functionality:
|
||||
///
|
||||
@@ -56,56 +54,60 @@ struct RequestId(String);
|
||||
/// tries to achive with its `.instrument` used in the current approach.
|
||||
///
|
||||
/// If needed, a declarative macro to substitute the |r| ... closure boilerplate could be introduced.
|
||||
pub async fn request_span<R, H>(request: Request<Body>, handler: H) -> R::Output
|
||||
pub struct RequestSpan<E, R, H>(pub H)
|
||||
where
|
||||
R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
|
||||
H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
|
||||
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
|
||||
R: Future<Output = Result<Response<Body>, E>> + Send + 'static,
|
||||
H: Fn(Request<Body>) -> R + Send + Sync + 'static;
|
||||
|
||||
impl<E, R, H> RequestSpan<E, R, H>
|
||||
where
|
||||
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
|
||||
R: Future<Output = Result<Response<Body>, E>> + Send + 'static,
|
||||
H: Fn(Request<Body>) -> R + Send + Sync + 'static,
|
||||
{
|
||||
let request_id = request.context::<RequestId>().unwrap_or_default().0;
|
||||
let method = request.method();
|
||||
let path = request.uri().path();
|
||||
let request_span = info_span!("request", %method, %path, %request_id);
|
||||
/// Creates a tracing span around inner request handler and executes the request handler in the contex of that span.
|
||||
/// Use as `|r| RequestSpan(my_handler).handle(r)` instead of `my_handler` as the request handler to get the span enabled.
|
||||
pub async fn handle(self, request: Request<Body>) -> Result<Response<Body>, E> {
|
||||
let request_id = request.context::<RequestId>().unwrap_or_default().0;
|
||||
let method = request.method();
|
||||
let path = request.uri().path();
|
||||
let request_span = info_span!("request", %method, %path, %request_id);
|
||||
|
||||
let log_quietly = method == Method::GET;
|
||||
async move {
|
||||
let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding();
|
||||
if log_quietly {
|
||||
debug!("Handling request");
|
||||
} else {
|
||||
info!("Handling request");
|
||||
}
|
||||
|
||||
// No special handling for panics here. There's a `tracing_panic_hook` from another
|
||||
// module to do that globally.
|
||||
let res = handler(request).await;
|
||||
|
||||
cancellation_guard.disarm();
|
||||
|
||||
// Log the result if needed.
|
||||
//
|
||||
// We also convert any errors into an Ok response with HTTP error code here.
|
||||
// `make_router` sets a last-resort error handler that would do the same, but
|
||||
// we prefer to do it here, before we exit the request span, so that the error
|
||||
// is still logged with the span.
|
||||
//
|
||||
// (Because we convert errors to Ok response, we never actually return an error,
|
||||
// and we could declare the function to return the never type (`!`). However,
|
||||
// using `routerify::RouterBuilder` requires a proper error type.)
|
||||
match res {
|
||||
Ok(response) => {
|
||||
let response_status = response.status();
|
||||
if log_quietly && response_status.is_success() {
|
||||
debug!("Request handled, status: {response_status}");
|
||||
} else {
|
||||
info!("Request handled, status: {response_status}");
|
||||
}
|
||||
Ok(response)
|
||||
let log_quietly = method == Method::GET;
|
||||
async move {
|
||||
let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding();
|
||||
if log_quietly {
|
||||
debug!("Handling request");
|
||||
} else {
|
||||
info!("Handling request");
|
||||
}
|
||||
|
||||
// Note that we reuse `error::handler` here and not returning and error at all,
|
||||
// yet cannot use `!` directly in the method signature due to `routerify::RouterBuilder` limitation.
|
||||
// Usage of the error handler also means that we expect only the `ApiError` errors to be raised in this call.
|
||||
//
|
||||
// Panics are not handled separately, there's a `tracing_panic_hook` from another module to do that globally.
|
||||
let res = (self.0)(request).await;
|
||||
|
||||
cancellation_guard.disarm();
|
||||
|
||||
match res {
|
||||
Ok(response) => {
|
||||
let response_status = response.status();
|
||||
if log_quietly && response_status.is_success() {
|
||||
debug!("Request handled, status: {response_status}");
|
||||
} else {
|
||||
info!("Request handled, status: {response_status}");
|
||||
}
|
||||
Ok(response)
|
||||
}
|
||||
Err(e) => Ok(error::handler(e.into()).await),
|
||||
}
|
||||
Err(err) => Ok(api_error_handler(err)),
|
||||
}
|
||||
.instrument(request_span)
|
||||
.await
|
||||
}
|
||||
.instrument(request_span)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Drop guard to WARN in case the request was dropped before completion.
|
||||
@@ -205,8 +207,10 @@ pub fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
|
||||
.middleware(Middleware::post_with_info(
|
||||
add_request_id_header_to_response,
|
||||
))
|
||||
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
|
||||
.err_handler(route_error_handler)
|
||||
.get("/metrics", |r| {
|
||||
RequestSpan(prometheus_metrics_handler).handle(r)
|
||||
})
|
||||
.err_handler(error::handler)
|
||||
}
|
||||
|
||||
pub fn attach_openapi_ui(
|
||||
@@ -216,14 +220,12 @@ pub fn attach_openapi_ui(
|
||||
ui_mount_path: &'static str,
|
||||
) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
router_builder
|
||||
.get(spec_mount_path,
|
||||
move |r| request_span(r, move |_| async move {
|
||||
Ok(Response::builder().body(Body::from(spec)).unwrap())
|
||||
})
|
||||
)
|
||||
.get(ui_mount_path,
|
||||
move |r| request_span(r, move |_| async move {
|
||||
Ok(Response::builder().body(Body::from(format!(r#"
|
||||
.get(spec_mount_path, move |r| {
|
||||
RequestSpan(move |_| async move { Ok(Response::builder().body(Body::from(spec)).unwrap()) })
|
||||
.handle(r)
|
||||
})
|
||||
.get(ui_mount_path, move |r| RequestSpan( move |_| async move {
|
||||
Ok(Response::builder().body(Body::from(format!(r#"
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
@@ -253,8 +255,7 @@ pub fn attach_openapi_ui(
|
||||
</body>
|
||||
</html>
|
||||
"#, spec_mount_path))).unwrap())
|
||||
})
|
||||
)
|
||||
}).handle(r))
|
||||
}
|
||||
|
||||
fn parse_token(header_value: &str) -> Result<&str, ApiError> {
|
||||
|
||||
@@ -83,24 +83,13 @@ impl HttpErrorBody {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn route_error_handler(err: routerify::RouteError) -> Response<Body> {
|
||||
match err.downcast::<ApiError>() {
|
||||
Ok(api_error) => api_error_handler(*api_error),
|
||||
Err(other_error) => {
|
||||
// We expect all the request handlers to return an ApiError, so this should
|
||||
// not be reached. But just in case.
|
||||
error!("Error processing HTTP request: {other_error:?}");
|
||||
HttpErrorBody::response_from_msg_and_status(
|
||||
other_error.to_string(),
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
pub async fn handler(err: routerify::RouteError) -> Response<Body> {
|
||||
let api_error = err
|
||||
.downcast::<ApiError>()
|
||||
.expect("handler should always return api error");
|
||||
|
||||
pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
|
||||
// Print a stack trace for Internal Server errors
|
||||
if let ApiError::InternalServerError(_) = api_error {
|
||||
if let ApiError::InternalServerError(_) = api_error.as_ref() {
|
||||
error!("Error processing HTTP request: {api_error:?}");
|
||||
} else {
|
||||
error!("Error processing HTTP request: {api_error:#}");
|
||||
|
||||
@@ -60,9 +60,6 @@ pub mod tracing_span_assert;
|
||||
|
||||
pub mod rate_limit;
|
||||
|
||||
/// Simple once-barrier and a guard which keeps barrier awaiting.
|
||||
pub mod completion;
|
||||
|
||||
mod failpoint_macro_helpers {
|
||||
|
||||
/// use with fail::cfg("$name", "return(2000)")
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
[package]
|
||||
name = "pagectl"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
bytes.workspace = true
|
||||
clap = { workspace = true, features = ["string"] }
|
||||
git-version.workspace = true
|
||||
pageserver = { path = ".." }
|
||||
postgres_ffi.workspace = true
|
||||
utils.workspace = true
|
||||
svg_fmt.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
@@ -1,169 +0,0 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::Result;
|
||||
use clap::Subcommand;
|
||||
use pageserver::tenant::block_io::BlockCursor;
|
||||
use pageserver::tenant::disk_btree::DiskBtreeReader;
|
||||
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
|
||||
use pageserver::{page_cache, virtual_file};
|
||||
use pageserver::{
|
||||
repository::{Key, KEY_SIZE},
|
||||
tenant::{
|
||||
block_io::FileBlockReader, disk_btree::VisitDirection,
|
||||
storage_layer::delta_layer::DELTA_KEY_SIZE,
|
||||
},
|
||||
virtual_file::VirtualFile,
|
||||
};
|
||||
use std::fs;
|
||||
use utils::bin_ser::BeSer;
|
||||
|
||||
use crate::layer_map_analyzer::parse_filename;
|
||||
|
||||
#[derive(Subcommand)]
|
||||
pub(crate) enum LayerCmd {
|
||||
/// List all tenants and timelines under the pageserver path
|
||||
///
|
||||
/// Example: `cargo run --bin pagectl layer list .neon/`
|
||||
List { path: PathBuf },
|
||||
/// List all layers of a given tenant and timeline
|
||||
///
|
||||
/// Example: `cargo run --bin pagectl layer list .neon/`
|
||||
ListLayer {
|
||||
path: PathBuf,
|
||||
tenant: String,
|
||||
timeline: String,
|
||||
},
|
||||
/// Dump all information of a layer file
|
||||
DumpLayer {
|
||||
path: PathBuf,
|
||||
tenant: String,
|
||||
timeline: String,
|
||||
/// The id from list-layer command
|
||||
id: usize,
|
||||
},
|
||||
}
|
||||
|
||||
fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
||||
use pageserver::tenant::blob_io::BlobCursor;
|
||||
use pageserver::tenant::block_io::BlockReader;
|
||||
|
||||
let path = path.as_ref();
|
||||
virtual_file::init(10);
|
||||
page_cache::init(100);
|
||||
let file = FileBlockReader::new(VirtualFile::open(path)?);
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
actual_summary.index_start_blk,
|
||||
actual_summary.index_root_blk,
|
||||
&file,
|
||||
);
|
||||
// TODO(chi): dedup w/ `delta_layer.rs` by exposing the API.
|
||||
let mut all = vec![];
|
||||
tree_reader.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
VisitDirection::Forwards,
|
||||
|key, value_offset| {
|
||||
let curr = Key::from_slice(&key[..KEY_SIZE]);
|
||||
all.push((curr, BlobRef(value_offset)));
|
||||
true
|
||||
},
|
||||
)?;
|
||||
let mut cursor = BlockCursor::new(&file);
|
||||
for (k, v) in all {
|
||||
let value = cursor.read_blob(v.pos())?;
|
||||
println!("key:{} value_len:{}", k, value.len());
|
||||
}
|
||||
// TODO(chi): special handling for last key?
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
match cmd {
|
||||
LayerCmd::List { path } => {
|
||||
for tenant in fs::read_dir(path.join("tenants"))? {
|
||||
let tenant = tenant?;
|
||||
if !tenant.file_type()?.is_dir() {
|
||||
continue;
|
||||
}
|
||||
println!("tenant {}", tenant.file_name().to_string_lossy());
|
||||
for timeline in fs::read_dir(tenant.path().join("timelines"))? {
|
||||
let timeline = timeline?;
|
||||
if !timeline.file_type()?.is_dir() {
|
||||
continue;
|
||||
}
|
||||
println!("- timeline {}", timeline.file_name().to_string_lossy());
|
||||
}
|
||||
}
|
||||
}
|
||||
LayerCmd::ListLayer {
|
||||
path,
|
||||
tenant,
|
||||
timeline,
|
||||
} => {
|
||||
let timeline_path = path
|
||||
.join("tenants")
|
||||
.join(tenant)
|
||||
.join("timelines")
|
||||
.join(timeline);
|
||||
let mut idx = 0;
|
||||
for layer in fs::read_dir(timeline_path)? {
|
||||
let layer = layer?;
|
||||
if let Some(layer_file) = parse_filename(&layer.file_name().into_string().unwrap())
|
||||
{
|
||||
println!(
|
||||
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
|
||||
idx,
|
||||
layer_file.key_range.start,
|
||||
layer_file.key_range.end,
|
||||
layer_file.lsn_range.start,
|
||||
layer_file.lsn_range.end,
|
||||
layer_file.is_delta,
|
||||
);
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
LayerCmd::DumpLayer {
|
||||
path,
|
||||
tenant,
|
||||
timeline,
|
||||
id,
|
||||
} => {
|
||||
let timeline_path = path
|
||||
.join("tenants")
|
||||
.join(tenant)
|
||||
.join("timelines")
|
||||
.join(timeline);
|
||||
let mut idx = 0;
|
||||
for layer in fs::read_dir(timeline_path)? {
|
||||
let layer = layer?;
|
||||
if let Some(layer_file) = parse_filename(&layer.file_name().into_string().unwrap())
|
||||
{
|
||||
if *id == idx {
|
||||
// TODO(chi): dedup code
|
||||
println!(
|
||||
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
|
||||
idx,
|
||||
layer_file.key_range.start,
|
||||
layer_file.key_range.end,
|
||||
layer_file.lsn_range.start,
|
||||
layer_file.lsn_range.end,
|
||||
layer_file.is_delta,
|
||||
);
|
||||
|
||||
if layer_file.is_delta {
|
||||
read_delta_file(layer.path())?;
|
||||
} else {
|
||||
anyhow::bail!("not supported yet :(");
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,179 +0,0 @@
|
||||
//! A helper tool to manage pageserver binary files.
|
||||
//! Accepts a file as an argument, attempts to parse it with all ways possible
|
||||
//! and prints its interpreted context.
|
||||
//!
|
||||
//! Separate, `metadata` subcommand allows to print and update pageserver's metadata file.
|
||||
|
||||
mod draw_timeline_dir;
|
||||
mod layer_map_analyzer;
|
||||
mod layers;
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
use layers::LayerCmd;
|
||||
use pageserver::{
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
page_cache,
|
||||
task_mgr::TaskKind,
|
||||
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
|
||||
virtual_file,
|
||||
};
|
||||
use postgres_ffi::ControlFileData;
|
||||
use std::path::{Path, PathBuf};
|
||||
use utils::{lsn::Lsn, project_git_version};
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
// Top-level command line definition of the tool: version/about metadata for
// clap plus the selected subcommand. Plain `//` comments are used on purpose:
// `///` comments on clap-derive items become part of the generated help text.
#[derive(Parser)]
|
||||
#[command(
|
||||
version = GIT_VERSION,
|
||||
about = "Neon Pageserver binutils",
|
||||
long_about = "Reads pageserver (and related) binary files management utility"
|
||||
)]
|
||||
#[command(propagate_version = true)]
|
||||
struct CliOpts {
|
||||
#[command(subcommand)]
|
||||
command: Commands,
|
||||
}
|
||||
|
||||
// The subcommands of the tool; `main` dispatches on these. `Layer` carries a
// nested set of subcommands (`LayerCmd`), the other variants carry their own
// argument structs or no arguments at all.
#[derive(Subcommand)]
|
||||
enum Commands {
|
||||
Metadata(MetadataCmd),
|
||||
PrintLayerFile(PrintLayerFileCmd),
|
||||
DrawTimeline {},
|
||||
AnalyzeLayerMap(AnalyzeLayerMapCmd),
|
||||
#[command(subcommand)]
|
||||
Layer(LayerCmd),
|
||||
}
|
||||
|
||||
/// Read and update pageserver metadata file
|
||||
#[derive(Parser)]
|
||||
struct MetadataCmd {
|
||||
/// Input metadata file path
|
||||
metadata_path: PathBuf,
|
||||
/// Replace disk consistent Lsn
|
||||
disk_consistent_lsn: Option<Lsn>,
|
||||
/// Replace previous record Lsn
|
||||
prev_record_lsn: Option<Lsn>,
|
||||
/// Replace latest gc cuttoff
|
||||
latest_gc_cuttoff: Option<Lsn>,
|
||||
}
|
||||
|
||||
// Arguments of the `PrintLayerFile` subcommand. `main` first tries to decode
// `path` as a pg control file and falls back to dumping it as a layer file.
#[derive(Parser)]
|
||||
struct PrintLayerFileCmd {
|
||||
/// Pageserver data path
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
// Arguments of the `AnalyzeLayerMap` subcommand, handed to
// `layer_map_analyzer::main`. `max_holes` is optional.
// NOTE(review): the analyzer appears to fall back to a built-in default when
// `max_holes` is omitted — confirm against layer_map_analyzer.
#[derive(Parser)]
|
||||
struct AnalyzeLayerMapCmd {
|
||||
/// Pageserver data path
|
||||
path: PathBuf,
|
||||
/// Max holes
|
||||
max_holes: Option<usize>,
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
let cli = CliOpts::parse();
|
||||
|
||||
match cli.command {
|
||||
Commands::Layer(cmd) => {
|
||||
layers::main(&cmd)?;
|
||||
}
|
||||
Commands::Metadata(cmd) => {
|
||||
handle_metadata(&cmd)?;
|
||||
}
|
||||
Commands::DrawTimeline {} => {
|
||||
draw_timeline_dir::main()?;
|
||||
}
|
||||
Commands::AnalyzeLayerMap(cmd) => {
|
||||
layer_map_analyzer::main(&cmd)?;
|
||||
}
|
||||
Commands::PrintLayerFile(cmd) => {
|
||||
if let Err(e) = read_pg_control_file(&cmd.path) {
|
||||
println!(
|
||||
"Failed to read input file as a pg control one: {e:#}\n\
|
||||
Attempting to read it as layer file"
|
||||
);
|
||||
print_layerfile(&cmd.path)?;
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
|
||||
let control_file = ControlFileData::decode(&std::fs::read(control_file_path)?)?;
|
||||
println!("{control_file:?}");
|
||||
let control_file_initdb = Lsn(control_file.checkPoint);
|
||||
println!(
|
||||
"pg_initdb_lsn: {}, aligned: {}",
|
||||
control_file_initdb,
|
||||
control_file_initdb.align()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_layerfile(path: &Path) -> anyhow::Result<()> {
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(10);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
dump_layerfile_from_path(path, true, &ctx)
|
||||
}
|
||||
|
||||
fn handle_metadata(
|
||||
MetadataCmd {
|
||||
metadata_path: path,
|
||||
disk_consistent_lsn,
|
||||
prev_record_lsn,
|
||||
latest_gc_cuttoff,
|
||||
}: &MetadataCmd,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let metadata_bytes = std::fs::read(path)?;
|
||||
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
|
||||
println!("Current metadata:\n{meta:?}");
|
||||
let mut update_meta = false;
|
||||
if let Some(disk_consistent_lsn) = disk_consistent_lsn {
|
||||
meta = TimelineMetadata::new(
|
||||
*disk_consistent_lsn,
|
||||
meta.prev_record_lsn(),
|
||||
meta.ancestor_timeline(),
|
||||
meta.ancestor_lsn(),
|
||||
meta.latest_gc_cutoff_lsn(),
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
if let Some(prev_record_lsn) = prev_record_lsn {
|
||||
meta = TimelineMetadata::new(
|
||||
meta.disk_consistent_lsn(),
|
||||
Some(*prev_record_lsn),
|
||||
meta.ancestor_timeline(),
|
||||
meta.ancestor_lsn(),
|
||||
meta.latest_gc_cutoff_lsn(),
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
if let Some(latest_gc_cuttoff) = latest_gc_cuttoff {
|
||||
meta = TimelineMetadata::new(
|
||||
meta.disk_consistent_lsn(),
|
||||
meta.prev_record_lsn(),
|
||||
meta.ancestor_timeline(),
|
||||
meta.ancestor_lsn(),
|
||||
*latest_gc_cuttoff,
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
|
||||
if update_meta {
|
||||
let metadata_bytes = meta.to_bytes()?;
|
||||
std::fs::write(path, metadata_bytes)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -12,7 +12,7 @@
|
||||
//! Example use:
|
||||
//! ```
|
||||
//! $ ls test_output/test_pgbench\[neon-45-684\]/repo/tenants/$TENANT/timelines/$TIMELINE | \
|
||||
//! $ grep "__" | cargo run --release --bin pagectl draw-timeline-dir > out.svg
|
||||
//! $ grep "__" | cargo run --release --bin draw_timeline_dir > out.svg
|
||||
//! $ firefox out.svg
|
||||
//! ```
|
||||
//!
|
||||
@@ -62,7 +62,7 @@ fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
|
||||
(keys, lsns)
|
||||
}
|
||||
|
||||
pub fn main() -> Result<()> {
|
||||
fn main() -> Result<()> {
|
||||
// Parse layer filenames from stdin
|
||||
let mut ranges: Vec<(Range<Key>, Range<Lsn>)> = vec![];
|
||||
let stdin = io::stdin();
|
||||
@@ -6,7 +6,7 @@ use anyhow::Result;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::ops::Range;
|
||||
use std::{fs, path::Path, str};
|
||||
use std::{env, fs, path::Path, path::PathBuf, str, str::FromStr};
|
||||
|
||||
use pageserver::page_cache::PAGE_SZ;
|
||||
use pageserver::repository::{Key, KEY_SIZE};
|
||||
@@ -18,14 +18,12 @@ use pageserver::virtual_file::VirtualFile;
|
||||
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
|
||||
use crate::AnalyzeLayerMapCmd;
|
||||
|
||||
const MIN_HOLE_LENGTH: i128 = (128 * 1024 * 1024 / PAGE_SZ) as i128;
|
||||
const DEFAULT_MAX_HOLES: usize = 10;
|
||||
|
||||
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
|
||||
#[derive(PartialEq, Eq)]
|
||||
pub struct Hole(Range<Key>);
|
||||
struct Hole(Range<Key>);
|
||||
|
||||
impl Ord for Hole {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
@@ -41,11 +39,11 @@ impl PartialOrd for Hole {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct LayerFile {
|
||||
pub key_range: Range<Key>,
|
||||
pub lsn_range: Range<Lsn>,
|
||||
pub is_delta: bool,
|
||||
pub holes: Vec<Hole>,
|
||||
struct LayerFile {
|
||||
key_range: Range<Key>,
|
||||
lsn_range: Range<Lsn>,
|
||||
is_delta: bool,
|
||||
holes: Vec<Hole>,
|
||||
}
|
||||
|
||||
impl LayerFile {
|
||||
@@ -69,7 +67,7 @@ impl LayerFile {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
|
||||
fn parse_filename(name: &str) -> Option<LayerFile> {
|
||||
let split: Vec<&str> = name.split("__").collect();
|
||||
if split.len() != 2 {
|
||||
return None;
|
||||
@@ -129,9 +127,18 @@ fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
|
||||
Ok(holes)
|
||||
}
|
||||
|
||||
pub(crate) fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
let storage_path = &cmd.path;
|
||||
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
|
||||
fn main() -> Result<()> {
|
||||
let args: Vec<String> = env::args().collect();
|
||||
if args.len() < 2 {
|
||||
println!("Usage: layer_map_analyzer PAGESERVER_DATA_DIR [MAX_HOLES]");
|
||||
return Ok(());
|
||||
}
|
||||
let storage_path = PathBuf::from_str(&args[1])?;
|
||||
let max_holes = if args.len() > 2 {
|
||||
args[2].parse::<usize>().unwrap()
|
||||
} else {
|
||||
DEFAULT_MAX_HOLES
|
||||
};
|
||||
|
||||
// Initialize virtual_file (file descriptor cache) and page cache which are needed to access layer persistent B-Tree.
|
||||
pageserver::virtual_file::init(10);
|
||||
@@ -275,7 +275,6 @@ fn start_pageserver(
|
||||
let pageserver_listener = tcp_listener::bind(pg_addr)?;
|
||||
|
||||
// Launch broker client
|
||||
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
|
||||
let broker_client = WALRECEIVER_RUNTIME
|
||||
.block_on(async {
|
||||
// Note: we do not attempt connecting here (but validate endpoints sanity).
|
||||
@@ -335,34 +334,13 @@ fn start_pageserver(
|
||||
// Set up remote storage client
|
||||
let remote_storage = create_remote_storage_client(conf)?;
|
||||
|
||||
// All tenant load operations carry this while they are ongoing; it will be dropped once those
|
||||
// operations finish either successfully or in some other manner. However, the initial load
|
||||
// will be then done, and we can start the global background tasks.
|
||||
let (init_done_tx, init_done_rx) = utils::completion::channel();
|
||||
|
||||
// Scan the local 'tenants/' directory and start loading the tenants
|
||||
let init_started_at = std::time::Instant::now();
|
||||
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
|
||||
conf,
|
||||
broker_client.clone(),
|
||||
remote_storage.clone(),
|
||||
(init_done_tx, init_done_rx.clone()),
|
||||
))?;
|
||||
|
||||
BACKGROUND_RUNTIME.spawn({
|
||||
let init_done_rx = init_done_rx.clone();
|
||||
async move {
|
||||
init_done_rx.wait().await;
|
||||
|
||||
let elapsed = init_started_at.elapsed();
|
||||
|
||||
tracing::info!(
|
||||
elapsed_millis = elapsed.as_millis(),
|
||||
"Initial load completed."
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
// shared state between the disk-usage backed eviction background task and the http endpoint
|
||||
// that allows triggering disk-usage based eviction manually. note that the http endpoint
|
||||
// is still accessible even if background task is not configured as long as remote storage has
|
||||
@@ -374,7 +352,6 @@ fn start_pageserver(
|
||||
conf,
|
||||
remote_storage.clone(),
|
||||
disk_usage_eviction_state.clone(),
|
||||
init_done_rx.clone(),
|
||||
)?;
|
||||
}
|
||||
|
||||
@@ -412,7 +389,6 @@ fn start_pageserver(
|
||||
);
|
||||
|
||||
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
|
||||
let init_done_rx = init_done_rx;
|
||||
let metrics_ctx = RequestContext::todo_child(
|
||||
TaskKind::MetricsCollection,
|
||||
// This task itself shouldn't download anything.
|
||||
@@ -428,13 +404,6 @@ fn start_pageserver(
|
||||
"consumption metrics collection",
|
||||
true,
|
||||
async move {
|
||||
// first wait for initial load to complete before first iteration.
|
||||
//
|
||||
// this is because we only process active tenants and timelines, and the
|
||||
// Timeline::get_current_logical_size will spawn the logical size calculation,
|
||||
// which will not be rate-limited.
|
||||
init_done_rx.wait().await;
|
||||
|
||||
pageserver::consumption_metrics::collect_metrics(
|
||||
metric_collection_endpoint,
|
||||
conf.metric_collection_interval,
|
||||
|
||||
174
pageserver/src/bin/pageserver_binutils.rs
Normal file
174
pageserver/src/bin/pageserver_binutils.rs
Normal file
@@ -0,0 +1,174 @@
|
||||
//! A helper tool to manage pageserver binary files.
|
||||
//! Accepts a file as an argument, attempts to parse it with all ways possible
|
||||
//! and prints its interpreted context.
|
||||
//!
|
||||
//! Separate, `metadata` subcommand allows to print and update pageserver's metadata file.
|
||||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
str::FromStr,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use clap::{value_parser, Arg, Command};
|
||||
|
||||
use pageserver::{
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
page_cache,
|
||||
task_mgr::TaskKind,
|
||||
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
|
||||
virtual_file,
|
||||
};
|
||||
use postgres_ffi::ControlFileData;
|
||||
use utils::{lsn::Lsn, project_git_version};
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
const METADATA_SUBCOMMAND: &str = "metadata";
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
let arg_matches = cli().get_matches();
|
||||
|
||||
match arg_matches.subcommand() {
|
||||
Some((subcommand_name, subcommand_matches)) => {
|
||||
let path = subcommand_matches
|
||||
.get_one::<PathBuf>("metadata_path")
|
||||
.context("'metadata_path' argument is missing")?
|
||||
.to_path_buf();
|
||||
anyhow::ensure!(
|
||||
subcommand_name == METADATA_SUBCOMMAND,
|
||||
"Unknown subcommand {subcommand_name}"
|
||||
);
|
||||
handle_metadata(&path, subcommand_matches)?;
|
||||
}
|
||||
None => {
|
||||
let path = arg_matches
|
||||
.get_one::<PathBuf>("path")
|
||||
.context("'path' argument is missing")?
|
||||
.to_path_buf();
|
||||
println!(
|
||||
"No subcommand specified, attempting to guess the format for file {}",
|
||||
path.display()
|
||||
);
|
||||
if let Err(e) = read_pg_control_file(&path) {
|
||||
println!(
|
||||
"Failed to read input file as a pg control one: {e:#}\n\
|
||||
Attempting to read it as layer file"
|
||||
);
|
||||
print_layerfile(&path)?;
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
|
||||
let control_file = ControlFileData::decode(&std::fs::read(control_file_path)?)?;
|
||||
println!("{control_file:?}");
|
||||
let control_file_initdb = Lsn(control_file.checkPoint);
|
||||
println!(
|
||||
"pg_initdb_lsn: {}, aligned: {}",
|
||||
control_file_initdb,
|
||||
control_file_initdb.align()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_layerfile(path: &Path) -> anyhow::Result<()> {
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(10);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
dump_layerfile_from_path(path, true, &ctx)
|
||||
}
|
||||
|
||||
fn handle_metadata(path: &Path, arg_matches: &clap::ArgMatches) -> Result<(), anyhow::Error> {
|
||||
let metadata_bytes = std::fs::read(path)?;
|
||||
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
|
||||
println!("Current metadata:\n{meta:?}");
|
||||
let mut update_meta = false;
|
||||
if let Some(disk_consistent_lsn) = arg_matches.get_one::<String>("disk_consistent_lsn") {
|
||||
meta = TimelineMetadata::new(
|
||||
Lsn::from_str(disk_consistent_lsn)?,
|
||||
meta.prev_record_lsn(),
|
||||
meta.ancestor_timeline(),
|
||||
meta.ancestor_lsn(),
|
||||
meta.latest_gc_cutoff_lsn(),
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
if let Some(prev_record_lsn) = arg_matches.get_one::<String>("prev_record_lsn") {
|
||||
meta = TimelineMetadata::new(
|
||||
meta.disk_consistent_lsn(),
|
||||
Some(Lsn::from_str(prev_record_lsn)?),
|
||||
meta.ancestor_timeline(),
|
||||
meta.ancestor_lsn(),
|
||||
meta.latest_gc_cutoff_lsn(),
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
if let Some(latest_gc_cuttoff) = arg_matches.get_one::<String>("latest_gc_cuttoff") {
|
||||
meta = TimelineMetadata::new(
|
||||
meta.disk_consistent_lsn(),
|
||||
meta.prev_record_lsn(),
|
||||
meta.ancestor_timeline(),
|
||||
meta.ancestor_lsn(),
|
||||
Lsn::from_str(latest_gc_cuttoff)?,
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
|
||||
if update_meta {
|
||||
let metadata_bytes = meta.to_bytes()?;
|
||||
std::fs::write(path, metadata_bytes)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cli() -> Command {
|
||||
Command::new("Neon Pageserver binutils")
|
||||
.about("Reads pageserver (and related) binary files management utility")
|
||||
.version(GIT_VERSION)
|
||||
.arg(
|
||||
Arg::new("path")
|
||||
.help("Input file path")
|
||||
.value_parser(value_parser!(PathBuf))
|
||||
.required(false),
|
||||
)
|
||||
.subcommand(
|
||||
Command::new(METADATA_SUBCOMMAND)
|
||||
.about("Read and update pageserver metadata file")
|
||||
.arg(
|
||||
Arg::new("metadata_path")
|
||||
.help("Input metadata file path")
|
||||
.value_parser(value_parser!(PathBuf))
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("disk_consistent_lsn")
|
||||
.long("disk_consistent_lsn")
|
||||
.help("Replace disk consistent Lsn"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("prev_record_lsn")
|
||||
.long("prev_record_lsn")
|
||||
.help("Replace previous record Lsn"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("latest_gc_cuttoff")
|
||||
.long("latest_gc_cuttoff")
|
||||
.help("Replace latest gc cuttoff"),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
// Unit test: builds the command definition returned by `cli()` and runs
// clap's `debug_assert` self-check on it.
#[test]
|
||||
fn verify_cli() {
|
||||
cli().debug_assert();
|
||||
}
|
||||
@@ -150,7 +150,7 @@ pub async fn collect_metrics_iteration(
|
||||
let mut tenant_resident_size = 0;
|
||||
|
||||
// iterate through list of timelines in tenant
|
||||
for timeline in tenant.list_timelines().iter() {
|
||||
for timeline in tenant.list_timelines().await.iter() {
|
||||
// collect per-timeline metrics only for active timelines
|
||||
if timeline.is_active() {
|
||||
let timeline_written_size = u64::from(timeline.get_last_record_lsn());
|
||||
|
||||
@@ -88,7 +88,6 @@
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
// The main structure of this module, see module-level comment.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RequestContext {
|
||||
task_kind: TaskKind,
|
||||
download_behavior: DownloadBehavior,
|
||||
@@ -96,7 +95,7 @@ pub struct RequestContext {
|
||||
|
||||
/// Desired behavior if the operation requires an on-demand download
|
||||
/// to proceed.
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
|
||||
#[derive(Clone, Copy, PartialEq, Eq)]
|
||||
pub enum DownloadBehavior {
|
||||
/// Download the layer file. It can take a while.
|
||||
Download,
|
||||
|
||||
@@ -54,13 +54,12 @@ use serde::{Deserialize, Serialize};
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, instrument, warn, Instrument};
|
||||
use utils::completion;
|
||||
use utils::serde_percent::Percent;
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{self, storage_layer::{PersistentLayer, RemoteLayerDesc}, Timeline},
|
||||
tenant::{self, storage_layer::PersistentLayer, Timeline},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -83,7 +82,6 @@ pub fn launch_disk_usage_global_eviction_task(
|
||||
conf: &'static PageServerConf,
|
||||
storage: GenericRemoteStorage,
|
||||
state: Arc<State>,
|
||||
init_done: completion::Barrier,
|
||||
) -> anyhow::Result<()> {
|
||||
let Some(task_config) = &conf.disk_usage_based_eviction else {
|
||||
info!("disk usage based eviction task not configured");
|
||||
@@ -100,9 +98,6 @@ pub fn launch_disk_usage_global_eviction_task(
|
||||
"disk usage based eviction",
|
||||
false,
|
||||
async move {
|
||||
// wait until initial load is complete, because we cannot evict from loading tenants.
|
||||
init_done.wait().await;
|
||||
|
||||
disk_usage_eviction_task(
|
||||
&state,
|
||||
task_config,
|
||||
@@ -329,7 +324,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
// If we get far enough in the list that we start to evict layers that are below
|
||||
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
|
||||
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
|
||||
let mut batched: HashMap<_, Vec<Arc<RemoteLayerDesc>>> = HashMap::new();
|
||||
let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new();
|
||||
let mut warned = None;
|
||||
let mut usage_planned = usage_pre;
|
||||
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
|
||||
@@ -434,7 +429,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
#[derive(Clone)]
|
||||
struct EvictionCandidate {
|
||||
timeline: Arc<Timeline>,
|
||||
layer: Arc<RemoteLayerDesc>,
|
||||
layer: Arc<dyn PersistentLayer>,
|
||||
last_activity_ts: SystemTime,
|
||||
}
|
||||
|
||||
@@ -513,7 +508,7 @@ async fn collect_eviction_candidates(
|
||||
// a little unfair to tenants during shutdown in such a situation is tolerable.
|
||||
let mut tenant_candidates = Vec::new();
|
||||
let mut max_layer_size = 0;
|
||||
for tl in tenant.list_timelines() {
|
||||
for tl in tenant.list_timelines().await {
|
||||
if !tl.is_active() {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -678,8 +678,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
type: object
|
||||
required:
|
||||
- new_timeline_id
|
||||
properties:
|
||||
new_timeline_id:
|
||||
type: string
|
||||
@@ -938,8 +936,6 @@ components:
|
||||
allOf:
|
||||
- $ref: '#/components/schemas/TenantConfig'
|
||||
- type: object
|
||||
required:
|
||||
- new_tenant_id
|
||||
properties:
|
||||
new_tenant_id:
|
||||
type: string
|
||||
|
||||
@@ -11,7 +11,7 @@ use storage_broker::BrokerClientChannel;
|
||||
use tenant_size_model::{SizeResult, StorageModel};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::http::endpoint::request_span;
|
||||
use utils::http::endpoint::RequestSpan;
|
||||
use utils::http::json::json_request_or_empty_body;
|
||||
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
|
||||
|
||||
@@ -25,9 +25,7 @@ use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::mgr::{
|
||||
GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
|
||||
};
|
||||
use crate::tenant::mgr::{TenantMapInsertError, TenantStateError};
|
||||
use crate::tenant::size::ModelInputs;
|
||||
use crate::tenant::storage_layer::LayerAccessStatsReset;
|
||||
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
|
||||
@@ -146,36 +144,6 @@ impl From<TenantStateError> for ApiError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetTenantError> for ApiError {
|
||||
fn from(tse: GetTenantError) -> ApiError {
|
||||
match tse {
|
||||
GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid)),
|
||||
e @ GetTenantError::NotActive(_) => {
|
||||
// Why is this not `ApiError::NotFound`?
|
||||
// Because we must be careful to never return 404 for a tenant if it does
|
||||
// in fact exist locally. If we did, the caller could draw the conclusion
|
||||
// that it can attach the tenant to another PS and we'd be in split-brain.
|
||||
//
|
||||
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
|
||||
ApiError::InternalServerError(anyhow::Error::new(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SetNewTenantConfigError> for ApiError {
|
||||
fn from(e: SetNewTenantConfigError) -> ApiError {
|
||||
match e {
|
||||
SetNewTenantConfigError::GetTenant(tid) => {
|
||||
ApiError::NotFound(anyhow!("tenant {}", tid))
|
||||
}
|
||||
e @ SetNewTenantConfigError::Persist(_) => {
|
||||
ApiError::InternalServerError(anyhow::Error::new(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crate::tenant::DeleteTimelineError> for ApiError {
|
||||
fn from(value: crate::tenant::DeleteTimelineError) -> Self {
|
||||
use crate::tenant::DeleteTimelineError::*;
|
||||
@@ -195,7 +163,7 @@ impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
|
||||
match value {
|
||||
// Report Precondition failed so client can distinguish between
|
||||
// "tenant is missing" case from "timeline is missing"
|
||||
Tenant(GetTenantError::NotFound(..)) => {
|
||||
Tenant(TenantStateError::NotFound(..)) => {
|
||||
ApiError::PreconditionFailed("Requested tenant is missing")
|
||||
}
|
||||
Tenant(t) => ApiError::from(t),
|
||||
@@ -301,7 +269,9 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
||||
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let new_timeline_id = request_data.new_timeline_id;
|
||||
let new_timeline_id = request_data
|
||||
.new_timeline_id
|
||||
.unwrap_or_else(TimelineId::generate);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
|
||||
|
||||
@@ -328,7 +298,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
||||
Err(err) => Err(ApiError::InternalServerError(err)),
|
||||
}
|
||||
}
|
||||
.instrument(info_span!("timeline_create", tenant = %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
|
||||
.instrument(info_span!("timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -342,7 +312,7 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
|
||||
|
||||
let response_data = async {
|
||||
let tenant = mgr::get_tenant(tenant_id, true).await?;
|
||||
let timelines = tenant.list_timelines();
|
||||
let timelines = tenant.list_timelines().await;
|
||||
|
||||
let mut response_data = Vec::with_capacity(timelines.len());
|
||||
for timeline in timelines {
|
||||
@@ -381,6 +351,7 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
|
||||
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, false)
|
||||
.await
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
let timeline_info = build_timeline_info(
|
||||
@@ -550,12 +521,12 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
|
||||
// Calculate total physical size of all timelines
|
||||
let mut current_physical_size = 0;
|
||||
for timeline in tenant.list_timelines().iter() {
|
||||
for timeline in tenant.list_timelines().await.iter() {
|
||||
current_physical_size += timeline.layer_size_sum();
|
||||
}
|
||||
|
||||
let state = tenant.current_state();
|
||||
Result::<_, ApiError>::Ok(TenantInfo {
|
||||
Ok(TenantInfo {
|
||||
id: tenant_id,
|
||||
state: state.clone(),
|
||||
current_physical_size: Some(current_physical_size),
|
||||
@@ -563,7 +534,8 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
})
|
||||
}
|
||||
.instrument(info_span!("tenant_status_handler", tenant = %tenant_id))
|
||||
.await?;
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, tenant_info)
|
||||
}
|
||||
@@ -762,8 +734,6 @@ pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>,
|
||||
}
|
||||
|
||||
async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let request_data: TenantCreateRequest = json_request(&mut request).await?;
|
||||
let target_tenant_id = request_data.new_tenant_id;
|
||||
check_permission(&request, None)?;
|
||||
|
||||
let _timer = STORAGE_TIME_GLOBAL
|
||||
@@ -771,10 +741,17 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
.expect("bug")
|
||||
.start_timer();
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
|
||||
let request_data: TenantCreateRequest = json_request(&mut request).await?;
|
||||
|
||||
let tenant_conf =
|
||||
TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
let target_tenant_id = request_data
|
||||
.new_tenant_id
|
||||
.map(TenantId::from)
|
||||
.unwrap_or_else(TenantId::generate);
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
@@ -999,6 +976,7 @@ async fn active_timeline_of_active_tenant(
|
||||
let tenant = mgr::get_tenant(tenant_id, true).await?;
|
||||
tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.await
|
||||
.map_err(ApiError::NotFound)
|
||||
}
|
||||
|
||||
@@ -1179,7 +1157,7 @@ pub fn make_router(
|
||||
#[cfg(not(feature = "testing"))]
|
||||
let handler = cfg_disabled;
|
||||
|
||||
move |r| request_span(r, handler)
|
||||
move |r| RequestSpan(handler).handle(r)
|
||||
}};
|
||||
}
|
||||
|
||||
@@ -1194,50 +1172,54 @@ pub fn make_router(
|
||||
)
|
||||
.context("Failed to initialize router state")?,
|
||||
))
|
||||
.get("/v1/status", |r| request_span(r, status_handler))
|
||||
.get("/v1/status", |r| RequestSpan(status_handler).handle(r))
|
||||
.put(
|
||||
"/v1/failpoints",
|
||||
testing_api!("manage failpoints", failpoints_handler),
|
||||
)
|
||||
.get("/v1/tenant", |r| request_span(r, tenant_list_handler))
|
||||
.post("/v1/tenant", |r| request_span(r, tenant_create_handler))
|
||||
.get("/v1/tenant/:tenant_id", |r| request_span(r, tenant_status))
|
||||
.get("/v1/tenant", |r| RequestSpan(tenant_list_handler).handle(r))
|
||||
.post("/v1/tenant", |r| {
|
||||
RequestSpan(tenant_create_handler).handle(r)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_id", |r| {
|
||||
RequestSpan(tenant_status).handle(r)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_id/synthetic_size", |r| {
|
||||
request_span(r, tenant_size_handler)
|
||||
RequestSpan(tenant_size_handler).handle(r)
|
||||
})
|
||||
.put("/v1/tenant/config", |r| {
|
||||
request_span(r, update_tenant_config_handler)
|
||||
RequestSpan(update_tenant_config_handler).handle(r)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_id/config", |r| {
|
||||
request_span(r, get_tenant_config_handler)
|
||||
RequestSpan(get_tenant_config_handler).handle(r)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_id/timeline", |r| {
|
||||
request_span(r, timeline_list_handler)
|
||||
RequestSpan(timeline_list_handler).handle(r)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_id/timeline", |r| {
|
||||
request_span(r, timeline_create_handler)
|
||||
RequestSpan(timeline_create_handler).handle(r)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_id/attach", |r| {
|
||||
request_span(r, tenant_attach_handler)
|
||||
RequestSpan(tenant_attach_handler).handle(r)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_id/detach", |r| {
|
||||
request_span(r, tenant_detach_handler)
|
||||
RequestSpan(tenant_detach_handler).handle(r)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_id/load", |r| {
|
||||
request_span(r, tenant_load_handler)
|
||||
RequestSpan(tenant_load_handler).handle(r)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_id/ignore", |r| {
|
||||
request_span(r, tenant_ignore_handler)
|
||||
RequestSpan(tenant_ignore_handler).handle(r)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
|
||||
request_span(r, timeline_detail_handler)
|
||||
RequestSpan(timeline_detail_handler).handle(r)
|
||||
})
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp",
|
||||
|r| request_span(r, get_lsn_by_timestamp_handler),
|
||||
|r| RequestSpan(get_lsn_by_timestamp_handler).handle(r),
|
||||
)
|
||||
.put("/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc", |r| {
|
||||
request_span(r, timeline_gc_handler)
|
||||
RequestSpan(timeline_gc_handler).handle(r)
|
||||
})
|
||||
.put(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/compact",
|
||||
@@ -1249,34 +1231,34 @@ pub fn make_router(
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
|
||||
|r| request_span(r, timeline_download_remote_layers_handler_post),
|
||||
|r| RequestSpan(timeline_download_remote_layers_handler_post).handle(r),
|
||||
)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
|
||||
|r| request_span(r, timeline_download_remote_layers_handler_get),
|
||||
|r| RequestSpan(timeline_download_remote_layers_handler_get).handle(r),
|
||||
)
|
||||
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
|
||||
request_span(r, timeline_delete_handler)
|
||||
RequestSpan(timeline_delete_handler).handle(r)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/layer", |r| {
|
||||
request_span(r, layer_map_info_handler)
|
||||
RequestSpan(layer_map_info_handler).handle(r)
|
||||
})
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
|
||||
|r| request_span(r, layer_download_handler),
|
||||
|r| RequestSpan(layer_download_handler).handle(r),
|
||||
)
|
||||
.delete(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
|
||||
|r| request_span(r, evict_timeline_layer_handler),
|
||||
|r| RequestSpan(evict_timeline_layer_handler).handle(r),
|
||||
)
|
||||
.put("/v1/disk_usage_eviction/run", |r| {
|
||||
request_span(r, disk_usage_eviction_run)
|
||||
RequestSpan(disk_usage_eviction_run).handle(r)
|
||||
})
|
||||
.put(
|
||||
"/v1/tenant/:tenant_id/break",
|
||||
testing_api!("set tenant state to broken", handle_tenant_break),
|
||||
)
|
||||
.get("/v1/panic", |r| request_span(r, always_panic_handler))
|
||||
.get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r))
|
||||
.post(
|
||||
"/v1/tracing/event",
|
||||
testing_api!("emit a tracing event", post_tracing_event_handler),
|
||||
|
||||
@@ -35,7 +35,7 @@ use tracing::info;
|
||||
/// backwards-compatible changes to the metadata format.
|
||||
pub const STORAGE_FORMAT_VERSION: u16 = 3;
|
||||
|
||||
pub const DEFAULT_PG_VERSION: u32 = 15;
|
||||
pub const DEFAULT_PG_VERSION: u32 = 14;
|
||||
|
||||
// Magic constants used to identify different kinds of files
|
||||
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
|
||||
@@ -45,7 +45,6 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
|
||||
|
||||
pub use crate::metrics::preinitialize_metrics;
|
||||
|
||||
#[tracing::instrument]
|
||||
pub async fn shutdown_pageserver(exit_code: i32) {
|
||||
// Shut down the libpq endpoint task. This prevents new connections from
|
||||
// being accepted.
|
||||
|
||||
@@ -50,9 +50,7 @@ use crate::import_datadir::import_wal_from_tar;
|
||||
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant;
|
||||
use crate::tenant::mgr;
|
||||
use crate::tenant::mgr::GetTenantError;
|
||||
use crate::tenant::{Tenant, Timeline};
|
||||
use crate::trace::Tracer;
|
||||
|
||||
@@ -390,7 +388,7 @@ impl PageServerHandler {
|
||||
};
|
||||
|
||||
// Check that the timeline exists
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true).await?;
|
||||
|
||||
// switch client to COPYBOTH
|
||||
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
|
||||
@@ -489,7 +487,9 @@ impl PageServerHandler {
|
||||
// Create empty timeline
|
||||
info!("creating new timeline");
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
|
||||
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
|
||||
let timeline = tenant
|
||||
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
|
||||
.await?;
|
||||
|
||||
// TODO mark timeline as not ready until it reaches end_lsn.
|
||||
// We might have some wal to import as well, and we should prevent compute
|
||||
@@ -1150,9 +1150,7 @@ enum GetActiveTenantError {
|
||||
wait_time: Duration,
|
||||
},
|
||||
#[error(transparent)]
|
||||
NotFound(GetTenantError),
|
||||
#[error(transparent)]
|
||||
WaitTenantActive(tenant::WaitToBecomeActiveError),
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<GetActiveTenantError> for QueryError {
|
||||
@@ -1161,8 +1159,7 @@ impl From<GetActiveTenantError> for QueryError {
|
||||
GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
|
||||
ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
|
||||
),
|
||||
GetActiveTenantError::WaitTenantActive(e) => QueryError::Other(anyhow::Error::new(e)),
|
||||
GetActiveTenantError::NotFound(e) => QueryError::Other(anyhow::Error::new(e)),
|
||||
GetActiveTenantError::Other(e) => QueryError::Other(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1178,16 +1175,13 @@ async fn get_active_tenant_with_timeout(
|
||||
) -> Result<Arc<Tenant>, GetActiveTenantError> {
|
||||
let tenant = match mgr::get_tenant(tenant_id, false).await {
|
||||
Ok(tenant) => tenant,
|
||||
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
|
||||
Err(GetTenantError::NotActive(_)) => {
|
||||
unreachable!("we're calling get_tenant with active=false")
|
||||
}
|
||||
Err(e) => return Err(GetActiveTenantError::Other(e.into())),
|
||||
};
|
||||
let wait_time = Duration::from_secs(30);
|
||||
match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await {
|
||||
Ok(Ok(())) => Ok(tenant),
|
||||
// no .context(), the error message is good enough and some tests depend on it
|
||||
Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
|
||||
Ok(Err(wait_error)) => Err(GetActiveTenantError::Other(wait_error)),
|
||||
Err(_) => {
|
||||
let latest_state = tenant.current_state();
|
||||
if latest_state == TenantState::Active {
|
||||
@@ -1202,34 +1196,13 @@ async fn get_active_tenant_with_timeout(
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
enum GetActiveTimelineError {
|
||||
#[error(transparent)]
|
||||
Tenant(GetActiveTenantError),
|
||||
#[error(transparent)]
|
||||
Timeline(anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<GetActiveTimelineError> for QueryError {
|
||||
fn from(e: GetActiveTimelineError) -> Self {
|
||||
match e {
|
||||
GetActiveTimelineError::Tenant(e) => e.into(),
|
||||
GetActiveTimelineError::Timeline(e) => QueryError::Other(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Shorthand for getting a reference to a Timeline of an Active tenant.
|
||||
async fn get_active_tenant_timeline(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, GetActiveTimelineError> {
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, ctx)
|
||||
.await
|
||||
.map_err(GetActiveTimelineError::Tenant)?;
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(GetActiveTimelineError::Timeline)?;
|
||||
) -> Result<Arc<Timeline>, GetActiveTenantError> {
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, ctx).await?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true).await?;
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
@@ -1594,13 +1594,15 @@ fn is_slru_block_key(key: Key) -> bool {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn create_test_timeline(
|
||||
pub async fn create_test_timeline(
|
||||
tenant: &crate::tenant::Tenant,
|
||||
timeline_id: utils::id::TimelineId,
|
||||
pg_version: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<std::sync::Arc<Timeline>> {
|
||||
let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)
|
||||
.await?;
|
||||
let mut m = tline.begin_modification(Lsn(8));
|
||||
m.init_empty()?;
|
||||
m.commit()?;
|
||||
@@ -1630,7 +1632,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_list_rels_drop() -> Result<()> {
|
||||
let repo = RepoHarness::create("test_list_rels_drop")?.load();
|
||||
let tline = create_empty_timeline(repo, TIMELINE_ID)?;
|
||||
let tline = create_empty_timeline(repo, TIMELINE_ID).await?;
|
||||
const TESTDB: u32 = 111;
|
||||
|
||||
// Import initial dummy checkpoint record, otherwise the get_timeline() call
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,34 +0,0 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use super::storage_layer::{LayerFileName, PersistentLayer, RemoteLayerDesc};
|
||||
|
||||
pub struct LayerCache {
|
||||
layers: Mutex<HashMap<LayerFileName, Arc<dyn PersistentLayer>>>,
|
||||
}
|
||||
|
||||
impl LayerCache {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
layers: Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&self, layer_fname: &LayerFileName) -> Option<Arc<dyn PersistentLayer>> {
|
||||
let guard: std::sync::MutexGuard<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> =
|
||||
self.layers.lock().unwrap();
|
||||
guard.get(layer_fname).cloned()
|
||||
}
|
||||
|
||||
pub fn contains(&self, layer_fname: &LayerFileName) -> bool {
|
||||
let guard = self.layers.lock().unwrap();
|
||||
guard.contains_key(layer_fname)
|
||||
}
|
||||
|
||||
pub fn insert(&self, layer_fname: LayerFileName, persistent_layer: Arc<dyn PersistentLayer>) {
|
||||
let mut guard = self.layers.lock().unwrap();
|
||||
guard.insert(layer_fname, persistent_layer);
|
||||
}
|
||||
}
|
||||
@@ -61,7 +61,6 @@ use historic_layer_coverage::BufferedHistoricLayerCoverage;
|
||||
pub use historic_layer_coverage::Replacement;
|
||||
|
||||
use super::storage_layer::range_eq;
|
||||
use super::storage_layer::PersistentLayer;
|
||||
|
||||
///
|
||||
/// LayerMap tracks what layers exist on a timeline.
|
||||
@@ -139,19 +138,24 @@ where
|
||||
self.layer_map.remove_historic_noflush(layer)
|
||||
}
|
||||
|
||||
/// Ensure the downloaded layer matches existing layer.
|
||||
/// Replaces existing layer iff it is the `expected`.
|
||||
///
|
||||
/// Returned `Replacement` describes succeeding in checking or the reason why it could not
|
||||
/// If the expected layer has been removed it will not be inserted by this function.
|
||||
///
|
||||
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
|
||||
/// be done.
|
||||
pub fn ensure_consistent(
|
||||
&self,
|
||||
///
|
||||
/// TODO replacement can be done without buffering and rebuilding layer map updates.
|
||||
/// One way to do that is to add a layer of indirection for returned values, so
|
||||
/// that we can replace values only by updating a hashmap.
|
||||
pub fn replace_historic(
|
||||
&mut self,
|
||||
expected: &Arc<L>,
|
||||
new: Arc<dyn PersistentLayer>,
|
||||
new: Arc<L>,
|
||||
) -> anyhow::Result<Replacement<Arc<L>>> {
|
||||
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
|
||||
|
||||
self.layer_map
|
||||
.ensure_consistent_noflush(expected, new)
|
||||
self.layer_map.replace_historic_noflush(expected, new)
|
||||
}
|
||||
|
||||
// We will flush on drop anyway, but this method makes it
|
||||
@@ -305,16 +309,16 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub(self) fn ensure_consistent_noflush(
|
||||
&self,
|
||||
pub(self) fn replace_historic_noflush(
|
||||
&mut self,
|
||||
expected: &Arc<L>,
|
||||
new: Arc<dyn PersistentLayer>,
|
||||
new: Arc<L>,
|
||||
) -> anyhow::Result<Replacement<Arc<L>>> {
|
||||
let key = historic_layer_coverage::LayerKey::from(&**expected);
|
||||
let other = historic_layer_coverage::LayerKey::from(&*new);
|
||||
|
||||
let expected_l0 = Self::is_l0(expected);
|
||||
let new_l0 = LayerMap::<dyn PersistentLayer>::is_l0(&*new);
|
||||
let new_l0 = Self::is_l0(&new);
|
||||
|
||||
anyhow::ensure!(
|
||||
key == other,
|
||||
@@ -341,7 +345,17 @@ where
|
||||
None
|
||||
};
|
||||
|
||||
Ok(Replacement::Replaced { in_buffered: false })
|
||||
let replaced = self.historic.replace(&key, new.clone(), |existing| {
|
||||
Self::compare_arced_layers(existing, expected)
|
||||
});
|
||||
|
||||
if let Replacement::Replaced { .. } = &replaced {
|
||||
if let Some(index) = l0_index {
|
||||
self.l0_delta_layers[index] = new;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(replaced)
|
||||
}
|
||||
|
||||
/// Helper function for BatchedUpdates::drop.
|
||||
|
||||
@@ -10,7 +10,6 @@ use tokio::fs;
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::*;
|
||||
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
@@ -20,12 +19,9 @@ use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::{
|
||||
create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
|
||||
};
|
||||
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
|
||||
use crate::IGNORED_TENANT_FILE_NAME;
|
||||
|
||||
use utils::completion;
|
||||
use utils::fs_ext::PathExt;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
@@ -67,7 +63,6 @@ pub async fn init_tenant_mgr(
|
||||
conf: &'static PageServerConf,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
init_done: (completion::Completion, completion::Barrier),
|
||||
) -> anyhow::Result<()> {
|
||||
// Scan local filesystem for attached tenants
|
||||
let tenants_dir = conf.tenants_path();
|
||||
@@ -124,7 +119,6 @@ pub async fn init_tenant_mgr(
|
||||
&tenant_dir_path,
|
||||
broker_client.clone(),
|
||||
remote_storage.clone(),
|
||||
Some(init_done.clone()),
|
||||
&ctx,
|
||||
) {
|
||||
Ok(tenant) => {
|
||||
@@ -160,7 +154,6 @@ pub fn schedule_local_tenant_processing(
|
||||
tenant_path: &Path,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
init_done: Option<(completion::Completion, completion::Barrier)>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Tenant>> {
|
||||
anyhow::ensure!(
|
||||
@@ -214,14 +207,7 @@ pub fn schedule_local_tenant_processing(
|
||||
} else {
|
||||
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
|
||||
// Start loading the tenant into memory. It will initially be in Loading state.
|
||||
Tenant::spawn_load(
|
||||
conf,
|
||||
tenant_id,
|
||||
broker_client,
|
||||
remote_storage,
|
||||
init_done,
|
||||
ctx,
|
||||
)
|
||||
Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx)
|
||||
};
|
||||
Ok(tenant)
|
||||
}
|
||||
@@ -236,7 +222,6 @@ pub fn schedule_local_tenant_processing(
|
||||
/// That could be easily misinterpreted by control plane, the consumer of the
|
||||
/// management API. For example, it could attach the tenant on a different pageserver.
|
||||
/// We would then be in split-brain once this pageserver restarts.
|
||||
#[instrument]
|
||||
pub async fn shutdown_all_tenants() {
|
||||
// Prevent new tenants from being created.
|
||||
let tenants_to_shut_down = {
|
||||
@@ -259,65 +244,15 @@ pub async fn shutdown_all_tenants() {
|
||||
}
|
||||
};
|
||||
|
||||
// Set tenant (and its timlines) to Stoppping state.
|
||||
//
|
||||
// Since we can only transition into Stopping state after activation is complete,
|
||||
// run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
|
||||
//
|
||||
// Transitioning tenants to Stopping state has a couple of non-obvious side effects:
|
||||
// 1. Lock out any new requests to the tenants.
|
||||
// 2. Signal cancellation to WAL receivers (we wait on it below).
|
||||
// 3. Signal cancellation for other tenant background loops.
|
||||
// 4. ???
|
||||
//
|
||||
// The waiting for the cancellation is not done uniformly.
|
||||
// We certainly wait for WAL receivers to shut down.
|
||||
// That is necessary so that no new data comes in before the freeze_and_flush.
|
||||
// But the tenant background loops are joined-on in our caller.
|
||||
// It's mesed up.
|
||||
let mut join_set = JoinSet::new();
|
||||
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
|
||||
for (tenant_id, tenant) in tenants_to_shut_down {
|
||||
join_set.spawn(
|
||||
async move {
|
||||
match tenant.set_stopping().await {
|
||||
Ok(()) => debug!("tenant successfully stopped"),
|
||||
Err(SetStoppingError::Broken) => {
|
||||
info!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well");
|
||||
},
|
||||
Err(SetStoppingError::AlreadyStopping) => {
|
||||
// our task_mgr::shutdown_tasks are going to coalesce on that just fine
|
||||
}
|
||||
}
|
||||
|
||||
tenant
|
||||
}
|
||||
.instrument(info_span!("set_stopping", %tenant_id)),
|
||||
);
|
||||
}
|
||||
|
||||
let mut panicked = 0;
|
||||
|
||||
while let Some(res) = join_set.join_next().await {
|
||||
match res {
|
||||
Err(join_error) if join_error.is_cancelled() => {
|
||||
unreachable!("we are not cancelling any of the futures");
|
||||
}
|
||||
Err(join_error) if join_error.is_panic() => {
|
||||
// cannot really do anything, as this panic is likely a bug
|
||||
panicked += 1;
|
||||
}
|
||||
Err(join_error) => {
|
||||
warn!("unknown kind of JoinError: {join_error}");
|
||||
}
|
||||
Ok(tenant) => tenants_to_freeze_and_flush.push(tenant),
|
||||
for (_, tenant) in tenants_to_shut_down {
|
||||
if tenant.is_active() {
|
||||
// updates tenant state, forbidding new GC and compaction iterations from starting
|
||||
tenant.set_stopping().await;
|
||||
tenants_to_freeze_and_flush.push(tenant);
|
||||
}
|
||||
}
|
||||
|
||||
if panicked > 0 {
|
||||
warn!(panicked, "observed panicks while stopping tenants");
|
||||
}
|
||||
|
||||
// Shut down all existing walreceiver connections and stop accepting the new ones.
|
||||
task_mgr::shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None).await;
|
||||
|
||||
@@ -329,30 +264,12 @@ pub async fn shutdown_all_tenants() {
|
||||
// should be no more activity in any of the repositories.
|
||||
//
|
||||
// On error, log it but continue with the shutdown for other tenants.
|
||||
|
||||
let mut join_set = tokio::task::JoinSet::new();
|
||||
|
||||
for tenant in tenants_to_freeze_and_flush {
|
||||
let tenant_id = tenant.tenant_id();
|
||||
debug!("shutdown tenant {tenant_id}");
|
||||
|
||||
join_set.spawn(
|
||||
async move {
|
||||
if let Err(err) = tenant.freeze_and_flush().await {
|
||||
warn!("Could not checkpoint tenant during shutdown: {err:?}");
|
||||
}
|
||||
}
|
||||
.instrument(info_span!("freeze_and_flush", %tenant_id)),
|
||||
);
|
||||
}
|
||||
|
||||
while let Some(next) = join_set.join_next().await {
|
||||
match next {
|
||||
Ok(()) => {}
|
||||
Err(join_error) if join_error.is_cancelled() => {
|
||||
unreachable!("no cancelling")
|
||||
}
|
||||
Err(join_error) if join_error.is_panic() => { /* reported already */ }
|
||||
Err(join_error) => warn!("unknown kind of JoinError: {join_error}"),
|
||||
if let Err(err) = tenant.freeze_and_flush().await {
|
||||
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -374,7 +291,7 @@ pub async fn create_tenant(
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
let created_tenant =
|
||||
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, ctx)?;
|
||||
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
@@ -387,19 +304,11 @@ pub async fn create_tenant(
|
||||
}).await
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum SetNewTenantConfigError {
|
||||
#[error(transparent)]
|
||||
GetTenant(#[from] GetTenantError),
|
||||
#[error(transparent)]
|
||||
Persist(anyhow::Error),
|
||||
}
|
||||
|
||||
pub async fn set_new_tenant_config(
|
||||
conf: &'static PageServerConf,
|
||||
new_tenant_conf: TenantConfOpt,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(), SetNewTenantConfigError> {
|
||||
) -> Result<(), TenantStateError> {
|
||||
info!("configuring tenant {tenant_id}");
|
||||
let tenant = get_tenant(tenant_id, true).await?;
|
||||
|
||||
@@ -409,32 +318,23 @@ pub async fn set_new_tenant_config(
|
||||
&tenant_config_path,
|
||||
new_tenant_conf,
|
||||
false,
|
||||
)
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
tenant.set_new_tenant_config(new_tenant_conf);
|
||||
)?;
|
||||
tenant.set_new_tenant_config(new_tenant_conf).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum GetTenantError {
|
||||
#[error("Tenant {0} not found")]
|
||||
NotFound(TenantId),
|
||||
#[error("Tenant {0} is not active")]
|
||||
NotActive(TenantId),
|
||||
}
|
||||
|
||||
/// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query.
|
||||
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
|
||||
pub async fn get_tenant(
|
||||
tenant_id: TenantId,
|
||||
active_only: bool,
|
||||
) -> Result<Arc<Tenant>, GetTenantError> {
|
||||
) -> Result<Arc<Tenant>, TenantStateError> {
|
||||
let m = TENANTS.read().await;
|
||||
let tenant = m
|
||||
.get(&tenant_id)
|
||||
.ok_or(GetTenantError::NotFound(tenant_id))?;
|
||||
.ok_or(TenantStateError::NotFound(tenant_id))?;
|
||||
if active_only && !tenant.is_active() {
|
||||
Err(GetTenantError::NotActive(tenant_id))
|
||||
Err(TenantStateError::NotActive(tenant_id))
|
||||
} else {
|
||||
Ok(Arc::clone(tenant))
|
||||
}
|
||||
@@ -443,7 +343,7 @@ pub async fn get_tenant(
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DeleteTimelineError {
|
||||
#[error("Tenant {0}")]
|
||||
Tenant(#[from] GetTenantError),
|
||||
Tenant(#[from] TenantStateError),
|
||||
|
||||
#[error("Timeline {0}")]
|
||||
Timeline(#[from] crate::tenant::DeleteTimelineError),
|
||||
@@ -520,7 +420,7 @@ pub async fn load_tenant(
|
||||
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
|
||||
}
|
||||
|
||||
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, ctx)
|
||||
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx)
|
||||
.with_context(|| {
|
||||
format!("Failed to schedule tenant processing in path {tenant_path:?}")
|
||||
})?;
|
||||
@@ -593,7 +493,7 @@ pub async fn attach_tenant(
|
||||
.context("check for attach marker file existence")?;
|
||||
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
|
||||
|
||||
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?;
|
||||
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
@@ -672,23 +572,14 @@ where
|
||||
{
|
||||
let tenants_accessor = TENANTS.write().await;
|
||||
match tenants_accessor.get(&tenant_id) {
|
||||
Some(tenant) => {
|
||||
let tenant = Arc::clone(tenant);
|
||||
// don't hold TENANTS lock while set_stopping waits for activation to finish
|
||||
drop(tenants_accessor);
|
||||
match tenant.set_stopping().await {
|
||||
Ok(()) => {
|
||||
// we won, continue stopping procedure
|
||||
}
|
||||
Err(SetStoppingError::Broken) => {
|
||||
// continue the procedure, let's hope the closure can deal with broken tenants
|
||||
}
|
||||
Err(SetStoppingError::AlreadyStopping) => {
|
||||
// the tenant is already stopping or broken, don't do anything
|
||||
return Err(TenantStateError::IsStopping(tenant_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(tenant) => match tenant.current_state() {
|
||||
TenantState::Attaching
|
||||
| TenantState::Loading
|
||||
| TenantState::Activating
|
||||
| TenantState::Broken { .. }
|
||||
| TenantState::Active => tenant.set_stopping().await,
|
||||
TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)),
|
||||
},
|
||||
None => return Err(TenantStateError::NotFound(tenant_id)),
|
||||
}
|
||||
}
|
||||
@@ -795,6 +686,7 @@ pub async fn immediate_compact(
|
||||
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.await
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
// Run in task_mgr to avoid race with tenant_detach operation
|
||||
|
||||
@@ -19,8 +19,14 @@ fn parallel_worker(paths: &[PathBuf], next_path_idx: &AtomicUsize) -> io::Result
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> {
|
||||
// TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
|
||||
pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
|
||||
const PARALLEL_PATH_THRESHOLD: usize = 1;
|
||||
if paths.len() <= PARALLEL_PATH_THRESHOLD {
|
||||
for path in paths {
|
||||
fsync_path(path)?;
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
/// Use at most this number of threads.
|
||||
/// Increasing this limit will
|
||||
@@ -30,11 +36,11 @@ fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> {
|
||||
let num_threads = paths.len().min(MAX_NUM_THREADS);
|
||||
let next_path_idx = AtomicUsize::new(0);
|
||||
|
||||
std::thread::scope(|s| -> io::Result<()> {
|
||||
crossbeam_utils::thread::scope(|s| -> io::Result<()> {
|
||||
let mut handles = vec![];
|
||||
// Spawn `num_threads - 1`, as the current thread is also a worker.
|
||||
for _ in 1..num_threads {
|
||||
handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
|
||||
handles.push(s.spawn(|_| parallel_worker(paths, &next_path_idx)));
|
||||
}
|
||||
|
||||
parallel_worker(paths, &next_path_idx)?;
|
||||
@@ -45,41 +51,5 @@ fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> {
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
/// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool.
|
||||
pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
|
||||
if paths.len() == 1 {
|
||||
fsync_path(&paths[0])?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
fsync_in_thread_pool(paths)
|
||||
}
|
||||
|
||||
/// Parallel fsync asynchronously. If number of files are less than PARALLEL_PATH_THRESHOLD, fsync is done in the current
|
||||
/// execution thread. Otherwise, we will spawn_blocking and run it in tokio.
|
||||
pub async fn par_fsync_async(paths: &[PathBuf]) -> io::Result<()> {
|
||||
const MAX_CONCURRENT_FSYNC: usize = 64;
|
||||
let mut next = paths.iter().peekable();
|
||||
let mut js = tokio::task::JoinSet::new();
|
||||
loop {
|
||||
while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
|
||||
let next = next.next().expect("just peeked");
|
||||
let next = next.to_owned();
|
||||
js.spawn_blocking(move || fsync_path(&next));
|
||||
}
|
||||
|
||||
// now the joinset has been filled up, wait for next to complete
|
||||
if let Some(res) = js.join_next().await {
|
||||
res??;
|
||||
} else {
|
||||
// last item had already completed
|
||||
assert!(
|
||||
next.peek().is_none(),
|
||||
"joinset emptied, we shouldn't have more work"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
@@ -1264,7 +1264,12 @@ mod tests {
|
||||
let harness = TenantHarness::create(test_name)?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
// create an empty timeline directory
|
||||
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let _ = runtime.block_on(tenant.create_test_timeline(
|
||||
TIMELINE_ID,
|
||||
Lsn(0),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
))?;
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
|
||||
@@ -136,7 +136,7 @@ pub(super) async fn gather_inputs(
|
||||
.context("Failed to refresh gc_info before gathering inputs")?;
|
||||
|
||||
// Collect information about all the timelines
|
||||
let mut timelines = tenant.list_timelines();
|
||||
let mut timelines = tenant.list_timelines().await;
|
||||
|
||||
if timelines.is_empty() {
|
||||
// perhaps the tenant has just been created, and as such doesn't have any data yet
|
||||
|
||||
@@ -37,7 +37,7 @@ pub use delta_layer::{DeltaLayer, DeltaLayerWriter};
|
||||
pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
|
||||
pub use image_layer::{ImageLayer, ImageLayerWriter};
|
||||
pub use inmemory_layer::InMemoryLayer;
|
||||
pub use remote_layer::RemoteLayerDesc;
|
||||
pub use remote_layer::RemoteLayer;
|
||||
|
||||
use super::layer_map::BatchedUpdates;
|
||||
|
||||
@@ -431,6 +431,14 @@ pub trait PersistentLayer: Layer {
|
||||
/// Permanently remove this layer from disk.
|
||||
fn delete_resident_layer_file(&self) -> Result<()>;
|
||||
|
||||
fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
|
||||
None
|
||||
}
|
||||
|
||||
fn is_remote_layer(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
/// Returns None if the layer file size is not known.
|
||||
///
|
||||
/// Should not change over the lifetime of the layer object because
|
||||
@@ -442,6 +450,16 @@ pub trait PersistentLayer: Layer {
|
||||
fn access_stats(&self) -> &LayerAccessStats;
|
||||
}
|
||||
|
||||
pub fn downcast_remote_layer(
|
||||
layer: &Arc<dyn PersistentLayer>,
|
||||
) -> Option<std::sync::Arc<RemoteLayer>> {
|
||||
if layer.is_remote_layer() {
|
||||
Arc::clone(layer).downcast_remote_layer()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Holds metadata about a layer without any content. Used mostly for testing.
|
||||
///
|
||||
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
|
||||
@@ -524,7 +542,7 @@ impl From<LayerFileName> for LayerDescriptor {
|
||||
///
|
||||
/// This is used by DeltaLayer and ImageLayer. Normally, this holds a reference to the
|
||||
/// global config, and paths to layer files are constructed using the tenant/timeline
|
||||
/// path from the config. But in the 'pagectl' binary, we need to construct a Layer
|
||||
/// path from the config. But in the 'pageserver_binutils' binary, we need to construct a Layer
|
||||
/// struct for a file on disk, without having a page server running, so that we have no
|
||||
/// config. In that case, we use the Path variant to hold the full path to the file on
|
||||
/// disk.
|
||||
|
||||
@@ -30,7 +30,6 @@ use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::{
|
||||
PersistentLayer, ValueReconstructResult, ValueReconstructState,
|
||||
};
|
||||
@@ -58,7 +57,7 @@ use utils::{
|
||||
|
||||
use super::{
|
||||
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
|
||||
LayerKeyIter, PathOrConf, RemoteLayerDesc,
|
||||
LayerKeyIter, PathOrConf,
|
||||
};
|
||||
|
||||
///
|
||||
@@ -111,7 +110,7 @@ const WILL_INIT: u64 = 1;
|
||||
/// reading/deserializing records themselves.
|
||||
///
|
||||
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
|
||||
pub struct BlobRef(pub u64);
|
||||
struct BlobRef(u64);
|
||||
|
||||
impl BlobRef {
|
||||
pub fn will_init(&self) -> bool {
|
||||
@@ -620,7 +619,7 @@ impl DeltaLayer {
|
||||
|
||||
/// Create a DeltaLayer struct representing an existing file on disk.
|
||||
///
|
||||
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
|
||||
/// This variant is only used for debugging purposes, by the 'pageserver_binutils' binary.
|
||||
pub fn new_for_path(path: &Path, file: File) -> Result<Self> {
|
||||
let mut summary_buf = Vec::new();
|
||||
summary_buf.resize(PAGE_SZ, 0);
|
||||
@@ -664,17 +663,6 @@ impl DeltaLayer {
|
||||
&self.layer_name(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Create layer descriptor for this image layer
|
||||
pub fn layer_desc(&self) -> RemoteLayerDesc {
|
||||
RemoteLayerDesc::new_delta(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
&self.layer_name(),
|
||||
&LayerFileMetadata::new(self.file_size()),
|
||||
LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder object for constructing a new delta layer.
|
||||
|
||||
@@ -26,7 +26,6 @@ use crate::repository::{Key, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::{
|
||||
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
|
||||
};
|
||||
@@ -54,7 +53,7 @@ use utils::{
|
||||
};
|
||||
|
||||
use super::filename::{ImageFileName, LayerFileName};
|
||||
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf, RemoteLayerDesc};
|
||||
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
|
||||
|
||||
///
|
||||
/// Header stored in the beginning of the file
|
||||
@@ -423,7 +422,7 @@ impl ImageLayer {
|
||||
|
||||
/// Create an ImageLayer struct representing an existing file on disk.
|
||||
///
|
||||
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
|
||||
/// This variant is only used for debugging purposes, by the 'pageserver_binutils' binary.
|
||||
pub fn new_for_path(path: &Path, file: File) -> Result<ImageLayer> {
|
||||
let mut summary_buf = Vec::new();
|
||||
summary_buf.resize(PAGE_SZ, 0);
|
||||
@@ -465,17 +464,6 @@ impl ImageLayer {
|
||||
&self.layer_name(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Create layer descriptor for this image layer
|
||||
pub fn layer_desc(&self) -> RemoteLayerDesc {
|
||||
RemoteLayerDesc::new_img(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
&self.layer_name(),
|
||||
&LayerFileMetadata::new(self.file_size()),
|
||||
LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder object for constructing a new image layer.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
//! A RemoteLayerDesc is an in-memory placeholder for a layer file that exists
|
||||
//! A RemoteLayer is an in-memory placeholder for a layer file that exists
|
||||
//! in remote storage.
|
||||
//!
|
||||
use crate::config::PageServerConf;
|
||||
@@ -25,19 +25,19 @@ use super::{
|
||||
LayerResidenceStatus, PersistentLayer,
|
||||
};
|
||||
|
||||
/// RemoteLayerDesc is a not yet downloaded [`ImageLayer`] or
|
||||
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
|
||||
/// [`crate::storage_layer::DeltaLayer`].
|
||||
///
|
||||
/// RemoteLayerDesc might be downloaded on-demand during operations which are
|
||||
/// RemoteLayer might be downloaded on-demand during operations which are
|
||||
/// allowed download remote layers and during which, it gets replaced with a
|
||||
/// concrete `DeltaLayer` or `ImageLayer`.
|
||||
///
|
||||
/// See: [`crate::context::RequestContext`] for authorization to download
|
||||
pub struct RemoteLayerDesc {
|
||||
pub(crate) tenantid: TenantId,
|
||||
pub(crate) timelineid: TimelineId,
|
||||
pub(crate) key_range: Range<Key>,
|
||||
pub(crate) lsn_range: Range<Lsn>,
|
||||
pub struct RemoteLayer {
|
||||
tenantid: TenantId,
|
||||
timelineid: TimelineId,
|
||||
key_range: Range<Key>,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
pub file_name: LayerFileName,
|
||||
|
||||
@@ -54,7 +54,7 @@ pub struct RemoteLayerDesc {
|
||||
/// Has `LayerMap::replace` failed for this (true) or not (false).
|
||||
///
|
||||
/// Used together with [`ongoing_download`] semaphore in `Timeline::download_remote_layer`.
|
||||
/// The field is used to mark a RemoteLayerDesc permanently (until restart or ignore+load)
|
||||
/// The field is used to mark a RemoteLayer permanently (until restart or ignore+load)
|
||||
/// unprocessable, because a LayerMap::replace failed.
|
||||
///
|
||||
/// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids
|
||||
@@ -63,9 +63,9 @@ pub struct RemoteLayerDesc {
|
||||
pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for RemoteLayerDesc {
|
||||
impl std::fmt::Debug for RemoteLayer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("RemoteLayerDesc")
|
||||
f.debug_struct("RemoteLayer")
|
||||
.field("file_name", &self.file_name)
|
||||
.field("layer_metadata", &self.layer_metadata)
|
||||
.field("is_incremental", &self.is_incremental)
|
||||
@@ -73,7 +73,7 @@ impl std::fmt::Debug for RemoteLayerDesc {
|
||||
}
|
||||
}
|
||||
|
||||
impl Layer for RemoteLayerDesc {
|
||||
impl Layer for RemoteLayer {
|
||||
fn get_key_range(&self) -> Range<Key> {
|
||||
self.key_range.clone()
|
||||
}
|
||||
@@ -119,7 +119,7 @@ impl Layer for RemoteLayerDesc {
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentLayer for RemoteLayerDesc {
|
||||
impl PersistentLayer for RemoteLayer {
|
||||
fn get_tenant_id(&self) -> TenantId {
|
||||
self.tenantid
|
||||
}
|
||||
@@ -160,6 +160,14 @@ impl PersistentLayer for RemoteLayerDesc {
|
||||
bail!("remote layer has no layer file");
|
||||
}
|
||||
|
||||
fn downcast_remote_layer<'a>(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
|
||||
Some(self)
|
||||
}
|
||||
|
||||
fn is_remote_layer(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn file_size(&self) -> u64 {
|
||||
self.layer_metadata.file_size()
|
||||
}
|
||||
@@ -193,15 +201,15 @@ impl PersistentLayer for RemoteLayerDesc {
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteLayerDesc {
|
||||
impl RemoteLayer {
|
||||
pub fn new_img(
|
||||
tenantid: TenantId,
|
||||
timelineid: TimelineId,
|
||||
fname: &ImageFileName,
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
access_stats: LayerAccessStats,
|
||||
) -> RemoteLayerDesc {
|
||||
RemoteLayerDesc {
|
||||
) -> RemoteLayer {
|
||||
RemoteLayer {
|
||||
tenantid,
|
||||
timelineid,
|
||||
key_range: fname.key_range.clone(),
|
||||
@@ -222,8 +230,8 @@ impl RemoteLayerDesc {
|
||||
fname: &DeltaFileName,
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
access_stats: LayerAccessStats,
|
||||
) -> RemoteLayerDesc {
|
||||
RemoteLayerDesc {
|
||||
) -> RemoteLayer {
|
||||
RemoteLayer {
|
||||
tenantid,
|
||||
timelineid,
|
||||
key_range: fname.key_range.clone(),
|
||||
|
||||
@@ -9,13 +9,13 @@ use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::TENANT_TASK_EVENTS;
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::mgr;
|
||||
use crate::tenant::{Tenant, TenantState};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::completion;
|
||||
use utils::id::TenantId;
|
||||
|
||||
pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completion::Barrier>) {
|
||||
let tenant_id = tenant.tenant_id;
|
||||
pub fn start_background_loops(tenant_id: TenantId) {
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::Compaction,
|
||||
@@ -23,16 +23,11 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
|
||||
None,
|
||||
&format!("compactor for tenant {tenant_id}"),
|
||||
false,
|
||||
{
|
||||
let tenant = Arc::clone(tenant);
|
||||
let init_done = init_done.cloned();
|
||||
async move {
|
||||
completion::Barrier::maybe_wait(init_done).await;
|
||||
compaction_loop(tenant)
|
||||
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
async move {
|
||||
compaction_loop(tenant_id)
|
||||
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
|
||||
.await;
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
task_mgr::spawn(
|
||||
@@ -42,16 +37,11 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
|
||||
None,
|
||||
&format!("garbage collector for tenant {tenant_id}"),
|
||||
false,
|
||||
{
|
||||
let tenant = Arc::clone(tenant);
|
||||
let init_done = init_done.cloned();
|
||||
async move {
|
||||
completion::Barrier::maybe_wait(init_done).await;
|
||||
gc_loop(tenant)
|
||||
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
async move {
|
||||
gc_loop(tenant_id)
|
||||
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
|
||||
.await;
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -59,7 +49,7 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
|
||||
///
|
||||
/// Compaction task's main loop
|
||||
///
|
||||
async fn compaction_loop(tenant: Arc<Tenant>) {
|
||||
async fn compaction_loop(tenant_id: TenantId) {
|
||||
let wait_duration = Duration::from_secs(2);
|
||||
info!("starting");
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
@@ -70,16 +60,16 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
|
||||
loop {
|
||||
trace!("waking up");
|
||||
|
||||
tokio::select! {
|
||||
let tenant = tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
info!("received cancellation request");
|
||||
return;
|
||||
},
|
||||
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
|
||||
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
|
||||
ControlFlow::Break(()) => return,
|
||||
ControlFlow::Continue(()) => (),
|
||||
ControlFlow::Continue(tenant) => tenant,
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
let period = tenant.get_compaction_period();
|
||||
|
||||
@@ -129,7 +119,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
|
||||
///
|
||||
/// GC task's main loop
|
||||
///
|
||||
async fn gc_loop(tenant: Arc<Tenant>) {
|
||||
async fn gc_loop(tenant_id: TenantId) {
|
||||
let wait_duration = Duration::from_secs(2);
|
||||
info!("starting");
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
@@ -137,22 +127,21 @@ async fn gc_loop(tenant: Arc<Tenant>) {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
// GC might require downloading, to find the cutoff LSN that corresponds to the
|
||||
// cutoff specified as time.
|
||||
let ctx =
|
||||
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||
let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||
let mut first = true;
|
||||
loop {
|
||||
trace!("waking up");
|
||||
|
||||
tokio::select! {
|
||||
let tenant = tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
info!("received cancellation request");
|
||||
return;
|
||||
},
|
||||
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
|
||||
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
|
||||
ControlFlow::Break(()) => return,
|
||||
ControlFlow::Continue(()) => (),
|
||||
ControlFlow::Continue(tenant) => tenant,
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
let period = tenant.get_gc_period();
|
||||
|
||||
@@ -172,9 +161,7 @@ async fn gc_loop(tenant: Arc<Tenant>) {
|
||||
Duration::from_secs(10)
|
||||
} else {
|
||||
// Run gc
|
||||
let res = tenant
|
||||
.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx)
|
||||
.await;
|
||||
let res = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx).await;
|
||||
if let Err(e) = res {
|
||||
error!("Gc failed, retrying in {:?}: {e:?}", wait_duration);
|
||||
wait_duration
|
||||
@@ -200,10 +187,23 @@ async fn gc_loop(tenant: Arc<Tenant>) {
|
||||
trace!("GC loop stopped.");
|
||||
}
|
||||
|
||||
async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
|
||||
async fn wait_for_active_tenant(
|
||||
tenant_id: TenantId,
|
||||
wait: Duration,
|
||||
) -> ControlFlow<(), Arc<Tenant>> {
|
||||
let tenant = loop {
|
||||
match mgr::get_tenant(tenant_id, false).await {
|
||||
Ok(tenant) => break tenant,
|
||||
Err(e) => {
|
||||
error!("Failed to get a tenant {tenant_id}: {e:#}");
|
||||
tokio::time::sleep(wait).await;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// if the tenant has a proper status already, no need to wait for anything
|
||||
if tenant.current_state() == TenantState::Active {
|
||||
ControlFlow::Continue(())
|
||||
ControlFlow::Continue(tenant)
|
||||
} else {
|
||||
let mut tenant_state_updates = tenant.subscribe_for_state_updates();
|
||||
loop {
|
||||
@@ -213,7 +213,7 @@ async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
|
||||
match new_state {
|
||||
TenantState::Active => {
|
||||
debug!("Tenant state changed to active, continuing the task loop");
|
||||
return ControlFlow::Continue(());
|
||||
return ControlFlow::Continue(tenant);
|
||||
}
|
||||
state => {
|
||||
debug!("Not running the task loop, tenant is not active: {state:?}");
|
||||
|
||||
@@ -35,7 +35,7 @@ use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
|
||||
use crate::tenant::storage_layer::{
|
||||
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
|
||||
LayerAccessStats, LayerFileName, RemoteLayerDesc,
|
||||
LayerAccessStats, LayerFileName, RemoteLayer,
|
||||
};
|
||||
use crate::tenant::{
|
||||
ephemeral_file::is_ephemeral_file,
|
||||
@@ -77,7 +77,6 @@ use self::eviction_task::EvictionTaskTimelineState;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::config::TenantConf;
|
||||
use super::layer_cache::LayerCache;
|
||||
use super::layer_map::BatchedUpdates;
|
||||
use super::remote_timeline_client::index::IndexPart;
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
@@ -120,7 +119,7 @@ pub struct Timeline {
|
||||
|
||||
pub pg_version: u32,
|
||||
|
||||
pub(super) layers: RwLock<LayerMap<RemoteLayerDesc>>,
|
||||
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
|
||||
|
||||
/// Set of key ranges which should be covered by image layers to
|
||||
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
|
||||
@@ -196,9 +195,8 @@ pub struct Timeline {
|
||||
/// Layer removal lock.
|
||||
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
|
||||
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
|
||||
/// and [`Tenant::delete_timeline`]. This is an `Arc<Mutex>` lock because we need an owned
|
||||
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
|
||||
pub(super) layer_removal_cs: Arc<tokio::sync::Mutex<()>>,
|
||||
/// and [`Tenant::delete_timeline`].
|
||||
pub(super) layer_removal_cs: tokio::sync::Mutex<()>,
|
||||
|
||||
// Needed to ensure that we can't create a branch at a point that was already garbage collected
|
||||
pub latest_gc_cutoff_lsn: Rcu<Lsn>,
|
||||
@@ -237,13 +235,7 @@ pub struct Timeline {
|
||||
|
||||
state: watch::Sender<TimelineState>,
|
||||
|
||||
/// Prevent two tasks from deleting the timeline at the same time. If held, the
|
||||
/// timeline is being deleted. If 'true', the timeline has already been deleted.
|
||||
pub delete_lock: tokio::sync::Mutex<bool>,
|
||||
|
||||
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
|
||||
|
||||
layer_cache: Arc<LayerCache>,
|
||||
}
|
||||
|
||||
/// Internal structure to hold all data needed for logical size calculation.
|
||||
@@ -630,7 +622,7 @@ impl Timeline {
|
||||
{
|
||||
Ok(()) => Ok(()),
|
||||
Err(e) => {
|
||||
// don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
|
||||
// walreceiver.status() locks internally, don't count that towards the wait_lsn_time_histo
|
||||
drop(_timer);
|
||||
let walreceiver_status = {
|
||||
match &*self.walreceiver.lock().unwrap() {
|
||||
@@ -677,7 +669,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Outermost timeline compaction operation; downloads needed layers.
|
||||
pub async fn compact(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
pub async fn compact(&self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
const ROUNDS: usize = 2;
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
@@ -766,7 +758,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Compaction which might need to be retried after downloading remote layers.
|
||||
async fn compact_inner(self: &Arc<Self>, ctx: &RequestContext) -> Result<(), CompactionError> {
|
||||
async fn compact_inner(&self, ctx: &RequestContext) -> Result<(), CompactionError> {
|
||||
//
|
||||
// High level strategy for compaction / image creation:
|
||||
//
|
||||
@@ -801,7 +793,7 @@ impl Timeline {
|
||||
// Below are functions compact_level0() and create_image_layers()
|
||||
// but they are a bit ad hoc and don't quite work like it's explained
|
||||
// above. Rewrite it.
|
||||
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
|
||||
let layer_removal_cs = self.layer_removal_cs.lock().await;
|
||||
// Is the timeline being deleted?
|
||||
let state = *self.state.borrow();
|
||||
if state == TimelineState::Stopping {
|
||||
@@ -835,7 +827,7 @@ impl Timeline {
|
||||
|
||||
// 3. Compact
|
||||
let timer = self.metrics.compact_time_histo.start_timer();
|
||||
self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx)
|
||||
self.compact_level0(&layer_removal_cs, target_file_size, ctx)
|
||||
.await?;
|
||||
timer.stop_and_record();
|
||||
}
|
||||
@@ -1010,22 +1002,20 @@ impl Timeline {
|
||||
|
||||
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
|
||||
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let Some(remote_layer_desc) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
|
||||
if self.layer_cache.contains(&remote_layer_desc.filename()) {
|
||||
return Ok(Some(false));
|
||||
}
|
||||
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
||||
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
|
||||
if self.remote_client.is_none() {
|
||||
return Ok(Some(false));
|
||||
}
|
||||
|
||||
self.download_remote_layer(remote_layer_desc).await?;
|
||||
self.download_remote_layer(remote_layer).await?;
|
||||
Ok(Some(true))
|
||||
}
|
||||
|
||||
/// Like [`evict_layer_batch`], but for just one layer.
|
||||
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
|
||||
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let Some(local_layer) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
|
||||
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
||||
let remote_client = self
|
||||
.remote_client
|
||||
.as_ref()
|
||||
@@ -1052,7 +1042,7 @@ impl Timeline {
|
||||
pub async fn evict_layers(
|
||||
&self,
|
||||
_: &GenericRemoteStorage,
|
||||
layers_to_evict: &[Arc<RemoteLayerDesc>],
|
||||
layers_to_evict: &[Arc<dyn PersistentLayer>],
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
|
||||
let remote_client = self.remote_client.clone().expect(
|
||||
@@ -1087,7 +1077,7 @@ impl Timeline {
|
||||
async fn evict_layer_batch(
|
||||
&self,
|
||||
remote_client: &Arc<RemoteTimelineClient>,
|
||||
layers_to_evict: &[Arc<RemoteLayerDesc>],
|
||||
layers_to_evict: &[Arc<dyn PersistentLayer>],
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
|
||||
// ensure that the layers have finished uploading
|
||||
@@ -1136,12 +1126,12 @@ impl Timeline {
|
||||
fn evict_layer_batch_impl(
|
||||
&self,
|
||||
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
|
||||
local_layer: &Arc<RemoteLayerDesc>,
|
||||
batch_updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
|
||||
local_layer: &Arc<dyn PersistentLayer>,
|
||||
batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
|
||||
) -> anyhow::Result<bool> {
|
||||
use super::layer_map::Replacement;
|
||||
|
||||
if !self.layer_cache.contains(&local_layer.filename()) {
|
||||
if local_layer.is_remote_layer() {
|
||||
// TODO(issue #3851): consider returning an err here instead of false,
|
||||
// which is the same out the match later
|
||||
return Ok(false);
|
||||
@@ -1168,7 +1158,7 @@ impl Timeline {
|
||||
let layer_metadata = LayerFileMetadata::new(layer_file_size);
|
||||
|
||||
let new_remote_layer = Arc::new(match local_layer.filename() {
|
||||
LayerFileName::Image(image_name) => RemoteLayerDesc::new_img(
|
||||
LayerFileName::Image(image_name) => RemoteLayer::new_img(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
&image_name,
|
||||
@@ -1177,7 +1167,7 @@ impl Timeline {
|
||||
.access_stats()
|
||||
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
|
||||
),
|
||||
LayerFileName::Delta(delta_name) => RemoteLayerDesc::new_delta(
|
||||
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
&delta_name,
|
||||
@@ -1188,7 +1178,6 @@ impl Timeline {
|
||||
),
|
||||
});
|
||||
|
||||
/*
|
||||
let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? {
|
||||
Replacement::Replaced { .. } => {
|
||||
if let Err(e) = local_layer.delete_resident_layer_file() {
|
||||
@@ -1239,10 +1228,8 @@ impl Timeline {
|
||||
false
|
||||
}
|
||||
};
|
||||
*/
|
||||
|
||||
// Ok(replaced)
|
||||
Ok(true)
|
||||
Ok(replaced)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1426,9 +1413,6 @@ impl Timeline {
|
||||
eviction_task_timeline_state: tokio::sync::Mutex::new(
|
||||
EvictionTaskTimelineState::default(),
|
||||
),
|
||||
delete_lock: tokio::sync::Mutex::new(false),
|
||||
|
||||
layer_cache: Arc::new(LayerCache::new()),
|
||||
};
|
||||
result.repartition_threshold = result.get_checkpoint_distance() / 10;
|
||||
result
|
||||
@@ -1575,12 +1559,9 @@ impl Timeline {
|
||||
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
|
||||
);
|
||||
|
||||
let remote_desc = layer.layer_desc();
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
total_physical_size += file_size;
|
||||
self.layer_cache.insert(layer.filename(), Arc::new(layer));
|
||||
updates.insert_historic(Arc::new(remote_desc));
|
||||
updates.insert_historic(Arc::new(layer));
|
||||
num_layers += 1;
|
||||
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
|
||||
// Create a DeltaLayer struct for each delta file.
|
||||
@@ -1612,9 +1593,7 @@ impl Timeline {
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
total_physical_size += file_size;
|
||||
let remote_desc = layer.layer_desc();
|
||||
self.layer_cache.insert(layer.filename(), Arc::new(layer));
|
||||
updates.insert_historic(Arc::new(remote_desc));
|
||||
updates.insert_historic(Arc::new(layer));
|
||||
num_layers += 1;
|
||||
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
|
||||
// ignore these
|
||||
@@ -1659,9 +1638,9 @@ impl Timeline {
|
||||
async fn create_remote_layers(
|
||||
&self,
|
||||
index_part: &IndexPart,
|
||||
local_layers: HashMap<LayerFileName, Arc<RemoteLayerDesc>>,
|
||||
local_layers: HashMap<LayerFileName, Arc<dyn PersistentLayer>>,
|
||||
up_to_date_disk_consistent_lsn: Lsn,
|
||||
) -> anyhow::Result<HashMap<LayerFileName, Arc<RemoteLayerDesc>>> {
|
||||
) -> anyhow::Result<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> {
|
||||
// Are we missing some files that are present in remote storage?
|
||||
// Create RemoteLayer instances for them.
|
||||
let mut local_only_layers = local_layers;
|
||||
@@ -1740,7 +1719,7 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
let remote_layer = RemoteLayerDesc::new_img(
|
||||
let remote_layer = RemoteLayer::new_img(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
imgfilename,
|
||||
@@ -1768,7 +1747,7 @@ impl Timeline {
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let remote_layer = RemoteLayerDesc::new_delta(
|
||||
let remote_layer = RemoteLayer::new_delta(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
deltafilename,
|
||||
@@ -1925,7 +1904,6 @@ impl Timeline {
|
||||
// no cancellation here, because nothing really waits for this to complete compared
|
||||
// to spawn_ondemand_logical_size_calculation.
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let calculated_size = match self_clone
|
||||
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
|
||||
.await
|
||||
@@ -2174,7 +2152,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
fn find_layer_desc(&self, layer_file_name: &str) -> Option<Arc<RemoteLayerDesc>> {
|
||||
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
|
||||
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
|
||||
let historic_layer_name = historic_layer.filename().file_name();
|
||||
if layer_file_name == historic_layer_name {
|
||||
@@ -2190,11 +2168,11 @@ impl Timeline {
|
||||
fn delete_historic_layer(
|
||||
&self,
|
||||
// we cannot remove layers otherwise, since gc and compaction will race
|
||||
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
layer: Arc<RemoteLayerDesc>,
|
||||
updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
|
||||
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
|
||||
layer: Arc<dyn PersistentLayer>,
|
||||
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
|
||||
) -> anyhow::Result<()> {
|
||||
if self.layer_cache.contains(&layer.filename()) {
|
||||
if !layer.is_remote_layer() {
|
||||
layer.delete_resident_layer_file()?;
|
||||
let layer_file_size = layer.file_size();
|
||||
self.metrics
|
||||
@@ -2443,7 +2421,13 @@ impl Timeline {
|
||||
|
||||
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
|
||||
// If it's a remote layer, download it and retry.
|
||||
if let Some(layer) = self.layer_cache.get(&layer.filename()) {
|
||||
if let Some(remote_layer) =
|
||||
super::storage_layer::downcast_remote_layer(&layer)
|
||||
{
|
||||
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
|
||||
// we downloaded / would need to download this layer.
|
||||
remote_layer // download happens outside the scope of `layers` guard object
|
||||
} else {
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, lsn_floor);
|
||||
@@ -2466,10 +2450,6 @@ impl Timeline {
|
||||
}),
|
||||
));
|
||||
continue 'outer;
|
||||
} else {
|
||||
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
|
||||
// we downloaded / would need to download this layer.
|
||||
layer // download happens outside the scope of `layers` guard object
|
||||
}
|
||||
} else if timeline.ancestor_timeline.is_some() {
|
||||
// Nothing on this timeline. Traverse to parent
|
||||
@@ -2652,7 +2632,7 @@ impl Timeline {
|
||||
|
||||
/// Layer flusher task's main loop.
|
||||
async fn flush_loop(
|
||||
self: &Arc<Self>,
|
||||
&self,
|
||||
mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
@@ -2741,9 +2721,9 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Flush one frozen in-memory layer to disk, as a new delta layer.
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
|
||||
#[instrument(skip(self, frozen_layer, ctx), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
|
||||
async fn flush_frozen_layer(
|
||||
self: &Arc<Self>,
|
||||
&self,
|
||||
frozen_layer: Arc<InMemoryLayer>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -2763,16 +2743,7 @@ impl Timeline {
|
||||
.await?
|
||||
} else {
|
||||
// normal case, write out a L0 delta layer file.
|
||||
let this = self.clone();
|
||||
let frozen_layer = frozen_layer.clone();
|
||||
let span = tracing::info_span!("blocking");
|
||||
let (delta_path, metadata) = tokio::task::spawn_blocking(move || {
|
||||
let _g = span.entered();
|
||||
this.create_delta_layer(&frozen_layer)
|
||||
})
|
||||
.await
|
||||
.context("create_delta_layer spawn_blocking")
|
||||
.and_then(|res| res)?;
|
||||
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?;
|
||||
HashMap::from([(delta_path, metadata)])
|
||||
};
|
||||
|
||||
@@ -2876,7 +2847,7 @@ impl Timeline {
|
||||
|
||||
// Write out the given frozen in-memory layer as a new L0 delta file
|
||||
fn create_delta_layer(
|
||||
self: &Arc<Self>,
|
||||
&self,
|
||||
frozen_layer: &InMemoryLayer,
|
||||
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
|
||||
// Write it out
|
||||
@@ -2892,13 +2863,10 @@ impl Timeline {
|
||||
// TODO: If we're running inside 'flush_frozen_layers' and there are multiple
|
||||
// files to flush, it might be better to first write them all, and then fsync
|
||||
// them all in parallel.
|
||||
|
||||
// First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace
|
||||
// this with a single fsync in future refactors.
|
||||
par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?;
|
||||
// Then sync the parent directory.
|
||||
par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
|
||||
.context("fsync of timeline dir")?;
|
||||
par_fsync::par_fsync(&[
|
||||
new_delta_path.clone(),
|
||||
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
|
||||
])?;
|
||||
|
||||
// Add it to the layer map
|
||||
let l = Arc::new(new_delta);
|
||||
@@ -2909,8 +2877,7 @@ impl Timeline {
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
batch_updates.insert_historic(Arc::new(l.layer_desc()));
|
||||
self.layer_cache.insert(l.filename(), l);
|
||||
batch_updates.insert_historic(l);
|
||||
batch_updates.flush();
|
||||
|
||||
// update the timeline's physical size
|
||||
@@ -3123,15 +3090,11 @@ impl Timeline {
|
||||
let all_paths = image_layers
|
||||
.iter()
|
||||
.map(|layer| layer.path())
|
||||
.chain(std::iter::once(
|
||||
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
|
||||
))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
par_fsync::par_fsync_async(&all_paths)
|
||||
.await
|
||||
.context("fsync of newly created layer files")?;
|
||||
|
||||
par_fsync::par_fsync_async(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
|
||||
.await
|
||||
.context("fsync of timeline dir")?;
|
||||
par_fsync::par_fsync(&all_paths).context("fsync of newly created layer files")?;
|
||||
|
||||
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
|
||||
|
||||
@@ -3156,9 +3119,7 @@ impl Timeline {
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
updates.insert_historic(Arc::new(l.layer_desc()));
|
||||
let x: Arc<dyn PersistentLayer> = l;
|
||||
self.layer_cache.insert(x.filename(), x)
|
||||
updates.insert_historic(l);
|
||||
}
|
||||
updates.flush();
|
||||
drop(layers);
|
||||
@@ -3171,7 +3132,7 @@ impl Timeline {
|
||||
#[derive(Default)]
|
||||
struct CompactLevel0Phase1Result {
|
||||
new_layers: Vec<DeltaLayer>,
|
||||
deltas_to_compact: Vec<Arc<RemoteLayerDesc>>,
|
||||
deltas_to_compact: Vec<Arc<dyn PersistentLayer>>,
|
||||
}
|
||||
|
||||
/// Top-level failure to compact.
|
||||
@@ -3181,7 +3142,7 @@ enum CompactionError {
|
||||
///
|
||||
/// This should not happen repeatedly, but will be retried once by top-level
|
||||
/// `Timeline::compact`.
|
||||
DownloadRequired(Vec<Arc<RemoteLayerDesc>>),
|
||||
DownloadRequired(Vec<Arc<RemoteLayer>>),
|
||||
/// Compaction cannot be done right now; page reconstruction and so on.
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
@@ -3198,9 +3159,9 @@ impl Timeline {
|
||||
/// This method takes the `_layer_removal_cs` guard to highlight that required downloads are
|
||||
/// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the
|
||||
/// start of level0 files compaction, the on-demand download should be revisited as well.
|
||||
fn compact_level0_phase1(
|
||||
async fn compact_level0_phase1(
|
||||
&self,
|
||||
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactLevel0Phase1Result, CompactionError> {
|
||||
@@ -3253,9 +3214,13 @@ impl Timeline {
|
||||
|
||||
let remotes = deltas_to_compact
|
||||
.iter()
|
||||
.filter(|l| !self.layer_cache.contains(&l.filename()))
|
||||
.filter(|l| l.is_remote_layer())
|
||||
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
|
||||
.cloned()
|
||||
.map(|l| {
|
||||
l.clone()
|
||||
.downcast_remote_layer()
|
||||
.expect("just checked it is remote layer")
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if !remotes.is_empty() {
|
||||
@@ -3509,13 +3474,13 @@ impl Timeline {
|
||||
if !new_layers.is_empty() {
|
||||
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
|
||||
|
||||
// also sync the directory
|
||||
layer_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id));
|
||||
|
||||
// Fsync all the layer files and directory using multiple threads to
|
||||
// minimize latency.
|
||||
par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
|
||||
|
||||
par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
|
||||
.context("fsync of timeline dir")?;
|
||||
|
||||
layer_paths.pop().unwrap();
|
||||
}
|
||||
|
||||
@@ -3532,26 +3497,17 @@ impl Timeline {
|
||||
/// as Level 1 files.
|
||||
///
|
||||
async fn compact_level0(
|
||||
self: &Arc<Self>,
|
||||
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
&self,
|
||||
layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), CompactionError> {
|
||||
let this = self.clone();
|
||||
let ctx_inner = ctx.clone();
|
||||
let layer_removal_cs_inner = layer_removal_cs.clone();
|
||||
let span = tracing::info_span!("blocking");
|
||||
let CompactLevel0Phase1Result {
|
||||
new_layers,
|
||||
deltas_to_compact,
|
||||
} = tokio::task::spawn_blocking(move || {
|
||||
let _g = span.entered();
|
||||
this.compact_level0_phase1(layer_removal_cs_inner, target_file_size, &ctx_inner)
|
||||
})
|
||||
.await
|
||||
.context("compact_level0_phase1 spawn_blocking")
|
||||
.map_err(CompactionError::Other)
|
||||
.and_then(|res| res)?;
|
||||
} = self
|
||||
.compact_level0_phase1(layer_removal_cs, target_file_size, ctx)
|
||||
.await?;
|
||||
|
||||
if new_layers.is_empty() && deltas_to_compact.is_empty() {
|
||||
// nothing to do
|
||||
@@ -3595,15 +3551,13 @@ impl Timeline {
|
||||
.add(metadata.len());
|
||||
|
||||
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
|
||||
let remote_desc = l.layer_desc();
|
||||
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
|
||||
x.access_stats().record_residence_event(
|
||||
&updates,
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
updates.insert_historic(Arc::new(remote_desc));
|
||||
self.layer_cache.insert(x.filename(), x)
|
||||
updates.insert_historic(x);
|
||||
}
|
||||
|
||||
// Now that we have reshuffled the data to set of new delta layers, we can
|
||||
@@ -3611,7 +3565,7 @@ impl Timeline {
|
||||
let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
|
||||
for l in deltas_to_compact {
|
||||
layer_names_to_delete.push(l.filename());
|
||||
self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?;
|
||||
self.delete_historic_layer(layer_removal_cs, l, &mut updates)?;
|
||||
}
|
||||
updates.flush();
|
||||
drop(layers);
|
||||
@@ -3731,7 +3685,7 @@ impl Timeline {
|
||||
|
||||
fail_point!("before-timeline-gc");
|
||||
|
||||
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
|
||||
let layer_removal_cs = self.layer_removal_cs.lock().await;
|
||||
// Is the timeline being deleted?
|
||||
let state = *self.state.borrow();
|
||||
if state == TimelineState::Stopping {
|
||||
@@ -3751,7 +3705,7 @@ impl Timeline {
|
||||
|
||||
let res = self
|
||||
.gc_timeline(
|
||||
layer_removal_cs.clone(),
|
||||
&layer_removal_cs,
|
||||
horizon_cutoff,
|
||||
pitr_cutoff,
|
||||
retain_lsns,
|
||||
@@ -3770,7 +3724,7 @@ impl Timeline {
|
||||
|
||||
async fn gc_timeline(
|
||||
&self,
|
||||
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
|
||||
horizon_cutoff: Lsn,
|
||||
pitr_cutoff: Lsn,
|
||||
retain_lsns: Vec<Lsn>,
|
||||
@@ -3943,11 +3897,7 @@ impl Timeline {
|
||||
{
|
||||
for doomed_layer in layers_to_remove {
|
||||
layer_names_to_delete.push(doomed_layer.filename());
|
||||
self.delete_historic_layer(
|
||||
layer_removal_cs.clone(),
|
||||
doomed_layer,
|
||||
&mut updates,
|
||||
)?; // FIXME: schedule succeeded deletions before returning?
|
||||
self.delete_historic_layer(layer_removal_cs, doomed_layer, &mut updates)?; // FIXME: schedule succeeded deletions before returning?
|
||||
result.layers_removed += 1;
|
||||
}
|
||||
}
|
||||
@@ -4076,7 +4026,7 @@ impl Timeline {
|
||||
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
|
||||
pub async fn download_remote_layer(
|
||||
&self,
|
||||
remote_layer: Arc<RemoteLayerDesc>,
|
||||
remote_layer: Arc<RemoteLayer>,
|
||||
) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
@@ -4133,12 +4083,10 @@ impl Timeline {
|
||||
// Delta- or ImageLayer in the layer map.
|
||||
let mut layers = self_clone.layers.write().unwrap();
|
||||
let mut updates = layers.batch_update();
|
||||
let new_layer =
|
||||
remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
|
||||
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
|
||||
{
|
||||
use crate::tenant::layer_map::Replacement;
|
||||
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
|
||||
/*
|
||||
let failure = match updates.replace_historic(&l, new_layer) {
|
||||
Ok(Replacement::Replaced { .. }) => false,
|
||||
Ok(Replacement::NotFound) => {
|
||||
@@ -4193,9 +4141,8 @@ impl Timeline {
|
||||
remote_layer
|
||||
.download_replacement_failure
|
||||
.store(true, Relaxed);
|
||||
} */
|
||||
}
|
||||
}
|
||||
|
||||
updates.flush();
|
||||
drop(layers);
|
||||
|
||||
@@ -4208,10 +4155,7 @@ impl Timeline {
|
||||
remote_layer.ongoing_download.close();
|
||||
} else {
|
||||
// Keep semaphore open. We'll drop the permit at the end of the function.
|
||||
error!(
|
||||
"layer file download failed: {:?}",
|
||||
result.as_ref().unwrap_err()
|
||||
);
|
||||
error!("layer file download failed: {:?}", result.as_ref().unwrap_err());
|
||||
}
|
||||
|
||||
// Don't treat it as an error if the task that triggered the download
|
||||
@@ -4225,8 +4169,7 @@ impl Timeline {
|
||||
drop(permit);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.in_current_span(),
|
||||
}.in_current_span(),
|
||||
);
|
||||
|
||||
receiver.await.context("download task cancelled")?
|
||||
@@ -4299,7 +4242,7 @@ impl Timeline {
|
||||
let layers = self.layers.read().unwrap();
|
||||
layers
|
||||
.iter_historic_layers()
|
||||
.filter(|l| !self.layer_cache.contains(&l.filename()))
|
||||
.filter_map(|l| l.downcast_remote_layer())
|
||||
.map(|l| self.download_remote_layer(l))
|
||||
.for_each(|dl| downloads.push(dl))
|
||||
}
|
||||
@@ -4374,7 +4317,7 @@ pub struct DiskUsageEvictionInfo {
|
||||
}
|
||||
|
||||
pub struct LocalLayerInfoForDiskUsageEviction {
|
||||
pub layer: Arc<RemoteLayerDesc>,
|
||||
pub layer: Arc<dyn PersistentLayer>,
|
||||
pub last_activity_ts: SystemTime,
|
||||
}
|
||||
|
||||
@@ -4408,7 +4351,7 @@ impl Timeline {
|
||||
let file_size = l.file_size();
|
||||
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
|
||||
|
||||
if !self.layer_cache.contains(&l.filename()) {
|
||||
if l.is_remote_layer() {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ use crate::{
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{
|
||||
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
|
||||
storage_layer::{PersistentLayer, RemoteLayerDesc},
|
||||
storage_layer::PersistentLayer,
|
||||
LogicalSizeCalculationCause, Tenant,
|
||||
},
|
||||
};
|
||||
@@ -184,11 +184,11 @@ impl Timeline {
|
||||
// NB: all the checks can be invalidated as soon as we release the layer map lock.
|
||||
// We don't want to hold the layer map lock during eviction.
|
||||
// So, we just need to deal with this.
|
||||
let candidates: Vec<Arc<RemoteLayerDesc>> = {
|
||||
let candidates: Vec<Arc<dyn PersistentLayer>> = {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let mut candidates = Vec::new();
|
||||
for hist_layer in layers.iter_historic_layers() {
|
||||
if !self.layer_cache.contains(&hist_layer.filename()) {
|
||||
if hist_layer.is_remote_layer() {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -1309,6 +1309,7 @@ mod tests {
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.expect("Failed to create an empty timeline for dummy wal connection manager");
|
||||
|
||||
ConnectionManagerState {
|
||||
|
||||
@@ -1209,7 +1209,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_relsize() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
@@ -1428,7 +1428,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_drop_extend() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
@@ -1497,7 +1497,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_truncate_extend() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
// Create a 20 MB relation (the size is arbitrary)
|
||||
@@ -1637,7 +1637,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_large_rel() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut lsn = 0x10;
|
||||
|
||||
@@ -11,12 +11,10 @@ OBJS = \
|
||||
pagestore_smgr.o \
|
||||
relsize_cache.o \
|
||||
walproposer.o \
|
||||
walproposer_utils.o \
|
||||
control_plane_connector.o
|
||||
walproposer_utils.o
|
||||
|
||||
PG_CPPFLAGS = -I$(libpq_srcdir)
|
||||
SHLIB_LINK_INTERNAL = $(libpq)
|
||||
SHLIB_LINK = -lcurl
|
||||
|
||||
EXTENSION = neon
|
||||
DATA = neon--1.0.sql
|
||||
|
||||
@@ -1,830 +0,0 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* control_plane_connector.c
|
||||
* Captures updates to roles/databases using ProcessUtility_hook and
|
||||
* sends them to the control plane. The changes are sent
|
||||
* via HTTP to the URL specified by the GUC neon.console_url when the
|
||||
* transaction commits. Forwarding may be disabled temporarily by
|
||||
* setting neon.forward_ddl to false.
|
||||
*
|
||||
* Currently, the transaction may abort AFTER
|
||||
* changes have already been forwarded, and that case is not handled.
|
||||
* Subtransactions are handled using a stack of hash tables, which
|
||||
* accumulate changes. On subtransaction commit, the top of the stack
|
||||
* is merged with the table below it.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/control_plane_connector.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
#include "tcop/pquery.h"
|
||||
#include "tcop/utility.h"
|
||||
#include "access/xact.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "miscadmin.h"
|
||||
#include "utils/acl.h"
|
||||
#include "fmgr.h"
|
||||
#include "utils/guc.h"
|
||||
#include "port.h"
|
||||
#include <curl/curl.h>
|
||||
#include "utils/jsonb.h"
|
||||
|
||||
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
|
||||
|
||||
/* GUCs */
|
||||
static char *ConsoleURL = NULL;
|
||||
static bool ForwardDDL = true;
|
||||
|
||||
/* Curl structures for sending the HTTP requests */
|
||||
static CURL * CurlHandle;
|
||||
static struct curl_slist *ContentHeader = NULL;
|
||||
|
||||
/*
|
||||
* CURL docs say that this buffer must exist until we call curl_easy_cleanup
|
||||
* (which we never do), so we make this a static
|
||||
*/
|
||||
static char CurlErrorBuf[CURL_ERROR_SIZE];
|
||||
|
||||
typedef enum
|
||||
{
|
||||
Op_Set, /* An upsert: Either a creation or an alter */
|
||||
Op_Delete,
|
||||
} OpType;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
char name[NAMEDATALEN];
|
||||
Oid owner;
|
||||
char old_name[NAMEDATALEN];
|
||||
OpType type;
|
||||
} DbEntry;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
char name[NAMEDATALEN];
|
||||
char old_name[NAMEDATALEN];
|
||||
const char *password;
|
||||
OpType type;
|
||||
} RoleEntry;
|
||||
|
||||
/*
|
||||
* We keep one of these for each subtransaction in a stack. When a subtransaction
|
||||
* commits, we merge the top of the stack into the table below it. It is allocated in the
|
||||
* subtransaction's context.
|
||||
*/
|
||||
typedef struct DdlHashTable
|
||||
{
|
||||
struct DdlHashTable *prev_table;
|
||||
HTAB *db_table;
|
||||
HTAB *role_table;
|
||||
} DdlHashTable;
|
||||
|
||||
static DdlHashTable RootTable;
|
||||
static DdlHashTable * CurrentDdlTable = &RootTable;
|
||||
|
||||
static void
|
||||
PushKeyValue(JsonbParseState **state, char *key, char *value)
|
||||
{
|
||||
JsonbValue k,
|
||||
v;
|
||||
|
||||
k.type = jbvString;
|
||||
k.val.string.len = strlen(key);
|
||||
k.val.string.val = key;
|
||||
v.type = jbvString;
|
||||
v.val.string.len = strlen(value);
|
||||
v.val.string.val = value;
|
||||
pushJsonbValue(state, WJB_KEY, &k);
|
||||
pushJsonbValue(state, WJB_VALUE, &v);
|
||||
}
|
||||
|
||||
static char *
|
||||
ConstructDeltaMessage()
|
||||
{
|
||||
JsonbParseState *state = NULL;
|
||||
|
||||
pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
|
||||
if (RootTable.db_table)
|
||||
{
|
||||
JsonbValue dbs;
|
||||
|
||||
dbs.type = jbvString;
|
||||
dbs.val.string.val = "dbs";
|
||||
dbs.val.string.len = strlen(dbs.val.string.val);
|
||||
pushJsonbValue(&state, WJB_KEY, &dbs);
|
||||
pushJsonbValue(&state, WJB_BEGIN_ARRAY, NULL);
|
||||
|
||||
HASH_SEQ_STATUS status;
|
||||
DbEntry *entry;
|
||||
|
||||
hash_seq_init(&status, RootTable.db_table);
|
||||
while ((entry = hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
|
||||
PushKeyValue(&state, "op", entry->type == Op_Set ? "set" : "del");
|
||||
PushKeyValue(&state, "name", entry->name);
|
||||
if (entry->owner != InvalidOid)
|
||||
{
|
||||
PushKeyValue(&state, "owner", GetUserNameFromId(entry->owner, false));
|
||||
}
|
||||
if (entry->old_name[0] != '\0')
|
||||
{
|
||||
PushKeyValue(&state, "old_name", entry->old_name);
|
||||
}
|
||||
pushJsonbValue(&state, WJB_END_OBJECT, NULL);
|
||||
}
|
||||
pushJsonbValue(&state, WJB_END_ARRAY, NULL);
|
||||
}
|
||||
|
||||
if (RootTable.role_table)
|
||||
{
|
||||
JsonbValue roles;
|
||||
|
||||
roles.type = jbvString;
|
||||
roles.val.string.val = "roles";
|
||||
roles.val.string.len = strlen(roles.val.string.val);
|
||||
pushJsonbValue(&state, WJB_KEY, &roles);
|
||||
pushJsonbValue(&state, WJB_BEGIN_ARRAY, NULL);
|
||||
|
||||
HASH_SEQ_STATUS status;
|
||||
RoleEntry *entry;
|
||||
|
||||
hash_seq_init(&status, RootTable.role_table);
|
||||
while ((entry = hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
|
||||
PushKeyValue(&state, "op", entry->type == Op_Set ? "set" : "del");
|
||||
PushKeyValue(&state, "name", entry->name);
|
||||
if (entry->password)
|
||||
{
|
||||
PushKeyValue(&state, "password", (char *) entry->password);
|
||||
}
|
||||
if (entry->old_name[0] != '\0')
|
||||
{
|
||||
PushKeyValue(&state, "old_name", entry->old_name);
|
||||
}
|
||||
pushJsonbValue(&state, WJB_END_OBJECT, NULL);
|
||||
}
|
||||
pushJsonbValue(&state, WJB_END_ARRAY, NULL);
|
||||
}
|
||||
JsonbValue *result = pushJsonbValue(&state, WJB_END_OBJECT, NULL);
|
||||
Jsonb *jsonb = JsonbValueToJsonb(result);
|
||||
|
||||
return JsonbToCString(NULL, &jsonb->root, 0 /* estimated_len */ );
|
||||
}
|
||||
|
||||
/* Capacity, including the terminating NUL, of the captured response body. */
#define ERROR_SIZE 1024

/*
 * Fixed-size buffer that collects the beginning of an HTTP response body.
 * 'size' is the number of bytes stored in 'str', not counting the NUL.
 * The buffer must start out zero-initialized; ErrorWriteCallback() then keeps
 * it NUL-terminated.
 */
typedef struct
{
    char        str[ERROR_SIZE];
    size_t      size;
} ErrorString;

/*
 * libcurl write callback: append the received data to the ErrorString passed
 * as 'userdata', keeping at most ERROR_SIZE - 1 bytes in total.
 *
 * ptr      - received data (not NUL-terminated)
 * size     - size of one element; libcurl documents it as always 1, but the
 *            byte count is still computed as size * nmemb as the API requires
 * nmemb    - number of elements
 * userdata - ErrorString to append to
 *
 * Always returns size * nmemb, i.e. reports all data as consumed even when it
 * was truncated; returning anything else would make libcurl abort the
 * transfer with a write error.
 */
static size_t
ErrorWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
{
    ErrorString *str = userdata;
    size_t      total = size * nmemb;
    /* Free space left, with one byte reserved for the NUL terminator. */
    size_t      room = ERROR_SIZE - 1 - str->size;
    size_t      to_write = total < room ? total : room;

    /* Ignore everything past the first ERROR_SIZE - 1 bytes. */
    if (to_write > 0)
    {
        memcpy(str->str + str->size, ptr, to_write);
        str->size += to_write;
        str->str[str->size] = '\0';
    }
    return total;
}
|
||||
|
||||
static void
|
||||
SendDeltasToControlPlane()
|
||||
{
|
||||
if (!RootTable.db_table && !RootTable.role_table)
|
||||
return;
|
||||
if (!ConsoleURL)
|
||||
{
|
||||
elog(LOG, "ConsoleURL not set, skipping forwarding");
|
||||
return;
|
||||
}
|
||||
if (!ForwardDDL)
|
||||
return;
|
||||
|
||||
char *message = ConstructDeltaMessage();
|
||||
ErrorString str = {};
|
||||
|
||||
curl_easy_setopt(CurlHandle, CURLOPT_CUSTOMREQUEST, "PATCH");
|
||||
curl_easy_setopt(CurlHandle, CURLOPT_HTTPHEADER, ContentHeader);
|
||||
curl_easy_setopt(CurlHandle, CURLOPT_POSTFIELDS, message);
|
||||
curl_easy_setopt(CurlHandle, CURLOPT_URL, ConsoleURL);
|
||||
curl_easy_setopt(CurlHandle, CURLOPT_ERRORBUFFER, CurlErrorBuf);
|
||||
curl_easy_setopt(CurlHandle, CURLOPT_TIMEOUT, 3L /* seconds */ );
|
||||
curl_easy_setopt(CurlHandle, CURLOPT_WRITEDATA, &str);
|
||||
curl_easy_setopt(CurlHandle, CURLOPT_WRITEFUNCTION, ErrorWriteCallback);
|
||||
|
||||
const int num_retries = 5;
|
||||
int curl_status;
|
||||
|
||||
for (int i = 0; i < num_retries; i++)
|
||||
{
|
||||
if ((curl_status = curl_easy_perform(CurlHandle)) == 0)
|
||||
break;
|
||||
elog(LOG, "Curl request failed on attempt %d: %s", i, CurlErrorBuf);
|
||||
pg_usleep(1000 * 1000);
|
||||
}
|
||||
if (curl_status != 0)
|
||||
{
|
||||
elog(ERROR, "Failed to perform curl request: %s", CurlErrorBuf);
|
||||
}
|
||||
else
|
||||
{
|
||||
long response_code;
|
||||
|
||||
if (curl_easy_getinfo(CurlHandle, CURLINFO_RESPONSE_CODE, &response_code) != CURLE_UNKNOWN_OPTION)
|
||||
{
|
||||
bool error_exists = str.size != 0;
|
||||
|
||||
if (response_code != 200)
|
||||
{
|
||||
if (error_exists)
|
||||
{
|
||||
elog(ERROR,
|
||||
"Received HTTP code %ld from control plane: %s",
|
||||
response_code,
|
||||
str.str);
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(ERROR,
|
||||
"Received HTTP code %ld from control plane",
|
||||
response_code);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
InitDbTableIfNeeded()
|
||||
{
|
||||
if (!CurrentDdlTable->db_table)
|
||||
{
|
||||
HASHCTL db_ctl = {};
|
||||
|
||||
db_ctl.keysize = NAMEDATALEN;
|
||||
db_ctl.entrysize = sizeof(DbEntry);
|
||||
db_ctl.hcxt = CurTransactionContext;
|
||||
CurrentDdlTable->db_table = hash_create(
|
||||
"Dbs Created",
|
||||
4,
|
||||
&db_ctl,
|
||||
HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
InitRoleTableIfNeeded()
|
||||
{
|
||||
if (!CurrentDdlTable->role_table)
|
||||
{
|
||||
HASHCTL role_ctl = {};
|
||||
|
||||
role_ctl.keysize = NAMEDATALEN;
|
||||
role_ctl.entrysize = sizeof(RoleEntry);
|
||||
role_ctl.hcxt = CurTransactionContext;
|
||||
CurrentDdlTable->role_table = hash_create(
|
||||
"Roles Created",
|
||||
4,
|
||||
&role_ctl,
|
||||
HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
PushTable()
|
||||
{
|
||||
DdlHashTable *new_table = MemoryContextAlloc(CurTransactionContext, sizeof(DdlHashTable));
|
||||
|
||||
new_table->prev_table = CurrentDdlTable;
|
||||
new_table->role_table = NULL;
|
||||
new_table->db_table = NULL;
|
||||
CurrentDdlTable = new_table;
|
||||
}
|
||||
|
||||
static void
|
||||
MergeTable()
|
||||
{
|
||||
DdlHashTable *old_table = CurrentDdlTable;
|
||||
|
||||
CurrentDdlTable = old_table->prev_table;
|
||||
|
||||
if (old_table->db_table)
|
||||
{
|
||||
InitDbTableIfNeeded();
|
||||
DbEntry *entry;
|
||||
HASH_SEQ_STATUS status;
|
||||
|
||||
hash_seq_init(&status, old_table->db_table);
|
||||
while ((entry = hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
DbEntry *to_write = hash_search(
|
||||
CurrentDdlTable->db_table,
|
||||
entry->name,
|
||||
HASH_ENTER,
|
||||
NULL);
|
||||
|
||||
to_write->type = entry->type;
|
||||
if (entry->owner != InvalidOid)
|
||||
to_write->owner = entry->owner;
|
||||
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
|
||||
if (entry->old_name[0] != '\0')
|
||||
{
|
||||
bool found_old = false;
|
||||
DbEntry *old = hash_search(
|
||||
CurrentDdlTable->db_table,
|
||||
entry->old_name,
|
||||
HASH_FIND,
|
||||
&found_old);
|
||||
|
||||
if (found_old)
|
||||
{
|
||||
if (old->old_name[0] != '\0')
|
||||
strlcpy(to_write->old_name, old->old_name, NAMEDATALEN);
|
||||
else
|
||||
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
|
||||
hash_search(
|
||||
CurrentDdlTable->db_table,
|
||||
entry->old_name,
|
||||
HASH_REMOVE,
|
||||
NULL);
|
||||
}
|
||||
}
|
||||
}
|
||||
hash_destroy(old_table->db_table);
|
||||
}
|
||||
|
||||
if (old_table->role_table)
|
||||
{
|
||||
InitRoleTableIfNeeded();
|
||||
RoleEntry *entry;
|
||||
HASH_SEQ_STATUS status;
|
||||
|
||||
hash_seq_init(&status, old_table->role_table);
|
||||
while ((entry = hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
RoleEntry *to_write = hash_search(
|
||||
CurrentDdlTable->role_table,
|
||||
entry->name,
|
||||
HASH_ENTER,
|
||||
NULL);
|
||||
|
||||
to_write->type = entry->type;
|
||||
if (entry->password)
|
||||
to_write->password = entry->password;
|
||||
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
|
||||
if (entry->old_name[0] != '\0')
|
||||
{
|
||||
bool found_old = false;
|
||||
RoleEntry *old = hash_search(
|
||||
CurrentDdlTable->role_table,
|
||||
entry->old_name,
|
||||
HASH_FIND,
|
||||
&found_old);
|
||||
|
||||
if (found_old)
|
||||
{
|
||||
if (old->old_name[0] != '\0')
|
||||
strlcpy(to_write->old_name, old->old_name, NAMEDATALEN);
|
||||
else
|
||||
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
|
||||
hash_search(CurrentDdlTable->role_table,
|
||||
entry->old_name,
|
||||
HASH_REMOVE,
|
||||
NULL);
|
||||
}
|
||||
}
|
||||
}
|
||||
hash_destroy(old_table->role_table);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
PopTable()
|
||||
{
|
||||
/*
|
||||
* Current table gets freed because it is allocated in aborted
|
||||
* subtransaction's memory context.
|
||||
*/
|
||||
CurrentDdlTable = CurrentDdlTable->prev_table;
|
||||
}
|
||||
|
||||
static void
|
||||
NeonSubXactCallback(
|
||||
SubXactEvent event,
|
||||
SubTransactionId mySubid,
|
||||
SubTransactionId parentSubid,
|
||||
void *arg)
|
||||
{
|
||||
switch (event)
|
||||
{
|
||||
case SUBXACT_EVENT_START_SUB:
|
||||
return PushTable();
|
||||
case SUBXACT_EVENT_COMMIT_SUB:
|
||||
return MergeTable();
|
||||
case SUBXACT_EVENT_ABORT_SUB:
|
||||
return PopTable();
|
||||
default:
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
NeonXactCallback(XactEvent event, void *arg)
|
||||
{
|
||||
if (event == XACT_EVENT_PRE_COMMIT || event == XACT_EVENT_PARALLEL_PRE_COMMIT)
|
||||
{
|
||||
SendDeltasToControlPlane();
|
||||
}
|
||||
RootTable.role_table = NULL;
|
||||
RootTable.db_table = NULL;
|
||||
Assert(CurrentDdlTable == &RootTable);
|
||||
}
|
||||
|
||||
static void
|
||||
HandleCreateDb(CreatedbStmt *stmt)
|
||||
{
|
||||
InitDbTableIfNeeded();
|
||||
DefElem *downer = NULL;
|
||||
ListCell *option;
|
||||
|
||||
foreach(option, stmt->options)
|
||||
{
|
||||
DefElem *defel = lfirst(option);
|
||||
|
||||
if (strcmp(defel->defname, "owner") == 0)
|
||||
downer = defel;
|
||||
}
|
||||
bool found = false;
|
||||
DbEntry *entry = hash_search(
|
||||
CurrentDdlTable->db_table,
|
||||
stmt->dbname,
|
||||
HASH_ENTER,
|
||||
&found);
|
||||
|
||||
if (!found)
|
||||
memset(entry->old_name, 0, sizeof(entry->old_name));
|
||||
|
||||
entry->type = Op_Set;
|
||||
if (downer && downer->arg)
|
||||
entry->owner = get_role_oid(defGetString(downer), false);
|
||||
else
|
||||
entry->owner = GetUserId();
|
||||
}
|
||||
|
||||
static void
|
||||
HandleAlterOwner(AlterOwnerStmt *stmt)
|
||||
{
|
||||
if (stmt->objectType != OBJECT_DATABASE)
|
||||
return;
|
||||
InitDbTableIfNeeded();
|
||||
const char *name = strVal(stmt->object);
|
||||
bool found = false;
|
||||
DbEntry *entry = hash_search(
|
||||
CurrentDdlTable->db_table,
|
||||
name,
|
||||
HASH_ENTER,
|
||||
&found);
|
||||
|
||||
if (!found)
|
||||
memset(entry->old_name, 0, sizeof(entry->old_name));
|
||||
|
||||
entry->owner = get_role_oid(get_rolespec_name(stmt->newowner), false);
|
||||
entry->type = Op_Set;
|
||||
}
|
||||
|
||||
static void
|
||||
HandleDbRename(RenameStmt *stmt)
|
||||
{
|
||||
Assert(stmt->renameType == OBJECT_DATABASE);
|
||||
InitDbTableIfNeeded();
|
||||
bool found = false;
|
||||
DbEntry *entry = hash_search(
|
||||
CurrentDdlTable->db_table,
|
||||
stmt->subname,
|
||||
HASH_FIND,
|
||||
&found);
|
||||
DbEntry *entry_for_new_name = hash_search(
|
||||
CurrentDdlTable->db_table,
|
||||
stmt->newname,
|
||||
HASH_ENTER,
|
||||
NULL);
|
||||
|
||||
entry_for_new_name->type = Op_Set;
|
||||
if (found)
|
||||
{
|
||||
if (entry->old_name[0] != '\0')
|
||||
strlcpy(entry_for_new_name->old_name, entry->old_name, NAMEDATALEN);
|
||||
else
|
||||
strlcpy(entry_for_new_name->old_name, entry->name, NAMEDATALEN);
|
||||
entry_for_new_name->owner = entry->owner;
|
||||
hash_search(
|
||||
CurrentDdlTable->db_table,
|
||||
stmt->subname,
|
||||
HASH_REMOVE,
|
||||
NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
strlcpy(entry_for_new_name->old_name, stmt->subname, NAMEDATALEN);
|
||||
entry_for_new_name->owner = InvalidOid;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
HandleDropDb(DropdbStmt *stmt)
|
||||
{
|
||||
InitDbTableIfNeeded();
|
||||
bool found = false;
|
||||
DbEntry *entry = hash_search(
|
||||
CurrentDdlTable->db_table,
|
||||
stmt->dbname,
|
||||
HASH_ENTER,
|
||||
&found);
|
||||
|
||||
entry->type = Op_Delete;
|
||||
entry->owner = InvalidOid;
|
||||
if (!found)
|
||||
memset(entry->old_name, 0, sizeof(entry->old_name));
|
||||
}
|
||||
|
||||
static void
|
||||
HandleCreateRole(CreateRoleStmt *stmt)
|
||||
{
|
||||
InitRoleTableIfNeeded();
|
||||
bool found = false;
|
||||
RoleEntry *entry = hash_search(
|
||||
CurrentDdlTable->role_table,
|
||||
stmt->role,
|
||||
HASH_ENTER,
|
||||
&found);
|
||||
DefElem *dpass = NULL;
|
||||
ListCell *option;
|
||||
|
||||
foreach(option, stmt->options)
|
||||
{
|
||||
DefElem *defel = lfirst(option);
|
||||
|
||||
if (strcmp(defel->defname, "password") == 0)
|
||||
dpass = defel;
|
||||
}
|
||||
if (!found)
|
||||
memset(entry->old_name, 0, sizeof(entry->old_name));
|
||||
if (dpass && dpass->arg)
|
||||
entry->password = MemoryContextStrdup(CurTransactionContext, strVal(dpass->arg));
|
||||
else
|
||||
entry->password = NULL;
|
||||
entry->type = Op_Set;
|
||||
}
|
||||
|
||||
static void
|
||||
HandleAlterRole(AlterRoleStmt *stmt)
|
||||
{
|
||||
InitRoleTableIfNeeded();
|
||||
DefElem *dpass = NULL;
|
||||
ListCell *option;
|
||||
|
||||
foreach(option, stmt->options)
|
||||
{
|
||||
DefElem *defel = lfirst(option);
|
||||
|
||||
if (strcmp(defel->defname, "password") == 0)
|
||||
dpass = defel;
|
||||
}
|
||||
/* We only care about updates to the password */
|
||||
if (!dpass)
|
||||
return;
|
||||
bool found = false;
|
||||
RoleEntry *entry = hash_search(
|
||||
CurrentDdlTable->role_table,
|
||||
stmt->role->rolename,
|
||||
HASH_ENTER,
|
||||
&found);
|
||||
|
||||
if (!found)
|
||||
memset(entry->old_name, 0, sizeof(entry->old_name));
|
||||
if (dpass->arg)
|
||||
entry->password = MemoryContextStrdup(CurTransactionContext, strVal(dpass->arg));
|
||||
else
|
||||
entry->password = NULL;
|
||||
entry->type = Op_Set;
|
||||
}
|
||||
|
||||
static void
|
||||
HandleRoleRename(RenameStmt *stmt)
|
||||
{
|
||||
InitRoleTableIfNeeded();
|
||||
Assert(stmt->renameType == OBJECT_ROLE);
|
||||
bool found = false;
|
||||
RoleEntry *entry = hash_search(
|
||||
CurrentDdlTable->role_table,
|
||||
stmt->subname,
|
||||
HASH_FIND,
|
||||
&found);
|
||||
|
||||
RoleEntry *entry_for_new_name = hash_search(
|
||||
CurrentDdlTable->role_table,
|
||||
stmt->newname,
|
||||
HASH_ENTER,
|
||||
NULL);
|
||||
|
||||
entry_for_new_name->type = Op_Set;
|
||||
if (found)
|
||||
{
|
||||
if (entry->old_name[0] != '\0')
|
||||
strlcpy(entry_for_new_name->old_name, entry->old_name, NAMEDATALEN);
|
||||
else
|
||||
strlcpy(entry_for_new_name->old_name, entry->name, NAMEDATALEN);
|
||||
entry_for_new_name->password = entry->password;
|
||||
hash_search(
|
||||
CurrentDdlTable->role_table,
|
||||
entry->name,
|
||||
HASH_REMOVE,
|
||||
NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
strlcpy(entry_for_new_name->old_name, stmt->subname, NAMEDATALEN);
|
||||
entry_for_new_name->password = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
HandleDropRole(DropRoleStmt *stmt)
|
||||
{
|
||||
InitRoleTableIfNeeded();
|
||||
ListCell *item;
|
||||
|
||||
foreach(item, stmt->roles)
|
||||
{
|
||||
RoleSpec *spec = lfirst(item);
|
||||
bool found = false;
|
||||
RoleEntry *entry = hash_search(
|
||||
CurrentDdlTable->role_table,
|
||||
spec->rolename,
|
||||
HASH_ENTER,
|
||||
&found);
|
||||
|
||||
entry->type = Op_Delete;
|
||||
entry->password = NULL;
|
||||
if (!found)
|
||||
memset(entry->old_name, 0, sizeof(entry));
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
HandleRename(RenameStmt *stmt)
|
||||
{
|
||||
if (stmt->renameType == OBJECT_DATABASE)
|
||||
return HandleDbRename(stmt);
|
||||
else if (stmt->renameType == OBJECT_ROLE)
|
||||
return HandleRoleRename(stmt);
|
||||
}
|
||||
|
||||
static void
|
||||
NeonProcessUtility(
|
||||
PlannedStmt *pstmt,
|
||||
const char *queryString,
|
||||
bool readOnlyTree,
|
||||
ProcessUtilityContext context,
|
||||
ParamListInfo params,
|
||||
QueryEnvironment *queryEnv,
|
||||
DestReceiver *dest,
|
||||
QueryCompletion *qc)
|
||||
{
|
||||
Node *parseTree = pstmt->utilityStmt;
|
||||
|
||||
switch (nodeTag(parseTree))
|
||||
{
|
||||
case T_CreatedbStmt:
|
||||
HandleCreateDb(castNode(CreatedbStmt, parseTree));
|
||||
break;
|
||||
case T_AlterOwnerStmt:
|
||||
HandleAlterOwner(castNode(AlterOwnerStmt, parseTree));
|
||||
break;
|
||||
case T_RenameStmt:
|
||||
HandleRename(castNode(RenameStmt, parseTree));
|
||||
break;
|
||||
case T_DropdbStmt:
|
||||
HandleDropDb(castNode(DropdbStmt, parseTree));
|
||||
break;
|
||||
case T_CreateRoleStmt:
|
||||
HandleCreateRole(castNode(CreateRoleStmt, parseTree));
|
||||
break;
|
||||
case T_AlterRoleStmt:
|
||||
HandleAlterRole(castNode(AlterRoleStmt, parseTree));
|
||||
break;
|
||||
case T_DropRoleStmt:
|
||||
HandleDropRole(castNode(DropRoleStmt, parseTree));
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (PreviousProcessUtilityHook)
|
||||
{
|
||||
PreviousProcessUtilityHook(
|
||||
pstmt,
|
||||
queryString,
|
||||
readOnlyTree,
|
||||
context,
|
||||
params,
|
||||
queryEnv,
|
||||
dest,
|
||||
qc);
|
||||
}
|
||||
else
|
||||
{
|
||||
standard_ProcessUtility(
|
||||
pstmt,
|
||||
queryString,
|
||||
readOnlyTree,
|
||||
context,
|
||||
params,
|
||||
queryEnv,
|
||||
dest,
|
||||
qc);
|
||||
}
|
||||
}
|
||||
|
||||
extern void
|
||||
InitControlPlaneConnector()
|
||||
{
|
||||
PreviousProcessUtilityHook = ProcessUtility_hook;
|
||||
ProcessUtility_hook = NeonProcessUtility;
|
||||
RegisterXactCallback(NeonXactCallback, NULL);
|
||||
RegisterSubXactCallback(NeonSubXactCallback, NULL);
|
||||
|
||||
DefineCustomStringVariable(
|
||||
"neon.console_url",
|
||||
"URL of the Neon Console, which will be forwarded changes to dbs and roles",
|
||||
NULL,
|
||||
&ConsoleURL,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"neon.forward_ddl",
|
||||
"Controls whether to forward DDL to the control plane",
|
||||
NULL,
|
||||
&ForwardDDL,
|
||||
true,
|
||||
PGC_SUSET,
|
||||
0,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
const char *jwt_token = getenv("NEON_CONTROL_PLANE_TOKEN");
|
||||
|
||||
if (!jwt_token)
|
||||
{
|
||||
elog(LOG, "Missing NEON_CONTROL_PLANE_TOKEN environment variable, forwarding will not be authenticated");
|
||||
}
|
||||
|
||||
if (curl_global_init(CURL_GLOBAL_DEFAULT))
|
||||
{
|
||||
elog(ERROR, "Failed to initialize curl");
|
||||
}
|
||||
if ((CurlHandle = curl_easy_init()) == NULL)
|
||||
{
|
||||
elog(ERROR, "Failed to initialize curl handle");
|
||||
}
|
||||
if ((ContentHeader = curl_slist_append(ContentHeader, "Content-Type: application/json")) == NULL)
|
||||
{
|
||||
elog(ERROR, "Failed to initialize content header");
|
||||
}
|
||||
|
||||
if (jwt_token)
|
||||
{
|
||||
char auth_header[8192];
|
||||
|
||||
snprintf(auth_header, sizeof(auth_header), "Authorization: Bearer %s", jwt_token);
|
||||
if ((ContentHeader = curl_slist_append(ContentHeader, auth_header)) == NULL)
|
||||
{
|
||||
elog(ERROR, "Failed to initialize authorization header");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,6 +0,0 @@
|
||||
#ifndef CONTROL_PLANE_CONNECTOR_H
|
||||
#define CONTROL_PLANE_CONNECTOR_H
|
||||
|
||||
void InitControlPlaneConnector();
|
||||
|
||||
#endif
|
||||
@@ -25,7 +25,6 @@
|
||||
#include "neon.h"
|
||||
#include "walproposer.h"
|
||||
#include "pagestore_client.h"
|
||||
#include "control_plane_connector.h"
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
void _PG_init(void);
|
||||
@@ -35,11 +34,7 @@ _PG_init(void)
|
||||
{
|
||||
pg_init_libpagestore();
|
||||
pg_init_walproposer();
|
||||
InitControlPlaneConnector();
|
||||
|
||||
// Important: This must happen after other parts of the extension
|
||||
// are loaded, otherwise any settings to GUCs that were set before
|
||||
// the extension was loaded will be removed.
|
||||
EmitWarningsOnPlaceholders("neon");
|
||||
}
|
||||
|
||||
|
||||
@@ -19,10 +19,8 @@ use tokio::task::JoinHandle;
|
||||
use tokio::{runtime, time::sleep};
|
||||
use tracing::*;
|
||||
|
||||
use crate::metrics::BROKER_ITERATION_TIMELINES;
|
||||
use crate::metrics::BROKER_PULLED_UPDATES;
|
||||
use crate::metrics::BROKER_PUSHED_UPDATES;
|
||||
use crate::metrics::BROKER_PUSH_ALL_UPDATES_SECONDS;
|
||||
use crate::GlobalTimelines;
|
||||
use crate::SafeKeeperConf;
|
||||
|
||||
@@ -63,14 +61,8 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
BROKER_PUSHED_UPDATES.inc();
|
||||
}
|
||||
let elapsed = now.elapsed();
|
||||
|
||||
BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
|
||||
BROKER_ITERATION_TIMELINES.observe(active_tlis.len() as f64);
|
||||
|
||||
if elapsed > push_interval / 2 {
|
||||
info!("broker push is too long, pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed);
|
||||
}
|
||||
|
||||
// Log duration every second. Should be about 10MB of logs per day.
|
||||
info!("pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed);
|
||||
sleep(push_interval).await;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -125,25 +125,6 @@ pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
)
|
||||
.expect("Failed to register safekeeper_backup_errors_total counter")
|
||||
});
|
||||
pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"safekeeper_broker_push_update_seconds",
|
||||
"Seconds to push all timeline updates to the broker",
|
||||
DISK_WRITE_SECONDS_BUCKETS.to_vec()
|
||||
)
|
||||
.expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
|
||||
});
|
||||
pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
|
||||
1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
|
||||
];
|
||||
pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"safekeeper_broker_iteration_timelines",
|
||||
"Count of timelines pushed to the broker in a single iteration",
|
||||
TIMELINES_COUNT_BUCKETS.to_vec()
|
||||
)
|
||||
.expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
|
||||
});
|
||||
|
||||
pub const LABEL_UNKNOWN: &str = "unknown";
|
||||
|
||||
|
||||
@@ -156,9 +156,7 @@ class LLVM:
|
||||
profdata: Path,
|
||||
objects: List[str],
|
||||
sources: List[str],
|
||||
demangler: Optional[Path] = None,
|
||||
output_file: Optional[Path] = None,
|
||||
) -> None:
|
||||
demangler: Optional[Path] = None) -> None:
|
||||
|
||||
cwd = self.cargo.cwd
|
||||
objects = list(intersperse('-object', objects))
|
||||
@@ -182,18 +180,14 @@ class LLVM:
|
||||
*objects,
|
||||
*sources,
|
||||
]
|
||||
if output_file is not None:
|
||||
with output_file.open('w') as outfile:
|
||||
subprocess.check_call(cmd, cwd=cwd, stdout=outfile)
|
||||
else:
|
||||
subprocess.check_call(cmd, cwd=cwd)
|
||||
subprocess.check_call(cmd, cwd=cwd)
|
||||
|
||||
def cov_report(self, **kwargs) -> None:
|
||||
self._cov(subcommand='report', **kwargs)
|
||||
|
||||
def cov_export(self, *, kind: str, output_file: Optional[Path], **kwargs) -> None:
|
||||
def cov_export(self, *, kind: str, **kwargs) -> None:
|
||||
extras = (f'-format={kind}', )
|
||||
self._cov(subcommand='export', *extras, output_file=output_file, **kwargs)
|
||||
self._cov(subcommand='export', *extras, **kwargs)
|
||||
|
||||
def cov_show(self, *, kind: str, output_dir: Optional[Path] = None, **kwargs) -> None:
|
||||
extras = [f'-format={kind}']
|
||||
@@ -289,12 +283,9 @@ class TextReport(Report):
|
||||
self.llvm.cov_show(kind='text', **self._common_kwargs())
|
||||
|
||||
|
||||
@dataclass
|
||||
class LcovReport(Report):
|
||||
output_file: Path
|
||||
|
||||
def generate(self) -> None:
|
||||
self.llvm.cov_export(kind='lcov', output_file=self.output_file, **self._common_kwargs())
|
||||
self.llvm.cov_export(kind='lcov', **self._common_kwargs())
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -484,7 +475,7 @@ class State:
|
||||
'text':
|
||||
lambda: TextReport(**params),
|
||||
'lcov':
|
||||
lambda: LcovReport(**params, output_file=self.report_dir / 'lcov.info'),
|
||||
lambda: LcovReport(**params),
|
||||
'summary':
|
||||
lambda: SummaryReport(**params),
|
||||
'github':
|
||||
|
||||
@@ -535,8 +535,8 @@ def export_timeline(
|
||||
|
||||
|
||||
def main(args: argparse.Namespace):
|
||||
# any psql version will do here. use current DEFAULT_PG_VERSION = 15
|
||||
psql_path = str(Path(args.pg_distrib_dir) / "v15" / "bin" / "psql")
|
||||
# any psql version will do here. use current DEFAULT_PG_VERSION = 14
|
||||
psql_path = str(Path(args.pg_distrib_dir) / "v14" / "bin" / "psql")
|
||||
|
||||
old_pageserver_host = args.old_pageserver_host
|
||||
new_pageserver_host = args.new_pageserver_host
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
//
|
||||
// The script parses Allure reports and posts a comment with a summary of the test results to the PR or to the latest commit in the branch.
|
||||
// The script parses Allure reports and posts a comment with a summary of the test results to the PR.
|
||||
//
|
||||
// The comment is updated on each run with the latest results.
|
||||
//
|
||||
@@ -7,7 +7,7 @@
|
||||
// - uses: actions/github-script@v6
|
||||
// with:
|
||||
// script: |
|
||||
// const script = require("./scripts/comment-test-report.js")
|
||||
// const script = require("./scripts/pr-comment-test-report.js")
|
||||
// await script({
|
||||
// github,
|
||||
// context,
|
||||
@@ -35,12 +35,8 @@ class DefaultMap extends Map {
|
||||
module.exports = async ({ github, context, fetch, report }) => {
|
||||
// Marker to find the comment in the subsequent runs
|
||||
const startMarker = `<!--AUTOMATIC COMMENT START #${context.payload.number}-->`
|
||||
// If we run the script in the PR or in the branch (main/release/...)
|
||||
const isPullRequest = !!context.payload.pull_request
|
||||
// Latest commit in PR or in the branch
|
||||
const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha
|
||||
// Let users know that the comment is updated automatically
|
||||
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${commitSha} at ${new Date().toISOString()} :recycle:</sub></div>`
|
||||
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${context.payload.pull_request.head.sha} at ${new Date().toISOString()} :recycle:</sub></div>`
|
||||
// GitHub bot id taken from (https://api.github.com/users/github-actions[bot])
|
||||
const githubActionsBotId = 41898282
|
||||
// Comment body itself
|
||||
@@ -170,39 +166,22 @@ module.exports = async ({ github, context, fetch, report }) => {
|
||||
|
||||
commentBody += autoupdateNotice
|
||||
|
||||
let createCommentFn, listCommentsFn, updateCommentFn, issueNumberOrSha
|
||||
if (isPullRequest) {
|
||||
createCommentFn = github.rest.issues.createComment
|
||||
listCommentsFn = github.rest.issues.listComments
|
||||
updateCommentFn = github.rest.issues.updateComment
|
||||
issueNumberOrSha = {
|
||||
issue_number: context.payload.number,
|
||||
}
|
||||
} else {
|
||||
updateCommentFn = github.rest.repos.updateCommitComment
|
||||
listCommentsFn = github.rest.repos.listCommentsForCommit
|
||||
createCommentFn = github.rest.repos.createCommitComment
|
||||
issueNumberOrSha = {
|
||||
commit_sha: commitSha,
|
||||
}
|
||||
}
|
||||
|
||||
const { data: comments } = await listCommentsFn({
|
||||
...issueNumberOrSha,
|
||||
const { data: comments } = await github.rest.issues.listComments({
|
||||
issue_number: context.payload.number,
|
||||
...ownerRepoParams,
|
||||
})
|
||||
|
||||
const comment = comments.find(comment => comment.user.id === githubActionsBotId && comment.body.startsWith(startMarker))
|
||||
if (comment) {
|
||||
await updateCommentFn({
|
||||
await github.rest.issues.updateComment({
|
||||
comment_id: comment.id,
|
||||
body: commentBody,
|
||||
...ownerRepoParams,
|
||||
})
|
||||
} else {
|
||||
await createCommentFn({
|
||||
await github.rest.issues.createComment({
|
||||
issue_number: context.payload.number,
|
||||
body: commentBody,
|
||||
...issueNumberOrSha,
|
||||
...ownerRepoParams,
|
||||
})
|
||||
}
|
||||
@@ -40,9 +40,6 @@ pub type BrokerClientChannel = BrokerServiceClient<Channel>;
|
||||
// Create connection object configured to run TLS if schema starts with https://
|
||||
// and plain text otherwise. Connection is lazy, only endpoint sanity is
|
||||
// validated here.
|
||||
//
|
||||
// NB: this function is not async, but still must be run on a tokio runtime thread
|
||||
// because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call.
|
||||
pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
|
||||
where
|
||||
U: std::convert::TryInto<Uri>,
|
||||
|
||||
@@ -1603,6 +1603,8 @@ class NeonPageserver(PgProtocol):
|
||||
# https://github.com/neondatabase/neon/issues/2442
|
||||
".*could not remove ephemeral file.*No such file or directory.*",
|
||||
# FIXME: These need investigation
|
||||
".*gc_loop.*Failed to get a tenant .* Tenant .* not found.*",
|
||||
".*compaction_loop.*Failed to get a tenant .* Tenant .* not found.*",
|
||||
".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
|
||||
".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
|
||||
".*Removing intermediate uninit mark file.*",
|
||||
@@ -1619,10 +1621,6 @@ class NeonPageserver(PgProtocol):
|
||||
".*task iteration took longer than the configured period.*",
|
||||
# this is until #3501
|
||||
".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
|
||||
# these can happen anytime we do compactions from background task and shutdown pageserver
|
||||
r".*ERROR.*ancestor timeline \S+ is being stopped",
|
||||
# this is expected given our collaborative shutdown approach for the UploadQueue
|
||||
".*Compaction failed, retrying in .*: queue is in state Stopped.*",
|
||||
]
|
||||
|
||||
def start(
|
||||
|
||||
@@ -155,14 +155,14 @@ class PageserverHttpClient(requests.Session):
|
||||
return res_json
|
||||
|
||||
def tenant_create(
|
||||
self, new_tenant_id: TenantId, conf: Optional[Dict[str, Any]] = None
|
||||
self, new_tenant_id: Optional[TenantId] = None, conf: Optional[Dict[str, Any]] = None
|
||||
) -> TenantId:
|
||||
if conf is not None:
|
||||
assert "new_tenant_id" not in conf.keys()
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant",
|
||||
json={
|
||||
"new_tenant_id": str(new_tenant_id),
|
||||
"new_tenant_id": str(new_tenant_id) if new_tenant_id else None,
|
||||
**(conf or {}),
|
||||
},
|
||||
)
|
||||
@@ -293,13 +293,13 @@ class PageserverHttpClient(requests.Session):
|
||||
self,
|
||||
pg_version: PgVersion,
|
||||
tenant_id: TenantId,
|
||||
new_timeline_id: TimelineId,
|
||||
new_timeline_id: Optional[TimelineId] = None,
|
||||
ancestor_timeline_id: Optional[TimelineId] = None,
|
||||
ancestor_start_lsn: Optional[Lsn] = None,
|
||||
**kwargs,
|
||||
) -> Dict[Any, Any]:
|
||||
body: Dict[str, Any] = {
|
||||
"new_timeline_id": str(new_timeline_id),
|
||||
"new_timeline_id": str(new_timeline_id) if new_timeline_id else None,
|
||||
"ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None,
|
||||
"ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None,
|
||||
}
|
||||
|
||||
@@ -27,10 +27,6 @@ class PgVersion(str, enum.Enum):
|
||||
def __repr__(self) -> str:
|
||||
return f"'{self.value}'"
|
||||
|
||||
# Make this explicit for Python 3.11 compatibility, which changes the behavior of enums
|
||||
def __str__(self) -> str:
|
||||
return self.value
|
||||
|
||||
# In GitHub workflows we use Postgres version with v-prefix (e.g. v14 instead of just 14),
|
||||
# sometime we need to do so in tests.
|
||||
@property
|
||||
@@ -82,11 +78,11 @@ def pytest_addoption(parser: Parser):
|
||||
@pytest.fixture(scope="session")
|
||||
def pg_version(request: FixtureRequest) -> Iterator[PgVersion]:
|
||||
if v := request.config.getoption("--pg-version"):
|
||||
version, source = v, "from --pg-version command-line argument"
|
||||
version, source = v, "from --pg-version commad-line argument"
|
||||
elif v := os.environ.get("DEFAULT_PG_VERSION"):
|
||||
version, source = PgVersion(v), "from DEFAULT_PG_VERSION environment variable"
|
||||
else:
|
||||
version, source = DEFAULT_VERSION, "default version"
|
||||
version, source = DEFAULT_VERSION, "default verson"
|
||||
|
||||
log.info(f"pg_version is {version} ({source})")
|
||||
yield version
|
||||
|
||||
@@ -3,7 +3,7 @@ from contextlib import closing
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgProtocol
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
from fixtures.types import TenantId
|
||||
|
||||
|
||||
def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
|
||||
@@ -25,19 +25,21 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
|
||||
ps.safe_psql("set FOO", password=tenant_token)
|
||||
ps.safe_psql("set FOO", password=pageserver_token)
|
||||
|
||||
new_timeline_id = env.neon_cli.create_branch(
|
||||
"test_pageserver_auth", tenant_id=env.initial_tenant
|
||||
)
|
||||
|
||||
# tenant can create branches
|
||||
tenant_http_client.timeline_create(
|
||||
pg_version=env.pg_version,
|
||||
tenant_id=env.initial_tenant,
|
||||
new_timeline_id=TimelineId.generate(),
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
ancestor_timeline_id=new_timeline_id,
|
||||
)
|
||||
# console can create branches for tenant
|
||||
pageserver_http_client.timeline_create(
|
||||
pg_version=env.pg_version,
|
||||
tenant_id=env.initial_tenant,
|
||||
new_timeline_id=TimelineId.generate(),
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
ancestor_timeline_id=new_timeline_id,
|
||||
)
|
||||
|
||||
# fail to create branch using token with different tenant_id
|
||||
@@ -47,19 +49,18 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
|
||||
invalid_tenant_http_client.timeline_create(
|
||||
pg_version=env.pg_version,
|
||||
tenant_id=env.initial_tenant,
|
||||
new_timeline_id=TimelineId.generate(),
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
ancestor_timeline_id=new_timeline_id,
|
||||
)
|
||||
|
||||
# create tenant using management token
|
||||
pageserver_http_client.tenant_create(TenantId.generate())
|
||||
pageserver_http_client.tenant_create()
|
||||
|
||||
# fail to create tenant using tenant token
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match="Forbidden: Attempt to access management api with tenant scope. Permission denied",
|
||||
):
|
||||
tenant_http_client.tenant_create(TenantId.generate())
|
||||
tenant_http_client.tenant_create()
|
||||
|
||||
|
||||
def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
@@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
".*is not active. Current state: Broken.*",
|
||||
".*will not become active. Current state: Broken.*",
|
||||
".*failed to load metadata.*",
|
||||
".*load failed.*load local timeline.*",
|
||||
".*could not load tenant.*load local timeline.*",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@@ -1,210 +0,0 @@
|
||||
from types import TracebackType
|
||||
from typing import Any, Dict, List, Optional, Tuple, Type
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import VanillaPostgres
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
|
||||
def handle_db(dbs, roles, operation):
    """Apply one forwarded database operation to the ``dbs`` mapping in place.

    ``dbs`` maps database name to owner. ``roles`` is accepted but not used.

    * ``op == "set"``: if ``old_name`` is given and known, the entry is moved
      to ``name``; if ``owner`` is given, it is stored under ``name``.
    * ``op == "del"``: the entry for ``name`` is removed (``KeyError`` if it
      is missing).

    Raises ``ValueError`` for any other ``op``.
    """
    kind = operation["op"]

    if kind == "del":
        dbs.pop(operation["name"])
        return

    if kind != "set":
        raise ValueError("Invalid op")

    if "old_name" in operation:
        previous = operation["old_name"]
        if previous in dbs:
            # Copy first, then drop the old key, so a failure on "name"
            # leaves the mapping untouched.
            dbs[operation["name"]] = dbs[previous]
            del dbs[previous]

    if "owner" in operation:
        dbs[operation["name"]] = operation["owner"]
|
||||
|
||||
|
||||
def handle_role(dbs, roles, operation):
    """Apply one forwarded role operation to ``roles`` (and ``dbs``) in place.

    ``roles`` maps role name to password; ``dbs`` maps database name to owner.

    * ``op == "set"``: if ``old_name`` is given and known, the role is moved
      to ``name`` and every database owned by the old name is re-pointed to
      the new one; if ``password`` is given, it is stored under ``name``.
    * ``op == "del"``: ``old_name`` (when given) and ``name`` are removed
      (``KeyError`` if missing).

    Raises ``ValueError`` for any other ``op``.
    """
    kind = operation["op"]

    if kind == "del":
        if "old_name" in operation:
            roles.pop(operation["old_name"])
        roles.pop(operation["name"])
        return

    if kind != "set":
        raise ValueError("Invalid op")

    if "old_name" in operation and operation["old_name"] in roles:
        roles[operation["name"]] = roles[operation["old_name"]]
        roles.pop(operation["old_name"])
        # Only values of existing keys are replaced, so iterating the live
        # view is fine.
        for db_name, owner in dbs.items():
            if owner == operation["old_name"]:
                dbs[db_name] = operation["name"]

    if "password" in operation:
        roles[operation["name"]] = operation["password"]
|
||||
|
||||
|
||||
fail = False
|
||||
|
||||
|
||||
def ddl_forward_handler(request: Request, dbs: Dict[str, str], roles: Dict[str, str]) -> Response:
    """Handle one forwarded-DDL request by replaying it onto ``dbs``/``roles``.

    Returns 500 while the module-level ``fail`` flag is set, 400 when the
    request body has no JSON, and 200 once every operation has been applied.
    Role operations are applied before database operations.
    """
    log.info(f"Received request with data {request.get_data(as_text=True)}")

    if fail:
        log.info("FAILING")
        return Response(status=500, response="Failed just cuz")

    payload = request.json
    if payload is None:
        log.info("Received invalid JSON")
        return Response(status=400)

    # Handle roles first
    for section, apply in (("roles", handle_role), ("dbs", handle_db)):
        if section in payload:
            for operation in payload[section]:
                apply(dbs, roles, operation)

    return Response(status=200)
|
||||
|
||||
|
||||
class DdlForwardingContext:
    """Pairs a Postgres instance with an HTTP server that receives its DDL.

    On construction the Postgres instance is configured with the server's
    URL as ``neon.console_url`` and a PATCH handler is registered that
    replays incoming operations onto ``self.dbs`` and ``self.roles``.
    Used as a context manager: entering starts Postgres, exiting stops it.
    """

    def __init__(self, httpserver: HTTPServer, vanilla_pg: VanillaPostgres, host: str, port: int):
        self.server = httpserver
        self.pg = vanilla_pg
        self.host = host
        self.port = port
        # State mirrored from the forwarded operations.
        self.dbs: Dict[str, str] = {}
        self.roles: Dict[str, str] = {}

        endpoint = "/management/api/v2/roles_and_databases"
        ddl_url = f"http://{host}:{port}{endpoint}"
        settings = [
            f"neon.console_url={ddl_url}",
            "shared_preload_libraries = 'neon'",
        ]
        self.pg.configure(settings)
        log.info(f"Listening on {ddl_url}")

        def on_request(request):
            return ddl_forward_handler(request, self.dbs, self.roles)

        expectation = self.server.expect_request(endpoint, method="PATCH")
        expectation.respond_with_handler(on_request)

    def __enter__(self):
        self.pg.start()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ):
        self.pg.stop()

    def send(self, query: str) -> List[Tuple[Any, ...]]:
        """Run ``query`` through ``safe_psql`` and return its rows."""
        return self.pg.safe_psql(query)

    def wait(self, timeout=3):
        """Wait on the HTTP server for up to ``timeout``."""
        self.server.wait(timeout=timeout)

    def send_and_wait(self, query: str, timeout=3) -> List[Tuple[Any, ...]]:
        """Run ``query``, then wait on the HTTP server; return the query rows."""
        rows = self.send(query)
        self.wait(timeout=timeout)
        return rows
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def ddl(
    httpserver: HTTPServer, vanilla_pg: VanillaPostgres, httpserver_listen_address: tuple[str, int]
):
    """Yield a started DdlForwardingContext bound to the test HTTP server."""
    host, port = httpserver_listen_address
    context = DdlForwardingContext(httpserver, vanilla_pg, host, port)
    with context as started:
        yield started
|
||||
|
||||
|
||||
def test_ddl_forwarding(ddl: DdlForwardingContext):
    """Check that role/database DDL is reflected in ``ddl.dbs`` and ``ddl.roles``.

    The first half issues one statement at a time through ``send_and_wait``.
    The second half uses a single explicit connection so that several
    statements can share a transaction (commit, abort, savepoints). The test
    ends by setting the module-level ``fail`` flag and expecting the next
    CREATE DATABASE to raise ``psycopg2.InternalError``.
    """
    curr_user = ddl.send("SELECT current_user")[0][0]
    log.info(f"Current user is {curr_user}")

    # Database lifecycle: create, rename, change owner, drop.
    ddl.send_and_wait("CREATE DATABASE bork")
    assert ddl.dbs == {"bork": curr_user}
    ddl.send_and_wait("CREATE ROLE volk WITH PASSWORD 'nu_zayats'")
    ddl.send_and_wait("ALTER DATABASE bork RENAME TO nu_pogodi")
    assert ddl.dbs == {"nu_pogodi": curr_user}
    ddl.send_and_wait("ALTER DATABASE nu_pogodi OWNER TO volk")
    assert ddl.dbs == {"nu_pogodi": "volk"}
    ddl.send_and_wait("DROP DATABASE nu_pogodi")
    assert ddl.dbs == {}

    # Role lifecycle: each statement is followed by a check of the full role map.
    role_steps = [
        ("DROP ROLE volk", {}),
        ("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'", {"tarzan": "of_the_apes"}),
        ("DROP ROLE tarzan", {}),
        ("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'", {"tarzan": "of_the_apes"}),
        ("ALTER ROLE tarzan WITH PASSWORD 'jungle_man'", {"tarzan": "jungle_man"}),
        ("ALTER ROLE tarzan RENAME TO mowgli", {"mowgli": "jungle_man"}),
        ("DROP ROLE mowgli", {}),
    ]
    for statement, expected_roles in role_steps:
        ddl.send_and_wait(statement)
        assert ddl.roles == expected_roles

    conn = ddl.pg.connect()
    cur = conn.cursor()

    def run_and_wait(*statements):
        # Execute the statements in order on the shared cursor, then wait once.
        for statement in statements:
            cur.execute(statement)
        ddl.wait()

    # Committed transaction.
    run_and_wait(
        "BEGIN",
        "CREATE ROLE bork WITH PASSWORD 'cork'",
        "COMMIT",
    )
    assert ddl.roles == {"bork": "cork"}

    # Aborted transaction: the role must not show up.
    run_and_wait(
        "BEGIN",
        "CREATE ROLE stork WITH PASSWORD 'pork'",
        "ABORT",
    )
    assert ("stork", "pork") not in ddl.roles.items()

    # Password change and rename inside one transaction.
    run_and_wait(
        "BEGIN",
        "ALTER ROLE bork WITH PASSWORD 'pork'",
        "ALTER ROLE bork RENAME TO stork",
        "COMMIT",
    )
    assert ddl.roles == {"stork": "pork"}

    # Savepoint: changes made before ROLLBACK TO SAVEPOINT are discarded,
    # changes made after it are kept.
    run_and_wait(
        "BEGIN",
        "CREATE ROLE dork WITH PASSWORD 'york'",
        "SAVEPOINT point",
        "ALTER ROLE dork WITH PASSWORD 'zork'",
        "ALTER ROLE dork RENAME TO fork",
        "ROLLBACK TO SAVEPOINT point",
        "ALTER ROLE dork WITH PASSWORD 'fork'",
        "ALTER ROLE dork RENAME TO zork",
        "RELEASE SAVEPOINT point",
        "COMMIT",
    )
    assert ddl.roles == {"stork": "pork", "zork": "fork"}

    run_and_wait(
        "DROP ROLE stork",
        "DROP ROLE zork",
    )
    assert ddl.roles == {}

    # Drop a role and rename another one onto the freed name in one transaction.
    run_and_wait(
        "CREATE ROLE bork WITH PASSWORD 'dork'",
        "CREATE ROLE stork WITH PASSWORD 'cork'",
        "BEGIN",
        "DROP ROLE bork",
        "ALTER ROLE stork RENAME TO bork",
        "COMMIT",
    )
    assert ddl.roles == {"bork": "cork"}

    run_and_wait("DROP ROLE bork")
    assert ddl.roles == {}

    # Renaming a role that owns a database must update the recorded owner.
    run_and_wait(
        "CREATE ROLE bork WITH PASSWORD 'dork'",
        "CREATE DATABASE stork WITH OWNER=bork",
        "ALTER ROLE bork RENAME TO cork",
    )
    assert ddl.dbs == {"stork": "cork"}

    with pytest.raises(psycopg2.InternalError):
        # `fail` is a module-level flag; it is switched on right before the
        # statement that is expected to raise.
        global fail
        fail = True
        cur.execute("CREATE DATABASE failure WITH OWNER=cork")
        ddl.wait()

    conn.close()
|
||||
@@ -228,6 +228,7 @@ def proxy_with_metric_collector(
|
||||
@pytest.mark.asyncio
|
||||
async def test_proxy_metric_collection(
|
||||
httpserver: HTTPServer,
|
||||
httpserver_listen_address,
|
||||
proxy_with_metric_collector: NeonProxy,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
):
|
||||
|
||||
@@ -140,7 +140,7 @@ def test_remote_storage_backup_and_restore(
|
||||
# This is before the failures injected by test_remote_failures, so it's a permanent error.
|
||||
pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return"))
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*attach failed.*: storage-sync-list-remote-timelines",
|
||||
".*error attaching tenant: storage-sync-list-remote-timelines",
|
||||
)
|
||||
# Attach it. This HTTP request will succeed and launch a
|
||||
# background task to load the tenant. In that background task,
|
||||
|
||||
@@ -647,9 +647,7 @@ def test_ignored_tenant_stays_broken_without_metadata(
|
||||
metadata_removed = True
|
||||
assert metadata_removed, f"Failed to find metadata file in {tenant_timeline_dir}"
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*{tenant_id}.*: load failed.*: failed to load metadata.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(".*could not load tenant .*?: failed to load metadata.*")
|
||||
|
||||
# now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
|
||||
pageserver_http.tenant_load(tenant_id=tenant_id)
|
||||
|
||||
@@ -22,7 +22,6 @@ from fixtures.neon_fixtures import (
|
||||
available_remote_storages,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
from prometheus_client.samples import Sample
|
||||
|
||||
|
||||
@@ -309,26 +308,27 @@ def test_pageserver_with_empty_tenants(
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*marking .* as locally complete, while it doesnt exist in remote index.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(".*load failed.*list timelines directory.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*could not load tenant.*Failed to list timelines directory.*"
|
||||
)
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
tenant_with_empty_timelines = TenantId.generate()
|
||||
client.tenant_create(tenant_with_empty_timelines)
|
||||
temp_timelines = client.timeline_list(tenant_with_empty_timelines)
|
||||
tenant_with_empty_timelines_dir = client.tenant_create()
|
||||
temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir)
|
||||
for temp_timeline in temp_timelines:
|
||||
client.timeline_delete(
|
||||
tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
|
||||
tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"])
|
||||
)
|
||||
files_in_timelines_dir = sum(
|
||||
1
|
||||
for _p in Path.iterdir(
|
||||
Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines) / "timelines"
|
||||
Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines"
|
||||
)
|
||||
)
|
||||
assert (
|
||||
files_in_timelines_dir == 0
|
||||
), f"Tenant {tenant_with_empty_timelines} should have an empty timelines/ directory"
|
||||
), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"
|
||||
|
||||
# Trigger timeline re-initialization after pageserver restart
|
||||
env.endpoints.stop_all()
|
||||
@@ -340,16 +340,10 @@ def test_pageserver_with_empty_tenants(
|
||||
env.pageserver.start()
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
def not_loading():
|
||||
tenants = client.tenant_list()
|
||||
assert len(tenants) == 2
|
||||
assert all(t["state"]["slug"] != "Loading" for t in tenants)
|
||||
|
||||
wait_until(10, 0.2, not_loading)
|
||||
|
||||
tenants = client.tenant_list()
|
||||
|
||||
assert len(tenants) == 2
|
||||
|
||||
[broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
|
||||
assert (
|
||||
broken_tenant["state"]["slug"] == "Broken"
|
||||
@@ -360,17 +354,17 @@ def test_pageserver_with_empty_tenants(
|
||||
broken_tenant_status["state"]["slug"] == "Broken"
|
||||
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
|
||||
|
||||
assert env.pageserver.log_contains(".*load failed, setting tenant state to Broken:.*")
|
||||
assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*")
|
||||
|
||||
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
|
||||
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines_dir)]
|
||||
assert (
|
||||
loaded_tenant["state"]["slug"] == "Active"
|
||||
), "Tenant {tenant_with_empty_timelines} with empty timelines dir should be active and ready for timeline creation"
|
||||
), "Tenant {tenant_with_empty_timelines_dir} with empty timelines dir should be active and ready for timeline creation"
|
||||
|
||||
loaded_tenant_status = client.tenant_status(tenant_with_empty_timelines)
|
||||
loaded_tenant_status = client.tenant_status(tenant_with_empty_timelines_dir)
|
||||
assert (
|
||||
loaded_tenant_status["state"]["slug"] == "Active"
|
||||
), f"Tenant {tenant_with_empty_timelines} without timelines dir should be active"
|
||||
), f"Tenant {tenant_with_empty_timelines_dir} without timelines dir should be active"
|
||||
|
||||
time.sleep(1) # to allow metrics propagation
|
||||
|
||||
@@ -380,7 +374,7 @@ def test_pageserver_with_empty_tenants(
|
||||
"state": "Broken",
|
||||
}
|
||||
active_tenants_metric_filter = {
|
||||
"tenant_id": str(tenant_with_empty_timelines),
|
||||
"tenant_id": str(tenant_with_empty_timelines_dir),
|
||||
"state": "Active",
|
||||
}
|
||||
|
||||
@@ -392,7 +386,7 @@ def test_pageserver_with_empty_tenants(
|
||||
|
||||
assert (
|
||||
tenant_active_count == 1
|
||||
), f"Tenant {tenant_with_empty_timelines} should have metric as active"
|
||||
), f"Tenant {tenant_with_empty_timelines_dir} should have metric as active"
|
||||
|
||||
tenant_broken_count = int(
|
||||
ps_metrics.query_one(
|
||||
|
||||
@@ -371,7 +371,7 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
|
||||
|
||||
# make the second call and assert behavior
|
||||
log.info("second call start")
|
||||
error_msg_re = "timeline deletion is already in progress"
|
||||
error_msg_re = "another task is already setting the deleted_flag, started at"
|
||||
with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err:
|
||||
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
|
||||
assert second_call_err.value.status_code == 500
|
||||
|
||||
@@ -27,6 +27,7 @@ futures-core = { version = "0.3" }
|
||||
futures-executor = { version = "0.3" }
|
||||
futures-sink = { version = "0.3" }
|
||||
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
|
||||
hashbrown = { version = "0.12", features = ["raw"] }
|
||||
itertools = { version = "0.10" }
|
||||
libc = { version = "0.2", features = ["extra_traits"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
@@ -38,7 +39,7 @@ num-traits = { version = "0.2", features = ["i128"] }
|
||||
prost = { version = "0.11" }
|
||||
rand = { version = "0.8", features = ["small_rng"] }
|
||||
regex = { version = "1" }
|
||||
regex-syntax = { version = "0.7" }
|
||||
regex-syntax = { version = "0.6" }
|
||||
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "multipart", "rustls-tls"] }
|
||||
ring = { version = "0.16", features = ["std"] }
|
||||
rustls = { version = "0.20", features = ["dangerous_configuration"] }
|
||||
@@ -61,6 +62,7 @@ url = { version = "2", features = ["serde"] }
|
||||
anyhow = { version = "1", features = ["backtrace"] }
|
||||
bytes = { version = "1", features = ["serde"] }
|
||||
either = { version = "1" }
|
||||
hashbrown = { version = "0.12", features = ["raw"] }
|
||||
itertools = { version = "0.10" }
|
||||
libc = { version = "0.2", features = ["extra_traits"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
@@ -68,7 +70,7 @@ memchr = { version = "2" }
|
||||
nom = { version = "7" }
|
||||
prost = { version = "0.11" }
|
||||
regex = { version = "1" }
|
||||
regex-syntax = { version = "0.7" }
|
||||
regex-syntax = { version = "0.6" }
|
||||
serde = { version = "1", features = ["alloc", "derive"] }
|
||||
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] }
|
||||
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit-mut"] }
|
||||
|
||||
Reference in New Issue
Block a user