Merge branch 'refs/heads/main' into amasterov/add-postgis-test-2

# Conflicts:
#	.github/actions/neon-project-create/action.yml
10  .github/actions/neon-project-create/action.yml (vendored)
@@ -140,20 +140,14 @@ runs:
             -d "{\"scheduling\": \"Essential\"}"
         fi

         # XXX
-        # This is a workaround for project's settings which don't work well in public API now
-        # https://github.com/neondatabase/cloud/issues/27143
+        # This is a workaround for the default endpoint settings, which currently do not allow some settings in the public API.
+        # https://github.com/neondatabase/cloud/issues/27108
-        if ( [[ -n "${PROJECT_SETTINGS}" ]] && [[ "${PROJECT_SETTINGS}" != "{}" ]] ) || ( [[ -n "${DEFAULT_ENDPOINT_SETTINGS}" ]] && [[ "${DEFAULT_ENDPOINT_SETTINGS}" != "{}" ]] ); then
+        if [[ -n ${DEFAULT_ENDPOINT_SETTINGS} && ${DEFAULT_ENDPOINT_SETTINGS} != "{}" ]] ; then
           PROJECT_DATA=$(curl -X GET \
             "https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/projects/${project_id}" \
             -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \
-            -d "{\"scheduling\": \"Essential\"}"
           )
-          NEW_PROJECT_SETTINGS=$(echo ${PROJECT_DATA} | jq -rc ".project.settings + ${PROJECT_SETTINGS}")
-          curl -X POST --fail \
-            "https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/projects/${project_id}/settings" \
-            -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \
-            --data "${NEW_PROJECT_SETTINGS}"
           NEW_DEFAULT_ENDPOINT_SETTINGS=$(echo ${PROJECT_DATA} | jq -rc ".project.default_endpoint_settings + ${DEFAULT_ENDPOINT_SETTINGS}")
           curl -X POST --fail \
             "https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/projects/${project_id}/default_endpoint_settings" \
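The jq expression here relies on jq's `+` operator, which merges two objects and lets keys from the right-hand operand win. A minimal sketch of that behavior (key names are illustrative, not the real settings schema):

    $ echo '{"pooler_enabled": true, "pg_version": 16}' | jq -c '. + {"pg_version": 17}'
    {"pooler_enabled":true,"pg_version":17}

So `.project.default_endpoint_settings + ${DEFAULT_ENDPOINT_SETTINGS}` overlays the caller-supplied overrides onto the settings currently stored on the project before POSTing the merged object back.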
15  .github/workflows/_build-and-test-locally.yml (vendored)
@@ -28,6 +28,16 @@ on:
         required: false
         default: 'disabled'
         type: string
+      test-selection:
+        description: 'specification of selected test(s) to run'
+        required: false
+        default: ''
+        type: string
+      test-run-count:
+        description: 'number of runs to perform for selected tests'
+        required: false
+        default: 1
+        type: number

 defaults:
   run:

@@ -381,14 +391,15 @@ jobs:
           run_with_real_s3: true
           real_s3_bucket: neon-github-ci-tests
           real_s3_region: eu-central-1
-          rerun_failed: true
+          rerun_failed: ${{ inputs.test-run-count == 1 }}
           pg_version: ${{ matrix.pg_version }}
           sanitizers: ${{ inputs.sanitizers }}
           aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
           # `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds.
           # Attempt to stop tests gracefully to generate test reports
           # until they are forcibly stopped by the stricter `timeout-minutes` limit.
-          extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }}
+          extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }} --count=${{ inputs.test-run-count }}
+            ${{ inputs.test-selection != '' && format('-k "{0}"', inputs.test-selection) || '' }}
         env:
           TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
           CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
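The two new inputs feed straight into the pytest invocation: `test-selection` becomes a `-k` expression and `test-run-count` becomes `--count` (the latter presumably supplied by the pytest-repeat plugin, which is not shown in this diff). The local equivalent would look roughly like:

    $ ./scripts/pytest -k "test_postgis" --count=10

Note that `rerun_failed` is now enabled only for single-run invocations, since automatic reruns would skew the statistics of a repeated-run flake hunt.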
120  .github/workflows/build_and_run_selected_test.yml (vendored, new file)
@@ -0,0 +1,120 @@
+name: Build and Run Selected Test
+
+on:
+  workflow_dispatch:
+    inputs:
+      test-selection:
+        description: 'Specification of selected test(s), as accepted by pytest -k'
+        required: true
+        type: string
+      run-count:
+        description: 'Number of test runs to perform'
+        required: true
+        type: number
+      archs:
+        description: 'Archs to run tests on, e.g.: ["x64", "arm64"]'
+        default: '["x64"]'
+        required: true
+        type: string
+      build-types:
+        description: 'Build types to run tests on, e.g.: ["debug", "release"]'
+        default: '["release"]'
+        required: true
+        type: string
+      pg-versions:
+        description: 'Postgres versions to use for testing, e.g.: [{"pg_version":"v16"}, {"pg_version":"v17"}]'
+        default: '[{"pg_version":"v17"}]'
+        required: true
+        type: string
+
+defaults:
+  run:
+    shell: bash -euxo pipefail {0}
+
+env:
+  RUST_BACKTRACE: 1
+  COPT: '-Werror'
+
+jobs:
+  meta:
+    uses: ./.github/workflows/_meta.yml
+    with:
+      github-event-name: ${{ github.event_name }}
+      github-event-json: ${{ toJSON(github.event) }}
+
+  build-and-test-locally:
+    needs: [ meta ]
+    strategy:
+      fail-fast: false
+      matrix:
+        arch: ${{ fromJson(inputs.archs) }}
+        build-type: ${{ fromJson(inputs.build-types) }}
+    uses: ./.github/workflows/_build-and-test-locally.yml
+    with:
+      arch: ${{ matrix.arch }}
+      build-tools-image: ghcr.io/neondatabase/build-tools:pinned-bookworm
+      build-tag: ${{ needs.meta.outputs.build-tag }}
+      build-type: ${{ matrix.build-type }}
+      test-cfg: ${{ inputs.pg-versions }}
+      test-selection: ${{ inputs.test-selection }}
+      test-run-count: ${{ fromJson(inputs.run-count) }}
+    secrets: inherit
+
+  create-test-report:
+    needs: [ build-and-test-locally ]
+    if: ${{ !cancelled() }}
+    permissions:
+      id-token: write # aws-actions/configure-aws-credentials
+      statuses: write
+      contents: write
+      pull-requests: write
+    outputs:
+      report-url: ${{ steps.create-allure-report.outputs.report-url }}
+
+    runs-on: [ self-hosted, small ]
+    container:
+      image: ghcr.io/neondatabase/build-tools:pinned-bookworm
+      credentials:
+        username: ${{ github.actor }}
+        password: ${{ secrets.GITHUB_TOKEN }}
+      options: --init
+
+    steps:
+      - name: Harden the runner (Audit all outbound calls)
+        uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
+        with:
+          egress-policy: audit
+
+      - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+
+      - name: Create Allure report
+        if: ${{ !cancelled() }}
+        id: create-allure-report
+        uses: ./.github/actions/allure-report-generate
+        with:
+          store-test-results-into-db: true
+          aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
+        env:
+          REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_DEV }}
+
+      - uses: actions/github-script@v7
+        if: ${{ !cancelled() }}
+        with:
+          # Retry script for 5XX server errors: https://github.com/actions/github-script#retries
+          retries: 5
+          script: |
+            const report = {
+              reportUrl: "${{ steps.create-allure-report.outputs.report-url }}",
+              reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}",
+            }
+
+            const coverage = {}
+
+            const script = require("./scripts/comment-test-report.js")
+            await script({
+              github,
+              context,
+              fetch,
+              report,
+              coverage,
+            })
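Because the workflow is workflow_dispatch-only, it can be kicked off from the GitHub CLI; a sketch (input values are examples, not the file's defaults):

    $ gh workflow run build_and_run_selected_test.yml \
        -f test-selection='test_postgis' \
        -f run-count=10 \
        -f archs='["x64", "arm64"]' \
        -f build-types='["release"]'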
2  .github/workflows/check-permissions.yml (vendored)
@@ -19,7 +19,7 @@ jobs:
     runs-on: ubuntu-22.04
     steps:
       - name: Harden the runner (Audit all outbound calls)
-        uses: step-security/harden-runner@v2
+        uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
         with:
           egress-policy: audit

@@ -12,7 +12,7 @@ jobs:
     runs-on: ubuntu-22.04
     steps:
       - name: Harden the runner (Audit all outbound calls)
-        uses: step-security/harden-runner@v2
+        uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
         with:
           egress-policy: audit
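These bumps replace the floating `@v2` tag with a full commit SHA, so a later retag of `v2` cannot silently change the action that runs. One way to check which commit a release tag points at before pinning (annotated tags also show a peeled `^{}` entry):

    $ git ls-remote --tags https://github.com/step-security/harden-runner | grep v2.12.0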
2  .github/workflows/fast-forward.yml (vendored)
@@ -14,7 +14,7 @@ jobs:

     steps:
       - name: Harden the runner (Audit all outbound calls)
-        uses: step-security/harden-runner@v2
+        uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
         with:
           egress-policy: audit

@@ -28,7 +28,7 @@ jobs:

     steps:
       - name: Harden the runner (Audit all outbound calls)
-        uses: step-security/harden-runner@v2
+        uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
         with:
           egress-policy: audit

@@ -75,7 +75,7 @@ jobs:

     steps:
       - name: Harden the runner (Audit all outbound calls)
-        uses: step-security/harden-runner@v2
+        uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
         with:
           egress-policy: audit
2  .github/workflows/pin-build-tools-image.yml (vendored)
@@ -41,7 +41,7 @@ jobs:

     steps:
       - name: Harden the runner (Audit all outbound calls)
-        uses: step-security/harden-runner@v2
+        uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
         with:
           egress-policy: audit
4  .github/workflows/trigger-e2e-tests.yml (vendored)
@@ -35,7 +35,7 @@ jobs:

     steps:
       - name: Harden the runner (Audit all outbound calls)
-        uses: step-security/harden-runner@v2
+        uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
         with:
           egress-policy: audit

@@ -73,7 +73,7 @@ jobs:
     }}
     steps:
       - name: Harden the runner (Audit all outbound calls)
-        uses: step-security/harden-runner@v2
+        uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
         with:
           egress-policy: audit
3  Cargo.lock (generated)
@@ -6616,12 +6616,14 @@ dependencies = [
 "anyhow",
 "async-stream",
 "bytes",
+"camino",
 "clap",
 "const_format",
 "futures",
 "futures-core",
 "futures-util",
 "http-body-util",
+"http-utils",
 "humantime",
 "hyper 1.4.1",
 "hyper-util",

@@ -6631,6 +6633,7 @@ dependencies = [
 "prost 0.13.3",
 "rustls 0.23.18",
 "tokio",
+"tokio-rustls 0.26.0",
 "tonic",
 "tonic-build",
 "tracing",
@@ -17,8 +17,10 @@ use std::time::Duration;
 use anyhow::{Context, Result, anyhow, bail};
 use clap::Parser;
 use compute_api::spec::ComputeMode;
+use control_plane::broker::StorageBroker;
 use control_plane::endpoint::ComputeControlPlane;
 use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
+use control_plane::local_env;
 use control_plane::local_env::{
     EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
     NeonLocalInitPageserverConf, SafekeeperConf,

@@ -28,7 +30,6 @@ use control_plane::safekeeper::SafekeeperNode;
 use control_plane::storage_controller::{
     NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
 };
-use control_plane::{broker, local_env};
 use nix::fcntl::{FlockArg, flock};
 use pageserver_api::config::{
     DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,

@@ -988,7 +989,8 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
     NeonLocalInitConf {
         control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
         broker: NeonBroker {
-            listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
+            listen_addr: Some(DEFAULT_BROKER_ADDR.parse().unwrap()),
+            listen_https_addr: None,
         },
         safekeepers: vec![SafekeeperConf {
             id: DEFAULT_SAFEKEEPER_ID,

@@ -1777,7 +1779,8 @@ async fn handle_endpoint_storage(
 async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> {
     match subcmd {
         StorageBrokerCmd::Start(args) => {
-            if let Err(e) = broker::start_broker_process(env, &args.start_timeout).await {
+            let storage_broker = StorageBroker::from_env(env);
+            if let Err(e) = storage_broker.start(&args.start_timeout).await {
                 eprintln!("broker start failed: {e}");
                 exit(1);
             }

@@ -1785,7 +1788,8 @@ async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::Local

         StorageBrokerCmd::Stop(_args) => {
             // FIXME: stop_mode unused
-            if let Err(e) = broker::stop_broker_process(env) {
+            let storage_broker = StorageBroker::from_env(env);
+            if let Err(e) = storage_broker.stop() {
                 eprintln!("broker stop failed: {e}");
                 exit(1);
             }

@@ -1835,8 +1839,11 @@ async fn handle_start_all_impl(
     #[allow(clippy::redundant_closure_call)]
     (|| {
         js.spawn(async move {
-            let retry_timeout = retry_timeout;
-            broker::start_broker_process(env, &retry_timeout).await
+            let storage_broker = StorageBroker::from_env(env);
+            storage_broker
+                .start(&retry_timeout)
+                .await
+                .map_err(|e| e.context("start storage_broker"))
         });

         js.spawn(async move {

@@ -1991,7 +1998,8 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
         }
     }

-    if let Err(e) = broker::stop_broker_process(env) {
+    let storage_broker = StorageBroker::from_env(env);
+    if let Err(e) = storage_broker.stop() {
         eprintln!("neon broker stop failed: {e:#}");
     }
@@ -3,60 +3,86 @@
 //! In the local test environment, the storage broker stores its data directly in
 //!
 //! ```text
-//!   .neon
+//!   .neon/storage_broker
 //! ```
 use std::time::Duration;

 use anyhow::Context;
 use camino::Utf8PathBuf;

-use crate::{background_process, local_env};
+use crate::{background_process, local_env::LocalEnv};

-pub async fn start_broker_process(
-    env: &local_env::LocalEnv,
-    retry_timeout: &Duration,
-) -> anyhow::Result<()> {
-    let broker = &env.broker;
-    let listen_addr = &broker.listen_addr;
-
-    print!("Starting neon broker at {}", listen_addr);
-
-    let args = [format!("--listen-addr={listen_addr}")];
-
-    let client = reqwest::Client::new();
-    background_process::start_process(
-        "storage_broker",
-        &env.base_data_dir,
-        &env.storage_broker_bin(),
-        args,
-        [],
-        background_process::InitialPidFile::Create(storage_broker_pid_file_path(env)),
-        retry_timeout,
-        || async {
-            let url = broker.client_url();
-            let status_url = url.join("status").with_context(|| {
-                format!("Failed to append /status path to broker endpoint {url}")
-            })?;
-            let request = client
-                .get(status_url)
-                .build()
-                .with_context(|| format!("Failed to construct request to broker endpoint {url}"))?;
-            match client.execute(request).await {
-                Ok(resp) => Ok(resp.status().is_success()),
-                Err(_) => Ok(false),
-            }
-        },
-    )
-    .await
-    .context("Failed to spawn storage_broker subprocess")?;
-    Ok(())
+pub struct StorageBroker {
+    env: LocalEnv,
 }

-pub fn stop_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
-    background_process::stop_process(true, "storage_broker", &storage_broker_pid_file_path(env))
-}
+impl StorageBroker {
+    /// Create a new `StorageBroker` instance from the environment.
+    pub fn from_env(env: &LocalEnv) -> Self {
+        Self { env: env.clone() }
+    }

-fn storage_broker_pid_file_path(env: &local_env::LocalEnv) -> Utf8PathBuf {
-    Utf8PathBuf::from_path_buf(env.base_data_dir.join("storage_broker.pid"))
-        .expect("non-Unicode path")
+    pub fn initialize(&self) -> anyhow::Result<()> {
+        if self.env.generate_local_ssl_certs {
+            self.env.generate_ssl_cert(
+                &self.env.storage_broker_data_dir().join("server.crt"),
+                &self.env.storage_broker_data_dir().join("server.key"),
+            )?;
+        }
+        Ok(())
+    }
+
+    /// Start the storage broker process.
+    pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
+        let broker = &self.env.broker;
+
+        print!("Starting neon broker at {}", broker.client_url());
+
+        let mut args = Vec::new();
+
+        if let Some(addr) = &broker.listen_addr {
+            args.push(format!("--listen-addr={addr}"));
+        }
+        if let Some(addr) = &broker.listen_https_addr {
+            args.push(format!("--listen-https-addr={addr}"));
+        }
+
+        let client = self.env.create_http_client();
+        background_process::start_process(
+            "storage_broker",
+            &self.env.storage_broker_data_dir(),
+            &self.env.storage_broker_bin(),
+            args,
+            [],
+            background_process::InitialPidFile::Create(self.pid_file_path()),
+            retry_timeout,
+            || async {
+                let url = broker.client_url();
+                let status_url = url.join("status").with_context(|| {
+                    format!("Failed to append /status path to broker endpoint {url}")
+                })?;
+                let request = client.get(status_url).build().with_context(|| {
+                    format!("Failed to construct request to broker endpoint {url}")
+                })?;
+                match client.execute(request).await {
+                    Ok(resp) => Ok(resp.status().is_success()),
+                    Err(_) => Ok(false),
+                }
+            },
+        )
+        .await
+        .context("Failed to spawn storage_broker subprocess")?;
+        Ok(())
+    }
+
+    /// Stop the storage broker process.
+    pub fn stop(&self) -> anyhow::Result<()> {
+        background_process::stop_process(true, "storage_broker", &self.pid_file_path())
+    }
+
+    /// Get the path to the PID file for the storage broker.
+    fn pid_file_path(&self) -> Utf8PathBuf {
+        Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_broker.pid"))
+            .expect("non-Unicode path")
+    }
 }
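Taken together with the neon_local.rs hunks above, the new call pattern is (sketch assembled from the calls visible in this diff):

    // Construct a handle from the local environment, then drive the process.
    let storage_broker = StorageBroker::from_env(&env);
    storage_broker.initialize()?;                 // generate local TLS certs if enabled
    storage_broker.start(&retry_timeout).await?;  // spawn and poll /status until healthy
    // ... later ...
    storage_broker.stop()?;                       // stop via the recorded PID file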
@@ -4,7 +4,7 @@
 //! script which will use local paths.

 use std::collections::HashMap;
-use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::net::SocketAddr;
 use std::path::{Path, PathBuf};
 use std::process::{Command, Stdio};
 use std::time::Duration;

@@ -14,11 +14,12 @@ use anyhow::{Context, bail};
 use clap::ValueEnum;
 use pem::Pem;
 use postgres_backend::AuthType;
-use reqwest::Url;
+use reqwest::{Certificate, Url};
 use serde::{Deserialize, Serialize};
 use utils::auth::encode_from_key_file;
 use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};

+use crate::broker::StorageBroker;
 use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage};
 use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
 use crate::safekeeper::SafekeeperNode;

@@ -157,11 +158,16 @@ pub struct EndpointStorageConf {
 }

 /// Broker config for cluster internal communication.
-#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
+#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Default)]
+#[serde(default)]
 pub struct NeonBroker {
-    /// Broker listen address for storage nodes coordination, e.g. '127.0.0.1:50051'.
-    pub listen_addr: SocketAddr,
+    /// Broker listen HTTP address for storage nodes coordination, e.g. '127.0.0.1:50051'.
+    /// At least one of listen_addr or listen_https_addr must be set.
+    pub listen_addr: Option<SocketAddr>,
+    /// Broker listen HTTPS address for storage nodes coordination, e.g. '127.0.0.1:50051'.
+    /// At least one of listen_addr or listen_https_addr must be set.
+    /// listen_https_addr is preferred over listen_addr in neon_local.
+    pub listen_https_addr: Option<SocketAddr>,
 }

 /// A part of storage controller's config the neon_local knows about.

@@ -235,18 +241,19 @@ impl Default for NeonStorageControllerConf {
     }
 }

-// Dummy Default impl to satisfy Deserialize derive.
-impl Default for NeonBroker {
-    fn default() -> Self {
-        NeonBroker {
-            listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
-        }
-    }
-}
-
 impl NeonBroker {
     pub fn client_url(&self) -> Url {
-        Url::parse(&format!("http://{}", self.listen_addr)).expect("failed to construct url")
+        let url = if let Some(addr) = self.listen_https_addr {
+            format!("https://{}", addr)
+        } else {
+            format!(
+                "http://{}",
+                self.listen_addr
+                    .expect("at least one address should be set")
+            )
+        };
+
+        Url::parse(&url).expect("failed to construct url")
     }
 }

@@ -441,6 +448,10 @@ impl LocalEnv {
         self.base_data_dir.join("endpoints")
     }

+    pub fn storage_broker_data_dir(&self) -> PathBuf {
+        self.base_data_dir.join("storage_broker")
+    }
+
     pub fn pageserver_data_dir(&self, pageserver_id: NodeId) -> PathBuf {
         self.base_data_dir
             .join(format!("pageserver_{pageserver_id}"))

@@ -503,6 +514,23 @@ impl LocalEnv {
         )
     }

+    /// Creates HTTP client with local SSL CA certificates.
+    pub fn create_http_client(&self) -> reqwest::Client {
+        let ssl_ca_certs = self.ssl_ca_cert_path().map(|ssl_ca_file| {
+            let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist");
+            Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
+        });
+
+        let mut http_client = reqwest::Client::builder();
+        for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
+            http_client = http_client.add_root_certificate(ssl_ca_cert);
+        }
+
+        http_client
+            .build()
+            .expect("HTTP client should construct with no error")
+    }
+
     /// Inspect the base data directory and extract the instance id and instance directory path
     /// for all storage controller instances
     pub async fn storage_controller_instances(&self) -> std::io::Result<Vec<(u8, PathBuf)>> {

@@ -911,6 +939,12 @@ impl LocalEnv {
         // create endpoints dir
         fs::create_dir_all(env.endpoints_path())?;

+        // create storage broker dir
+        fs::create_dir_all(env.storage_broker_data_dir())?;
+        StorageBroker::from_env(&env)
+            .initialize()
+            .context("storage broker init failed")?;
+
         // create safekeeper dirs
         for safekeeper in &env.safekeepers {
             fs::create_dir_all(SafekeeperNode::datadir_path_by_id(&env, safekeeper.id))?;
@@ -21,7 +21,6 @@ use pageserver_api::shard::TenantShardId;
 use pageserver_client::mgmt_api;
 use postgres_backend::AuthType;
 use postgres_connection::{PgConnectionConfig, parse_host_port};
-use reqwest::Certificate;
 use utils::auth::{Claims, Scope};
 use utils::id::{NodeId, TenantId, TimelineId};
 use utils::lsn::Lsn;

@@ -51,19 +50,6 @@ impl PageServerNode {
             parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
         let port = port.unwrap_or(5432);

-        let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
-            let buf = std::fs::read(ssl_ca_file).expect("SSL root CA file should exist");
-            Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
-        });
-
-        let mut http_client = reqwest::Client::builder();
-        for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
-            http_client = http_client.add_root_certificate(ssl_ca_cert);
-        }
-        let http_client = http_client
-            .build()
-            .expect("Client constructs with no errors");
-
         let endpoint = if env.storage_controller.use_https_pageserver_api {
             format!(
                 "https://{}",

@@ -80,7 +66,7 @@ impl PageServerNode {
             conf: conf.clone(),
             env: env.clone(),
             http_client: mgmt_api::Client::new(
-                http_client,
+                env.create_http_client(),
                 endpoint,
                 {
                     match conf.http_auth_type {
@@ -87,7 +87,7 @@ impl SafekeeperNode {
             conf: conf.clone(),
             pg_connection_config: Self::safekeeper_connection_config(&listen_addr, conf.pg_port),
             env: env.clone(),
-            http_client: reqwest::Client::new(),
+            http_client: env.create_http_client(),
             http_base_url: format!("http://{}:{}/v1", listen_addr, conf.http_port),
             listen_addr,
         }
@@ -20,7 +20,7 @@ use pageserver_api::shard::TenantShardId;
 use pageserver_client::mgmt_api::ResponseErrorMessageExt;
 use pem::Pem;
 use postgres_backend::AuthType;
-use reqwest::{Certificate, Method};
+use reqwest::Method;
 use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
 use tokio::process::Command;

@@ -153,24 +153,11 @@ impl StorageController {
             }
         };

-        let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
-            let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist");
-            Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
-        });
-
-        let mut http_client = reqwest::Client::builder();
-        for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
-            http_client = http_client.add_root_certificate(ssl_ca_cert);
-        }
-        let http_client = http_client
-            .build()
-            .expect("HTTP client should construct with no error");
-
         Self {
             env: env.clone(),
             private_key,
             public_key,
-            client: http_client,
+            client: env.create_http_client(),
             config: env.storage_controller.clone(),
             listen_port: OnceLock::default(),
         }
@@ -13,7 +13,7 @@ For design details see [the RFC](./rfcs/021-metering.md) and [the discussion on
 batch format is
 ```json

-{ "events" : [metric1, metric2, ...]]}
+{ "events" : [metric1, metric2, ...] }

 ```
 See metric format examples below.

@@ -49,11 +49,13 @@ Size of the remote storage (S3) directory.
 This is an absolute, per-tenant metric.

 - `timeline_logical_size`
-Logical size of the data in the timeline
+Logical size of the data in the timeline.
+This is an absolute, per-timeline metric.

 - `synthetic_storage_size`
-Size of all tenant's branches including WAL
+Size of all tenant's branches including WAL.
 This is the same metric that `tenant/{tenant_id}/size` endpoint returns.
 This is an absolute, per-tenant metric.

@@ -106,10 +108,10 @@ This is an incremental, per-endpoint metric.
 ```

 The metric is incremental, so the value is the difference between the current and the previous value.
-If there is no previous value, the value, the value is the current value and the `start_time` equals `stop_time`.
+If there is no previous value, the value is the current value and the `start_time` equals `stop_time`.

 ### TODO

 - [ ] Handle errors better: currently if one tenant fails to gather metrics, the whole iteration fails and metrics are not sent for any tenant.
 - [ ] Add retries
 - [ ] Tune the interval
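The incremental rule fixed in the last hunk is easy to get wrong, so here is a minimal sketch of it (function shape is illustrative, not the pageserver's actual code):

    def incremental_value(current: int, previous: int | None) -> int:
        """Value to report for an incremental metric."""
        if previous is None:
            # First report: report the current reading; start_time equals stop_time.
            return current
        return current - previous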
@@ -169,6 +169,8 @@ pub struct TenantDescribeResponseShard {
     pub is_pending_compute_notification: bool,
     /// A shard split is currently underway
     pub is_splitting: bool,
+    /// A timeline is being imported into this tenant
+    pub is_importing: bool,

     pub scheduling_policy: ShardSchedulingPolicy,
@@ -3816,6 +3816,24 @@ impl TenantShard {
             MaybeDeletedIndexPart::IndexPart(p) => p,
         };

+        // A shard split may not take place while a timeline import is on-going
+        // for the tenant. Timeline imports run as part of each tenant shard
+        // and rely on the sharding scheme to split the work among pageservers.
+        // If we were to split in the middle of this process, we would have to
+        // either ensure that it's driven to completion on the old shard set
+        // or transfer it to the new shard set. It's technically possible, but complex.
+        match index_part.import_pgdata {
+            Some(ref import) if !import.is_done() => {
+                anyhow::bail!(
+                    "Cannot split due to import with idempotency key: {:?}",
+                    import.idempotency_key()
+                );
+            }
+            Some(_) | None => {
+                // fallthrough
+            }
+        }
+
         for child_shard in child_shards {
             tracing::info!(%timeline_id, "Uploading index_part for child {}", child_shard.to_index());
             upload_index_part(
@@ -8,7 +8,7 @@ use crate::error::{ErrorKind, ReportableError, UserFacingError};
 use crate::proxy::retry::CouldRetry;

 /// A go-to error message which doesn't leak any detail.
-pub(crate) const REQUEST_FAILED: &str = "Console request failed";
+pub(crate) const REQUEST_FAILED: &str = "Control plane request failed";

 /// Common console API error.
 #[derive(Debug, Error)]
@@ -11,6 +11,7 @@ bench = []
 anyhow.workspace = true
 async-stream.workspace = true
 bytes.workspace = true
+camino.workspace = true
 clap = { workspace = true, features = ["derive"] }
 const_format.workspace = true
 futures.workspace = true

@@ -19,12 +20,14 @@ futures-util.workspace = true
 humantime.workspace = true
 hyper = { workspace = true, features = ["full"] }
 http-body-util.workspace = true
+http-utils.workspace = true
 hyper-util = "0.1"
 once_cell.workspace = true
 parking_lot.workspace = true
 prost.workspace = true
 tonic.workspace = true
 tokio = { workspace = true, features = ["rt-multi-thread"] }
+tokio-rustls.workspace = true
 tracing.workspace = true
 metrics.workspace = true
 utils.workspace = true
@@ -17,10 +17,13 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;

+use camino::Utf8PathBuf;
 use clap::{Parser, command};
+use futures::future::OptionFuture;
 use futures_core::Stream;
 use futures_util::StreamExt;
 use http_body_util::Full;
+use http_utils::tls_certs::ReloadingCertificateResolver;
 use hyper::body::Incoming;
 use hyper::header::CONTENT_TYPE;
 use hyper::service::service_fn;

@@ -38,7 +41,7 @@ use storage_broker::proto::{
     FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
     SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
 };
-use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR, parse_proto_ttid};
+use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, parse_proto_ttid};
 use tokio::net::TcpListener;
 use tokio::sync::broadcast;
 use tokio::sync::broadcast::error::RecvError;

@@ -59,12 +62,25 @@ project_build_tag!(BUILD_TAG);
 const DEFAULT_CHAN_SIZE: usize = 32;
 const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384;

+const DEFAULT_SSL_KEY_FILE: &str = "server.key";
+const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
+const DEFAULT_SSL_CERT_RELOAD_PERIOD: &str = "60s";
+
 #[derive(Parser, Debug)]
 #[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
+#[clap(group(
+    clap::ArgGroup::new("listen-addresses")
+        .required(true)
+        .multiple(true)
+        .args(&["listen_addr", "listen_https_addr"]),
+))]
 struct Args {
-    /// Endpoint to listen on.
-    #[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
-    listen_addr: SocketAddr,
+    /// Endpoint to listen HTTP on.
+    #[arg(short, long)]
+    listen_addr: Option<SocketAddr>,
+    /// Endpoint to listen HTTPS on.
+    #[arg(long)]
+    listen_https_addr: Option<SocketAddr>,
     /// Size of the queue to the per timeline subscriber.
     #[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
     timeline_chan_size: usize,

@@ -72,11 +88,20 @@ struct Args {
     #[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
     all_keys_chan_size: usize,
     /// HTTP/2 keepalive interval.
-    #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
+    #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
     http2_keepalive_interval: Duration,
     /// Format for logging, either 'plain' or 'json'.
     #[arg(long, default_value = "plain")]
     log_format: String,
+    /// Path to a file with certificate's private key for https API.
+    #[arg(long, default_value = DEFAULT_SSL_KEY_FILE)]
+    ssl_key_file: Utf8PathBuf,
+    /// Path to a file with a X509 certificate for https API.
+    #[arg(long, default_value = DEFAULT_SSL_CERT_FILE)]
+    ssl_cert_file: Utf8PathBuf,
+    /// Period to reload certificate and private key from files.
+    #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
+    ssl_cert_reload_period: Duration,
 }

 /// Id of publisher for registering in maps

@@ -674,12 +699,50 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     };
     let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);

+    let http_listener = match &args.listen_addr {
+        Some(addr) => {
+            info!("listening HTTP on {}", addr);
+            Some(TcpListener::bind(addr).await?)
+        }
+        None => None,
+    };
+
+    let (https_listener, tls_acceptor) = match &args.listen_https_addr {
+        Some(addr) => {
+            let listener = TcpListener::bind(addr).await?;
+
+            let cert_resolver = ReloadingCertificateResolver::new(
+                "main",
+                &args.ssl_key_file,
+                &args.ssl_cert_file,
+                args.ssl_cert_reload_period,
+            )
+            .await?;
+
+            let mut tls_config = rustls::ServerConfig::builder()
+                .with_no_client_auth()
+                .with_cert_resolver(cert_resolver);
+
+            // Tonic is HTTP/2 only and it negotiates it with ALPN.
+            tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
+
+            let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config));
+
+            info!("listening HTTPS on {}", addr);
+            (Some(listener), Some(acceptor))
+        }
+        None => (None, None),
+    };
+
     // grpc is served along with http1 for metrics on a single port, hence we
     // don't use tonic's Server.
-    let tcp_listener = TcpListener::bind(&args.listen_addr).await?;
-    info!("listening on {}", &args.listen_addr);
     loop {
-        let (stream, addr) = match tcp_listener.accept().await {
+        let (conn, is_https) = tokio::select! {
+            Some(conn) = OptionFuture::from(http_listener.as_ref().map(|l| l.accept())) => (conn, false),
+            Some(conn) = OptionFuture::from(https_listener.as_ref().map(|l| l.accept())) => (conn, true),
+        };
+
+        let (tcp_stream, addr) = match conn {
             Ok(v) => v,
             Err(e) => {
                 info!("couldn't accept connection: {e}");

@@ -734,13 +797,32 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         }
         .await;

+        let tls_acceptor = tls_acceptor.clone();
+
         tokio::task::spawn(async move {
-            let res = builder
-                .serve_connection(TokioIo::new(stream), service_fn_)
-                .await;
+            let res = if is_https {
+                let tls_acceptor =
+                    tls_acceptor.expect("tls_acceptor is set together with https_listener");
+
+                let tls_stream = match tls_acceptor.accept(tcp_stream).await {
+                    Ok(tls_stream) => tls_stream,
+                    Err(e) => {
+                        info!("error accepting TLS connection from {addr}: {e}");
+                        return;
+                    }
+                };
+
+                builder
+                    .serve_connection(TokioIo::new(tls_stream), service_fn_)
+                    .await
+            } else {
+                builder
+                    .serve_connection(TokioIo::new(tcp_stream), service_fn_)
+                    .await
+            };
+
             if let Err(e) = res {
-                info!("error serving connection from {addr}: {e}");
+                info!(%is_https, "error serving connection from {addr}: {e}");
             }
         });
     }
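With the HTTPS listener up, the ALPN line above matters: tonic speaks gRPC over HTTP/2 only, so the TLS handshake has to be able to negotiate `h2`. A quick way to verify the negotiated protocol against a running broker (address and CA path are examples):

    $ openssl s_client -connect 127.0.0.1:50051 -alpn h2 -CAfile rootCA.crt </dev/null | grep -i alpn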
@@ -196,7 +196,7 @@ struct Cli {
     ssl_cert_reload_period: humantime::Duration,
     /// Trusted root CA certificates to use in https APIs.
     #[arg(long)]
-    ssl_ca_file: Option<PathBuf>,
+    ssl_ca_file: Option<Utf8PathBuf>,

     /// Neon local specific flag. When set, ignore [`Cli::control_plane_url`] and deliver
     /// the compute notification directly (instead of via control plane).
@@ -134,6 +134,7 @@ pub(crate) enum DatabaseOperation {
     UpdateTimelineImport,
     DeleteTimelineImport,
     ListTimelineImports,
+    IsTenantImportingTimeline,
 }

 #[must_use]

@@ -1641,9 +1642,7 @@ impl Persistence {
         .await
     }

-    pub(crate) async fn list_complete_timeline_imports(
-        &self,
-    ) -> DatabaseResult<Vec<TimelineImport>> {
+    pub(crate) async fn list_timeline_imports(&self) -> DatabaseResult<Vec<TimelineImport>> {
         use crate::schema::timeline_imports::dsl;
         let persistent = self
             .with_measured_conn(DatabaseOperation::ListTimelineImports, move |conn| {

@@ -1660,10 +1659,7 @@ impl Persistence {
             .map(TimelineImport::from_persistent)
             .collect();
         match imports {
-            Ok(ok) => Ok(ok
-                .into_iter()
-                .filter(|import| import.is_complete())
-                .collect()),
+            Ok(ok) => Ok(ok.into_iter().collect()),
             Err(err) => Err(DatabaseError::Logical(format!(
                 "failed to deserialize import: {err}"
             ))),

@@ -1773,6 +1769,25 @@ impl Persistence {
         })
         .await
     }
+
+    pub(crate) async fn is_tenant_importing_timeline(
+        &self,
+        tenant_id: TenantId,
+    ) -> DatabaseResult<bool> {
+        use crate::schema::timeline_imports::dsl;
+        self.with_measured_conn(DatabaseOperation::IsTenantImportingTimeline, move |conn| {
+            Box::pin(async move {
+                let imports: i64 = dsl::timeline_imports
+                    .filter(dsl::tenant_id.eq(tenant_id.to_string()))
+                    .count()
+                    .get_result(conn)
+                    .await?;
+
+                Ok(imports > 0)
+            })
+        })
+        .await
+    }
 }

 pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
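The Diesel query in `is_tenant_importing_timeline` boils down to a row count per tenant; the equivalent SQL is roughly (table and column names as they appear in the hunk):

    SELECT COUNT(*) FROM timeline_imports WHERE tenant_id = '<tenant id>';

Any count above zero is treated as "an import is in flight", which is all the split path needs to know.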
@@ -97,7 +97,9 @@ use crate::tenant_shard::{
     ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter,
     ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
 };
-use crate::timeline_import::{ShardImportStatuses, TimelineImport, UpcallClient};
+use crate::timeline_import::{
+    ShardImportStatuses, TimelineImport, TimelineImportState, UpcallClient,
+};

 const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);

@@ -878,15 +880,33 @@ impl Service {
             });
         }

-        // Fetch the list of completed imports and attempt to finalize them in the background.
-        // This handles the case where the previous storage controller instance shut down
-        // whilst finalizing imports.
-        let complete_imports = self.persistence.list_complete_timeline_imports().await;
-        match complete_imports {
-            Ok(ok) => {
+        // Reconcile the timeline imports:
+        // 1. Mark each tenant shard of tenants with an importing timeline as importing.
+        // 2. Finalize the completed imports in the background. This handles the case where
+        //    the previous storage controller instance shut down whilst finalizing imports.
+        let imports = self.persistence.list_timeline_imports().await;
+        match imports {
+            Ok(mut imports) => {
+                {
+                    let mut locked = self.inner.write().unwrap();
+                    for import in &imports {
+                        locked
+                            .tenants
+                            .range_mut(TenantShardId::tenant_range(import.tenant_id))
+                            .for_each(|(_id, shard)| {
+                                shard.importing = TimelineImportState::Importing
+                            });
+                    }
+                }
+
+                imports.retain(|import| import.is_complete());
                 tokio::task::spawn({
                     let finalize_imports_self = self.clone();
-                    async move { finalize_imports_self.finalize_timeline_imports(ok).await }
+                    async move {
+                        finalize_imports_self
+                            .finalize_timeline_imports(imports)
+                            .await
+                    }
                 });
             }
             Err(err) => {

@@ -3772,6 +3792,22 @@ impl Service {
         failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
         let is_import = create_req.is_import();

+        if is_import {
+            // Ensure that there is no split on-going.
+            // [`Self::tenant_shard_split`] holds the exclusive tenant lock
+            // for the duration of the split, but here we handle the case
+            // where we restarted and the split is being aborted.
+            let locked = self.inner.read().unwrap();
+            let splitting = locked
+                .tenants
+                .range(TenantShardId::tenant_range(tenant_id))
+                .any(|(_id, shard)| shard.splitting != SplitState::Idle);
+
+            if splitting {
+                return Err(ApiError::Conflict("Tenant is splitting shard".to_string()));
+            }
+        }
+
         let timeline_info = self
             .tenant_timeline_create_pageservers(tenant_id, create_req)
             .await?;

@@ -3809,6 +3845,14 @@ impl Service {
             .context("timeline import insert")
             .map_err(ApiError::InternalServerError)?;

+        // Set the importing flag on the tenant shards
+        self.inner
+            .write()
+            .unwrap()
+            .tenants
+            .range_mut(TenantShardId::tenant_range(tenant_id))
+            .for_each(|(_id, shard)| shard.importing = TimelineImportState::Importing);
+
         match inserted {
             true => {
                 tracing::info!(%tenant_id, %timeline_id, "Inserted timeline import");

@@ -3931,6 +3975,13 @@ impl Service {
             tracing::warn!("Failed to delete timeline import entry from database: {err}");
         }

+        self.inner
+            .write()
+            .unwrap()
+            .tenants
+            .range_mut(TenantShardId::tenant_range(import.tenant_id))
+            .for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle);
+
         // TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
         // so we can't create the timeline on the safekeepers. Fix by moving creation here.
         // https://github.com/neondatabase/neon/issues/11569

@@ -4914,6 +4965,7 @@ impl Service {
                 is_reconciling: shard.reconciler.is_some(),
                 is_pending_compute_notification: shard.pending_compute_notification,
                 is_splitting: matches!(shard.splitting, SplitState::Splitting),
+                is_importing: shard.importing == TimelineImportState::Importing,
                 scheduling_policy: shard.get_scheduling_policy(),
                 preferred_az_id: shard.preferred_az().map(ToString::to_string),
             })

@@ -5404,6 +5456,27 @@ impl Service {
             .enter()
             .map_err(|_| ApiError::ShuttingDown)?;

+        // Timeline imports on the pageserver side can't handle shard-splits.
+        // If the tenant is importing a timeline, don't shard split it.
+        match self
+            .persistence
+            .is_tenant_importing_timeline(tenant_id)
+            .await
+        {
+            Ok(importing) => {
+                if importing {
+                    return Err(ApiError::Conflict(
+                        "Cannot shard split during timeline import".to_string(),
+                    ));
+                }
+            }
+            Err(err) => {
+                return Err(ApiError::InternalServerError(anyhow::anyhow!(
+                    "Failed to check for running imports: {err}"
+                )));
+            }
+        }
+
         let new_shard_count = ShardCount::new(split_req.new_shard_count);
         let new_stripe_size = split_req.new_stripe_size;

@@ -8076,12 +8149,25 @@ impl Service {
             candidates.extend(size_candidates);
         }

-        // Filter out tenants in a prohibiting scheduling mode.
+        // Filter out tenants in prohibiting scheduling modes
+        // and tenants with an ongoing import.
+        //
+        // Note that the import check here is opportunistic. An import might start
+        // after the check but before we actually update [`TenantShard::splitting`].
+        // [`Self::tenant_shard_split`] checks the database whilst holding the exclusive
+        // tenant lock. Imports might take a long time, so the check here allows us
+        // to split something else instead of trying the same shard over and over.
        {
             let state = self.inner.read().unwrap();
             candidates.retain(|i| {
-                let policy = state.tenants.get(&i.id).map(|s| s.get_scheduling_policy());
-                policy == Some(ShardSchedulingPolicy::Active)
+                let shard = state.tenants.get(&i.id);
+                match shard {
+                    Some(t) => {
+                        t.get_scheduling_policy() == ShardSchedulingPolicy::Active
+                            && t.importing == TimelineImportState::Idle
+                    }
+                    None => false,
+                }
             });
         }
@@ -33,6 +33,7 @@ use crate::scheduler::{
     RefCountUpdate, ScheduleContext, ScheduleError, Scheduler, SecondaryShardTag, ShardTag,
 };
 use crate::service::ReconcileResultRequest;
+use crate::timeline_import::TimelineImportState;
 use crate::{Sequence, service};

 /// Serialization helper

@@ -100,6 +101,10 @@ pub(crate) struct TenantShard {
     /// reconciliation, and timeline creation.
     pub(crate) splitting: SplitState,

+    /// Flag indicating whether the tenant has an in-progress timeline import.
+    /// Used to disallow shard splits while an import is in progress.
+    pub(crate) importing: TimelineImportState,
+
     /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
     /// is set. This flag is cleared when the tenant is popped off the delay queue.
     pub(crate) delayed_reconcile: bool,

@@ -583,6 +588,7 @@ impl TenantShard {
             config: TenantConfig::default(),
             reconciler: None,
             splitting: SplitState::Idle,
+            importing: TimelineImportState::Idle,
             sequence: Sequence(1),
             delayed_reconcile: false,
             waiter: Arc::new(SeqWait::new(Sequence(0))),

@@ -1844,6 +1850,8 @@ impl TenantShard {
             config: serde_json::from_str(&tsp.config).unwrap(),
             reconciler: None,
             splitting: tsp.splitting,
+            // Filled in during [`Service::startup_reconcile`]
+            importing: TimelineImportState::Idle,
             waiter: Arc::new(SeqWait::new(Sequence::initial())),
             error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
             last_error: Arc::default(),
@@ -14,6 +14,12 @@ use utils::{

 use crate::{persistence::TimelineImportPersistence, service::Config};

+#[derive(Deserialize, Serialize, PartialEq, Eq)]
+pub(crate) enum TimelineImportState {
+    Importing,
+    Idle,
+}
+
 #[derive(Serialize, Deserialize, Clone, Debug)]
 pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
@@ -501,6 +501,9 @@ class NeonEnvBuilder:
         # Flag to use https listener in storage controller, generate local ssl certs,
         # and force pageservers and neon_local to use https for storage controller api.
         self.use_https_storage_controller_api: bool = False
+        # Flag to use https listener in storage broker, generate local ssl certs,
+        # and force pageservers and safekeepers to use https for storage broker api.
+        self.use_https_storage_broker_api: bool = False

         self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine
         self.pageserver_get_vectored_concurrent_io: str | None = (

@@ -1086,7 +1089,7 @@ class NeonEnv:
         self.safekeepers: list[Safekeeper] = []
         self.pageservers: list[NeonPageserver] = []
         self.num_azs = config.num_azs
-        self.broker = NeonBroker(self)
+        self.broker = NeonBroker(self, config.use_https_storage_broker_api)
         self.pageserver_remote_storage = config.pageserver_remote_storage
         self.safekeepers_remote_storage = config.safekeepers_remote_storage
         self.pg_version = config.pg_version

@@ -1106,6 +1109,7 @@ class NeonEnv:
             config.use_https_pageserver_api
             or config.use_https_safekeeper_api
             or config.use_https_storage_controller_api
+            or config.use_https_storage_broker_api
         )
         self.ssl_ca_file = (
             self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None

@@ -1178,15 +1182,18 @@ class NeonEnv:
         # Create the neon_local's `NeonLocalInitConf`
         cfg: dict[str, Any] = {
             "default_tenant_id": str(self.initial_tenant),
-            "broker": {
-                "listen_addr": self.broker.listen_addr(),
-            },
+            "broker": {},
             "safekeepers": [],
             "pageservers": [],
             "endpoint_storage": {"port": self.port_distributor.get_port()},
             "generate_local_ssl_certs": self.generate_local_ssl_certs,
         }

+        if config.use_https_storage_broker_api:
+            cfg["broker"]["listen_https_addr"] = self.broker.listen_addr()
+        else:
+            cfg["broker"]["listen_addr"] = self.broker.listen_addr()
+
         if self.control_plane_api is not None:
             cfg["control_plane_api"] = self.control_plane_api

@@ -4933,9 +4940,10 @@ class Safekeeper(LogUtils):
 class NeonBroker(LogUtils):
     """An object managing storage_broker instance"""

-    def __init__(self, env: NeonEnv):
-        super().__init__(logfile=env.repo_dir / "storage_broker.log")
+    def __init__(self, env: NeonEnv, use_https: bool):
+        super().__init__(logfile=env.repo_dir / "storage_broker" / "storage_broker.log")
         self.env = env
+        self.scheme = "https" if use_https else "http"
         self.port: int = self.env.port_distributor.get_port()
         self.running = False

@@ -4958,7 +4966,7 @@ class NeonBroker(LogUtils):
         return f"127.0.0.1:{self.port}"

     def client_url(self):
-        return f"http://{self.listen_addr()}"
+        return f"{self.scheme}://{self.listen_addr()}"

     def assert_no_errors(self):
         assert_no_errors(self.logfile, "storage_controller", [])
@@ -323,6 +323,7 @@ class NeonProject:
         if self.restart_pgbench_on_console_errors and (
             "ERROR: Couldn't connect to compute node" in err
             or "ERROR: Console request failed" in err
+            or "ERROR: Control plane request failed" in err
         ):
             log.info("Restarting benchmark for %s", target)
             self.benchmarks.pop(target)
@@ -6,6 +6,7 @@ import pytest
 import requests
 from fixtures.neon_fixtures import NeonEnvBuilder, StorageControllerApiException
 from fixtures.utils import wait_until
+from fixtures.workload import Workload


 def test_pageserver_https_api(neon_env_builder: NeonEnvBuilder):

@@ -212,3 +213,24 @@ def test_server_and_cert_metrics(neon_env_builder: NeonEnvBuilder):
         assert reload_error_cnt > 0

     wait_until(reload_failed)
+
+
+def test_storage_broker_https_api(neon_env_builder: NeonEnvBuilder):
+    """
+    Test HTTPS storage broker API.
+    1. Make /status request to HTTPS API to ensure it's appropriately configured.
+    2. Generate simple workload to ensure that SK -> broker -> PS communication works well.
+    """
+    neon_env_builder.use_https_storage_broker_api = True
+    env = neon_env_builder.init_start()
+
+    # 1. Simple check that HTTPS is enabled and works.
+    url = env.broker.client_url() + "/status"
+    assert url.startswith("https://")
+    requests.get(url, verify=str(env.ssl_ca_file)).raise_for_status()
+
+    # 2. Simple workload to check that SK -> broker -> PS communication works over HTTPS.
+    workload = Workload(env, env.initial_tenant, env.initial_timeline)
+    workload.init()
+    workload.write_rows(10)
+    workload.validate()
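Step 1 of the test is the same check one can do by hand against a locally running broker (the path and port here are examples, not guaranteed defaults):

    $ curl --cacert .neon/rootCA.crt https://127.0.0.1:50051/status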