diff --git a/.github/actions/neon-project-create/action.yml b/.github/actions/neon-project-create/action.yml index aa58876aa5..a5b4104908 100644 --- a/.github/actions/neon-project-create/action.yml +++ b/.github/actions/neon-project-create/action.yml @@ -140,20 +140,14 @@ runs: -d "{\"scheduling\": \"Essential\"}" fi # XXX - # This is a workaround for project's settings which don't work well in public API now - # https://github.com/neondatabase/cloud/issues/27143 + # This is a workaround for the default endpoint settings, which currently do not allow some settings in the public API. # https://github.com/neondatabase/cloud/issues/27108 - if ( [[ -n "${PROJECT_SETTINGS}" ]] && [[ "${PROJECT_SETTINGS}" != "{}" ]] ) || ( [[ -n "${DEFAULT_ENDPOINT_SETTINGS}" ]] && [[ "${DEFAULT_ENDPOINT_SETTINGS}" != "{}" ]] ); then + if [[ -n ${DEFAULT_ENDPOINT_SETTINGS} && ${DEFAULT_ENDPOINT_SETTINGS} != "{}" ]] ; then PROJECT_DATA=$(curl -X GET \ "https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/projects/${project_id}" \ -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \ -d "{\"scheduling\": \"Essential\"}" ) - NEW_PROJECT_SETTINGS=$(echo ${PROJECT_DATA} | jq -rc ".project.settings + ${PROJECT_SETTINGS}") - curl -X POST --fail \ - "https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/projects/${project_id}/settings" \ - -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \ - --data "${NEW_PROJECT_SETTINGS}" NEW_DEFAULT_ENDPOINT_SETTINGS=$(echo ${PROJECT_DATA} | jq -rc ".project.default_endpoint_settings + ${DEFAULT_ENDPOINT_SETTINGS}") curl -X POST --fail \ "https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/projects/${project_id}/default_endpoint_settings" \ diff --git a/.github/workflows/_build-and-test-locally.yml b/.github/workflows/_build-and-test-locally.yml index e31d3dec5b..4f7d6026f2 100644 --- a/.github/workflows/_build-and-test-locally.yml +++ 
b/.github/workflows/_build-and-test-locally.yml @@ -28,6 +28,16 @@ on: required: false default: 'disabled' type: string + test-selection: + description: 'specification of selected test(s) to run' + required: false + default: '' + type: string + test-run-count: + description: 'number of runs to perform for selected tests' + required: false + default: 1 + type: number defaults: run: @@ -381,14 +391,15 @@ jobs: run_with_real_s3: true real_s3_bucket: neon-github-ci-tests real_s3_region: eu-central-1 - rerun_failed: true + rerun_failed: ${{ inputs.test-run-count == 1 }} pg_version: ${{ matrix.pg_version }} sanitizers: ${{ inputs.sanitizers }} aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }} # `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds. # Attempt to stop tests gracefully to generate test reports # until they are forcibly stopped by the stricter `timeout-minutes` limit. - extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }} + extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }} --count=${{ inputs.test-run-count }} + ${{ inputs.test-selection != '' && format('-k "{0}"', inputs.test-selection) || '' }} env: TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }} CHECK_ONDISK_DATA_COMPATIBILITY: nonempty diff --git a/.github/workflows/build_and_run_selected_test.yml b/.github/workflows/build_and_run_selected_test.yml new file mode 100644 index 0000000000..f22fe310ab --- /dev/null +++ b/.github/workflows/build_and_run_selected_test.yml @@ -0,0 +1,120 @@ +name: Build and Run Selected Test + +on: + workflow_dispatch: + inputs: + test-selection: + description: 'Specification of selected test(s), as accepted by pytest -k' + required: true + type: string + run-count: + description: 'Number of test runs to perform' + required: true + type: number + archs: + description: 'Archs to run tests on, e. 
g.: ["x64", "arm64"]' + default: '["x64"]' + required: true + type: string + build-types: + description: 'Build types to run tests on, e. g.: ["debug", "release"]' + default: '["release"]' + required: true + type: string + pg-versions: + description: 'Postgres versions to use for testing, e.g,: [{"pg_version":"v16"}, {"pg_version":"v17"}])' + default: '[{"pg_version":"v17"}]' + required: true + type: string + +defaults: + run: + shell: bash -euxo pipefail {0} + +env: + RUST_BACKTRACE: 1 + COPT: '-Werror' + +jobs: + meta: + uses: ./.github/workflows/_meta.yml + with: + github-event-name: ${{ github.event_name }} + github-event-json: ${{ toJSON(github.event) }} + + build-and-test-locally: + needs: [ meta ] + strategy: + fail-fast: false + matrix: + arch: ${{ fromJson(inputs.archs) }} + build-type: ${{ fromJson(inputs.build-types) }} + uses: ./.github/workflows/_build-and-test-locally.yml + with: + arch: ${{ matrix.arch }} + build-tools-image: ghcr.io/neondatabase/build-tools:pinned-bookworm + build-tag: ${{ needs.meta.outputs.build-tag }} + build-type: ${{ matrix.build-type }} + test-cfg: ${{ inputs.pg-versions }} + test-selection: ${{ inputs.test-selection }} + test-run-count: ${{ fromJson(inputs.run-count) }} + secrets: inherit + + create-test-report: + needs: [ build-and-test-locally ] + if: ${{ !cancelled() }} + permissions: + id-token: write # aws-actions/configure-aws-credentials + statuses: write + contents: write + pull-requests: write + outputs: + report-url: ${{ steps.create-allure-report.outputs.report-url }} + + runs-on: [ self-hosted, small ] + container: + image: ghcr.io/neondatabase/build-tools:pinned-bookworm + credentials: + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + options: --init + + steps: + - name: Harden the runner (Audit all outbound calls) + uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0 + with: + egress-policy: audit + + - uses: 
actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + + - name: Create Allure report + if: ${{ !cancelled() }} + id: create-allure-report + uses: ./.github/actions/allure-report-generate + with: + store-test-results-into-db: true + aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }} + env: + REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_DEV }} + + - uses: actions/github-script@v7 + if: ${{ !cancelled() }} + with: + # Retry script for 5XX server errors: https://github.com/actions/github-script#retries + retries: 5 + script: | + const report = { + reportUrl: "${{ steps.create-allure-report.outputs.report-url }}", + reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}", + } + + const coverage = {} + + const script = require("./scripts/comment-test-report.js") + await script({ + github, + context, + fetch, + report, + coverage, + }) diff --git a/.github/workflows/check-permissions.yml b/.github/workflows/check-permissions.yml index 407f612887..a61a37ea4c 100644 --- a/.github/workflows/check-permissions.yml +++ b/.github/workflows/check-permissions.yml @@ -19,7 +19,7 @@ jobs: runs-on: ubuntu-22.04 steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: egress-policy: audit diff --git a/.github/workflows/cleanup-caches-by-a-branch.yml b/.github/workflows/cleanup-caches-by-a-branch.yml index 3608d8b074..abac0d95e4 100644 --- a/.github/workflows/cleanup-caches-by-a-branch.yml +++ b/.github/workflows/cleanup-caches-by-a-branch.yml @@ -12,7 +12,7 @@ jobs: runs-on: ubuntu-22.04 steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: egress-policy: audit diff --git a/.github/workflows/fast-forward.yml b/.github/workflows/fast-forward.yml index 
f80596a7a6..22dacc429f 100644 --- a/.github/workflows/fast-forward.yml +++ b/.github/workflows/fast-forward.yml @@ -14,7 +14,7 @@ jobs: steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: egress-policy: audit diff --git a/.github/workflows/label-for-external-users.yml b/.github/workflows/label-for-external-users.yml index 02d128179d..f9daa19ad9 100644 --- a/.github/workflows/label-for-external-users.yml +++ b/.github/workflows/label-for-external-users.yml @@ -28,7 +28,7 @@ jobs: steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: egress-policy: audit @@ -75,7 +75,7 @@ jobs: steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: egress-policy: audit diff --git a/.github/workflows/pin-build-tools-image.yml b/.github/workflows/pin-build-tools-image.yml index f8d8172cb0..82bbc722a7 100644 --- a/.github/workflows/pin-build-tools-image.yml +++ b/.github/workflows/pin-build-tools-image.yml @@ -41,7 +41,7 @@ jobs: steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: egress-policy: audit diff --git a/.github/workflows/trigger-e2e-tests.yml b/.github/workflows/trigger-e2e-tests.yml index ca4c465931..a951b4b258 100644 --- a/.github/workflows/trigger-e2e-tests.yml +++ b/.github/workflows/trigger-e2e-tests.yml @@ -35,7 +35,7 @@ jobs: steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: 
egress-policy: audit @@ -73,7 +73,7 @@ jobs: }} steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: egress-policy: audit diff --git a/Cargo.lock b/Cargo.lock index 2cf260c88c..4c464c62b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6616,12 +6616,14 @@ dependencies = [ "anyhow", "async-stream", "bytes", + "camino", "clap", "const_format", "futures", "futures-core", "futures-util", "http-body-util", + "http-utils", "humantime", "hyper 1.4.1", "hyper-util", @@ -6631,6 +6633,7 @@ dependencies = [ "prost 0.13.3", "rustls 0.23.18", "tokio", + "tokio-rustls 0.26.0", "tonic", "tonic-build", "tracing", diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 5cf6767361..6f55c0310f 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -17,8 +17,10 @@ use std::time::Duration; use anyhow::{Context, Result, anyhow, bail}; use clap::Parser; use compute_api::spec::ComputeMode; +use control_plane::broker::StorageBroker; use control_plane::endpoint::ComputeControlPlane; use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage}; +use control_plane::local_env; use control_plane::local_env::{ EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf, SafekeeperConf, @@ -28,7 +30,6 @@ use control_plane::safekeeper::SafekeeperNode; use control_plane::storage_controller::{ NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController, }; -use control_plane::{broker, local_env}; use nix::fcntl::{FlockArg, flock}; use pageserver_api::config::{ DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, @@ -988,7 +989,8 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result { NeonLocalInitConf { control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()), broker: 
NeonBroker { - listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(), + listen_addr: Some(DEFAULT_BROKER_ADDR.parse().unwrap()), + listen_https_addr: None, }, safekeepers: vec![SafekeeperConf { id: DEFAULT_SAFEKEEPER_ID, @@ -1777,7 +1779,8 @@ async fn handle_endpoint_storage( async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> { match subcmd { StorageBrokerCmd::Start(args) => { - if let Err(e) = broker::start_broker_process(env, &args.start_timeout).await { + let storage_broker = StorageBroker::from_env(env); + if let Err(e) = storage_broker.start(&args.start_timeout).await { eprintln!("broker start failed: {e}"); exit(1); } @@ -1785,7 +1788,8 @@ async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::Local StorageBrokerCmd::Stop(_args) => { // FIXME: stop_mode unused - if let Err(e) = broker::stop_broker_process(env) { + let storage_broker = StorageBroker::from_env(env); + if let Err(e) = storage_broker.stop() { eprintln!("broker stop failed: {e}"); exit(1); } @@ -1835,8 +1839,11 @@ async fn handle_start_all_impl( #[allow(clippy::redundant_closure_call)] (|| { js.spawn(async move { - let retry_timeout = retry_timeout; - broker::start_broker_process(env, &retry_timeout).await + let storage_broker = StorageBroker::from_env(env); + storage_broker + .start(&retry_timeout) + .await + .map_err(|e| e.context("start storage_broker")) }); js.spawn(async move { @@ -1991,7 +1998,8 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { } } - if let Err(e) = broker::stop_broker_process(env) { + let storage_broker = StorageBroker::from_env(env); + if let Err(e) = storage_broker.stop() { eprintln!("neon broker stop failed: {e:#}"); } diff --git a/control_plane/src/broker.rs b/control_plane/src/broker.rs index 1b507bb384..f43f459636 100644 --- a/control_plane/src/broker.rs +++ b/control_plane/src/broker.rs @@ -3,60 +3,86 @@ //! 
In the local test environment, the storage broker stores its data directly in //! //! ```text -//! .neon +//! .neon/storage_broker //! ``` use std::time::Duration; use anyhow::Context; use camino::Utf8PathBuf; -use crate::{background_process, local_env}; +use crate::{background_process, local_env::LocalEnv}; -pub async fn start_broker_process( - env: &local_env::LocalEnv, - retry_timeout: &Duration, -) -> anyhow::Result<()> { - let broker = &env.broker; - let listen_addr = &broker.listen_addr; - - print!("Starting neon broker at {}", listen_addr); - - let args = [format!("--listen-addr={listen_addr}")]; - - let client = reqwest::Client::new(); - background_process::start_process( - "storage_broker", - &env.base_data_dir, - &env.storage_broker_bin(), - args, - [], - background_process::InitialPidFile::Create(storage_broker_pid_file_path(env)), - retry_timeout, - || async { - let url = broker.client_url(); - let status_url = url.join("status").with_context(|| { - format!("Failed to append /status path to broker endpoint {url}") - })?; - let request = client - .get(status_url) - .build() - .with_context(|| format!("Failed to construct request to broker endpoint {url}"))?; - match client.execute(request).await { - Ok(resp) => Ok(resp.status().is_success()), - Err(_) => Ok(false), - } - }, - ) - .await - .context("Failed to spawn storage_broker subprocess")?; - Ok(()) +pub struct StorageBroker { + env: LocalEnv, } -pub fn stop_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { - background_process::stop_process(true, "storage_broker", &storage_broker_pid_file_path(env)) -} +impl StorageBroker { + /// Create a new `StorageBroker` instance from the environment. 
+ pub fn from_env(env: &LocalEnv) -> Self { + Self { env: env.clone() } + } -fn storage_broker_pid_file_path(env: &local_env::LocalEnv) -> Utf8PathBuf { - Utf8PathBuf::from_path_buf(env.base_data_dir.join("storage_broker.pid")) - .expect("non-Unicode path") + pub fn initialize(&self) -> anyhow::Result<()> { + if self.env.generate_local_ssl_certs { + self.env.generate_ssl_cert( + &self.env.storage_broker_data_dir().join("server.crt"), + &self.env.storage_broker_data_dir().join("server.key"), + )?; + } + Ok(()) + } + + /// Start the storage broker process. + pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> { + let broker = &self.env.broker; + + print!("Starting neon broker at {}", broker.client_url()); + + let mut args = Vec::new(); + + if let Some(addr) = &broker.listen_addr { + args.push(format!("--listen-addr={addr}")); + } + if let Some(addr) = &broker.listen_https_addr { + args.push(format!("--listen-https-addr={addr}")); + } + + let client = self.env.create_http_client(); + background_process::start_process( + "storage_broker", + &self.env.storage_broker_data_dir(), + &self.env.storage_broker_bin(), + args, + [], + background_process::InitialPidFile::Create(self.pid_file_path()), + retry_timeout, + || async { + let url = broker.client_url(); + let status_url = url.join("status").with_context(|| { + format!("Failed to append /status path to broker endpoint {url}") + })?; + let request = client.get(status_url).build().with_context(|| { + format!("Failed to construct request to broker endpoint {url}") + })?; + match client.execute(request).await { + Ok(resp) => Ok(resp.status().is_success()), + Err(_) => Ok(false), + } + }, + ) + .await + .context("Failed to spawn storage_broker subprocess")?; + Ok(()) + } + + /// Stop the storage broker process. + pub fn stop(&self) -> anyhow::Result<()> { + background_process::stop_process(true, "storage_broker", &self.pid_file_path()) + } + + /// Get the path to the PID file for the storage broker. 
+ fn pid_file_path(&self) -> Utf8PathBuf { + Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_broker.pid")) + .expect("non-Unicode path") + } } diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 77d5c1c922..a18b34daa4 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -4,7 +4,7 @@ //! script which will use local paths. use std::collections::HashMap; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::net::SocketAddr; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use std::time::Duration; @@ -14,11 +14,12 @@ use anyhow::{Context, bail}; use clap::ValueEnum; use pem::Pem; use postgres_backend::AuthType; -use reqwest::Url; +use reqwest::{Certificate, Url}; use serde::{Deserialize, Serialize}; use utils::auth::encode_from_key_file; use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId}; +use crate::broker::StorageBroker; use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage}; use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode}; use crate::safekeeper::SafekeeperNode; @@ -157,11 +158,16 @@ pub struct EndpointStorageConf { } /// Broker config for cluster internal communication. -#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Default)] #[serde(default)] pub struct NeonBroker { - /// Broker listen address for storage nodes coordination, e.g. '127.0.0.1:50051'. - pub listen_addr: SocketAddr, + /// Broker listen HTTP address for storage nodes coordination, e.g. '127.0.0.1:50051'. + /// At least one of listen_addr or listen_https_addr must be set. + pub listen_addr: Option, + /// Broker listen HTTPS address for storage nodes coordination, e.g. '127.0.0.1:50051'. + /// At least one of listen_addr or listen_https_addr must be set. + /// listen_https_addr is preferred over listen_addr in neon_local. 
+ pub listen_https_addr: Option, } /// A part of storage controller's config the neon_local knows about. @@ -235,18 +241,19 @@ impl Default for NeonStorageControllerConf { } } -// Dummy Default impl to satisfy Deserialize derive. -impl Default for NeonBroker { - fn default() -> Self { - NeonBroker { - listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), - } - } -} - impl NeonBroker { pub fn client_url(&self) -> Url { - Url::parse(&format!("http://{}", self.listen_addr)).expect("failed to construct url") + let url = if let Some(addr) = self.listen_https_addr { + format!("https://{}", addr) + } else { + format!( + "http://{}", + self.listen_addr + .expect("at least one address should be set") + ) + }; + + Url::parse(&url).expect("failed to construct url") } } @@ -441,6 +448,10 @@ impl LocalEnv { self.base_data_dir.join("endpoints") } + pub fn storage_broker_data_dir(&self) -> PathBuf { + self.base_data_dir.join("storage_broker") + } + pub fn pageserver_data_dir(&self, pageserver_id: NodeId) -> PathBuf { self.base_data_dir .join(format!("pageserver_{pageserver_id}")) @@ -503,6 +514,23 @@ impl LocalEnv { ) } + /// Creates HTTP client with local SSL CA certificates. 
+ pub fn create_http_client(&self) -> reqwest::Client { + let ssl_ca_certs = self.ssl_ca_cert_path().map(|ssl_ca_file| { + let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist"); + Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid") + }); + + let mut http_client = reqwest::Client::builder(); + for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() { + http_client = http_client.add_root_certificate(ssl_ca_cert); + } + + http_client + .build() + .expect("HTTP client should construct with no error") + } + /// Inspect the base data directory and extract the instance id and instance directory path /// for all storage controller instances pub async fn storage_controller_instances(&self) -> std::io::Result> { @@ -911,6 +939,12 @@ impl LocalEnv { // create endpoints dir fs::create_dir_all(env.endpoints_path())?; + // create storage broker dir + fs::create_dir_all(env.storage_broker_data_dir())?; + StorageBroker::from_env(&env) + .initialize() + .context("storage broker init failed")?; + // create safekeeper dirs for safekeeper in &env.safekeepers { fs::create_dir_all(SafekeeperNode::datadir_path_by_id(&env, safekeeper.id))?; diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index b9257a27bf..79e87eba9b 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -21,7 +21,6 @@ use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api; use postgres_backend::AuthType; use postgres_connection::{PgConnectionConfig, parse_host_port}; -use reqwest::Certificate; use utils::auth::{Claims, Scope}; use utils::id::{NodeId, TenantId, TimelineId}; use utils::lsn::Lsn; @@ -51,19 +50,6 @@ impl PageServerNode { parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr"); let port = port.unwrap_or(5432); - let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| { - let buf = std::fs::read(ssl_ca_file).expect("SSL root CA file should exist"); - 
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid") - }); - - let mut http_client = reqwest::Client::builder(); - for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() { - http_client = http_client.add_root_certificate(ssl_ca_cert); - } - let http_client = http_client - .build() - .expect("Client constructs with no errors"); - let endpoint = if env.storage_controller.use_https_pageserver_api { format!( "https://{}", @@ -80,7 +66,7 @@ impl PageServerNode { conf: conf.clone(), env: env.clone(), http_client: mgmt_api::Client::new( - http_client, + env.create_http_client(), endpoint, { match conf.http_auth_type { diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 231871852e..948e3c8c93 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -87,7 +87,7 @@ impl SafekeeperNode { conf: conf.clone(), pg_connection_config: Self::safekeeper_connection_config(&listen_addr, conf.pg_port), env: env.clone(), - http_client: reqwest::Client::new(), + http_client: env.create_http_client(), http_base_url: format!("http://{}:{}/v1", listen_addr, conf.http_port), listen_addr, } diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 62ad5fa8d6..a36815d27e 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -20,7 +20,7 @@ use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api::ResponseErrorMessageExt; use pem::Pem; use postgres_backend::AuthType; -use reqwest::{Certificate, Method}; +use reqwest::Method; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use tokio::process::Command; @@ -153,24 +153,11 @@ impl StorageController { } }; - let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| { - let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist"); - Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid") - }); - - let mut http_client 
= reqwest::Client::builder(); - for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() { - http_client = http_client.add_root_certificate(ssl_ca_cert); - } - let http_client = http_client - .build() - .expect("HTTP client should construct with no error"); - Self { env: env.clone(), private_key, public_key, - client: http_client, + client: env.create_http_client(), config: env.storage_controller.clone(), listen_port: OnceLock::default(), } diff --git a/docs/consumption_metrics.md b/docs/consumption_metrics.md index dd364f8750..6bcd28ab10 100644 --- a/docs/consumption_metrics.md +++ b/docs/consumption_metrics.md @@ -13,7 +13,7 @@ For design details see [the RFC](./rfcs/021-metering.md) and [the discussion on batch format is ```json -{ "events" : [metric1, metric2, ...]]} +{ "events" : [metric1, metric2, ...] } ``` See metric format examples below. @@ -49,11 +49,13 @@ Size of the remote storage (S3) directory. This is an absolute, per-tenant metric. - `timeline_logical_size` -Logical size of the data in the timeline + +Logical size of the data in the timeline. This is an absolute, per-timeline metric. - `synthetic_storage_size` -Size of all tenant's branches including WAL + +Size of all tenant's branches including WAL. This is the same metric that `tenant/{tenant_id}/size` endpoint returns. This is an absolute, per-tenant metric. @@ -106,10 +108,10 @@ This is an incremental, per-endpoint metric. ``` The metric is incremental, so the value is the difference between the current and the previous value. -If there is no previous value, the value, the value is the current value and the `start_time` equals `stop_time`. +If there is no previous value, the value is the current value and the `start_time` equals `stop_time`. ### TODO - [ ] Handle errors better: currently if one tenant fails to gather metrics, the whole iteration fails and metrics are not sent for any tenant. 
- [ ] Add retries -- [ ] Tune the interval \ No newline at end of file +- [ ] Tune the interval diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index 91f9c03ba4..c5b49edba0 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -169,6 +169,8 @@ pub struct TenantDescribeResponseShard { pub is_pending_compute_notification: bool, /// A shard split is currently underway pub is_splitting: bool, + /// A timeline is being imported into this tenant + pub is_importing: bool, pub scheduling_policy: ShardSchedulingPolicy, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 997fc24052..698579e8fb 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3816,6 +3816,24 @@ impl TenantShard { MaybeDeletedIndexPart::IndexPart(p) => p, }; + // A shard split may not take place while a timeline import is on-going + // for the tenant. Timeline imports run as part of each tenant shard + // and rely on the sharding scheme to split the work among pageservers. + // If we were to split in the middle of this process, we would have to + // either ensure that it's driven to completion on the old shard set + // or transfer it to the new shard set. It's technically possible, but complex. 
+ match index_part.import_pgdata { + Some(ref import) if !import.is_done() => { + anyhow::bail!( + "Cannot split due to import with idempotency key: {:?}", + import.idempotency_key() + ); + } + Some(_) | None => { + // fallthrough + } + } + for child_shard in child_shards { tracing::info!(%timeline_id, "Uploading index_part for child {}", child_shard.to_index()); upload_index_part( diff --git a/proxy/src/control_plane/errors.rs b/proxy/src/control_plane/errors.rs index 337ed665cc..850d061333 100644 --- a/proxy/src/control_plane/errors.rs +++ b/proxy/src/control_plane/errors.rs @@ -8,7 +8,7 @@ use crate::error::{ErrorKind, ReportableError, UserFacingError}; use crate::proxy::retry::CouldRetry; /// A go-to error message which doesn't leak any detail. -pub(crate) const REQUEST_FAILED: &str = "Console request failed"; +pub(crate) const REQUEST_FAILED: &str = "Control plane request failed"; /// Common console API error. #[derive(Debug, Error)] diff --git a/storage_broker/Cargo.toml b/storage_broker/Cargo.toml index e4db9a317d..67b276c8fe 100644 --- a/storage_broker/Cargo.toml +++ b/storage_broker/Cargo.toml @@ -11,6 +11,7 @@ bench = [] anyhow.workspace = true async-stream.workspace = true bytes.workspace = true +camino.workspace = true clap = { workspace = true, features = ["derive"] } const_format.workspace = true futures.workspace = true @@ -19,12 +20,14 @@ futures-util.workspace = true humantime.workspace = true hyper = { workspace = true, features = ["full"] } http-body-util.workspace = true +http-utils.workspace = true hyper-util = "0.1" once_cell.workspace = true parking_lot.workspace = true prost.workspace = true tonic.workspace = true tokio = { workspace = true, features = ["rt-multi-thread"] } +tokio-rustls.workspace = true tracing.workspace = true metrics.workspace = true utils.workspace = true diff --git a/storage_broker/src/bin/storage_broker.rs b/storage_broker/src/bin/storage_broker.rs index a7e0c986e6..476d5f03ea 100644 --- 
a/storage_broker/src/bin/storage_broker.rs +++ b/storage_broker/src/bin/storage_broker.rs @@ -17,10 +17,13 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; +use camino::Utf8PathBuf; use clap::{Parser, command}; +use futures::future::OptionFuture; use futures_core::Stream; use futures_util::StreamExt; use http_body_util::Full; +use http_utils::tls_certs::ReloadingCertificateResolver; use hyper::body::Incoming; use hyper::header::CONTENT_TYPE; use hyper::service::service_fn; @@ -38,7 +41,7 @@ use storage_broker::proto::{ FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse, SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage, }; -use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR, parse_proto_ttid}; +use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, parse_proto_ttid}; use tokio::net::TcpListener; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; @@ -59,12 +62,25 @@ project_build_tag!(BUILD_TAG); const DEFAULT_CHAN_SIZE: usize = 32; const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384; +const DEFAULT_SSL_KEY_FILE: &str = "server.key"; +const DEFAULT_SSL_CERT_FILE: &str = "server.crt"; +const DEFAULT_SSL_CERT_RELOAD_PERIOD: &str = "60s"; + #[derive(Parser, Debug)] #[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)] +#[clap(group( + clap::ArgGroup::new("listen-addresses") + .required(true) + .multiple(true) + .args(&["listen_addr", "listen_https_addr"]), +))] struct Args { - /// Endpoint to listen on. - #[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)] - listen_addr: SocketAddr, + /// Endpoint to listen HTTP on. + #[arg(short, long)] + listen_addr: Option, + /// Endpoint to listen HTTPS on. + #[arg(long)] + listen_https_addr: Option, /// Size of the queue to the per timeline subscriber. 
#[arg(long, default_value_t = DEFAULT_CHAN_SIZE)] timeline_chan_size: usize, @@ -72,11 +88,20 @@ struct Args { #[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)] all_keys_chan_size: usize, /// HTTP/2 keepalive interval. - #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)] + #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)] http2_keepalive_interval: Duration, /// Format for logging, either 'plain' or 'json'. #[arg(long, default_value = "plain")] log_format: String, + /// Path to a file with certificate's private key for https API. + #[arg(long, default_value = DEFAULT_SSL_KEY_FILE)] + ssl_key_file: Utf8PathBuf, + /// Path to a file with a X509 certificate for https API. + #[arg(long, default_value = DEFAULT_SSL_CERT_FILE)] + ssl_cert_file: Utf8PathBuf, + /// Period to reload certificate and private key from files. + #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)] + ssl_cert_reload_period: Duration, } /// Id of publisher for registering in maps @@ -674,12 +699,50 @@ async fn main() -> Result<(), Box> { }; let storage_broker_server = BrokerServiceServer::new(storage_broker_impl); + let http_listener = match &args.listen_addr { + Some(addr) => { + info!("listening HTTP on {}", addr); + Some(TcpListener::bind(addr).await?) + } + None => None, + }; + + let (https_listener, tls_acceptor) = match &args.listen_https_addr { + Some(addr) => { + let listener = TcpListener::bind(addr).await?; + + let cert_resolver = ReloadingCertificateResolver::new( + "main", + &args.ssl_key_file, + &args.ssl_cert_file, + args.ssl_cert_reload_period, + ) + .await?; + + let mut tls_config = rustls::ServerConfig::builder() + .with_no_client_auth() + .with_cert_resolver(cert_resolver); + + // Tonic is HTTP/2 only and it negotiates it with ALPN. 
+ tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + + let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config)); + + info!("listening HTTPS on {}", addr); + (Some(listener), Some(acceptor)) + } + None => (None, None), + }; + // grpc is served along with http1 for metrics on a single port, hence we // don't use tonic's Server. - let tcp_listener = TcpListener::bind(&args.listen_addr).await?; - info!("listening on {}", &args.listen_addr); loop { - let (stream, addr) = match tcp_listener.accept().await { + let (conn, is_https) = tokio::select! { + Some(conn) = OptionFuture::from(http_listener.as_ref().map(|l| l.accept())) => (conn, false), + Some(conn) = OptionFuture::from(https_listener.as_ref().map(|l| l.accept())) => (conn, true), + }; + + let (tcp_stream, addr) = match conn { Ok(v) => v, Err(e) => { info!("couldn't accept connection: {e}"); @@ -734,13 +797,32 @@ async fn main() -> Result<(), Box> { } .await; + let tls_acceptor = tls_acceptor.clone(); + tokio::task::spawn(async move { - let res = builder - .serve_connection(TokioIo::new(stream), service_fn_) - .await; + let res = if is_https { + let tls_acceptor = + tls_acceptor.expect("tls_acceptor is set together with https_listener"); + + let tls_stream = match tls_acceptor.accept(tcp_stream).await { + Ok(tls_stream) => tls_stream, + Err(e) => { + info!("error accepting TLS connection from {addr}: {e}"); + return; + } + }; + + builder + .serve_connection(TokioIo::new(tls_stream), service_fn_) + .await + } else { + builder + .serve_connection(TokioIo::new(tcp_stream), service_fn_) + .await + }; if let Err(e) = res { - info!("error serving connection from {addr}: {e}"); + info!(%is_https, "error serving connection from {addr}: {e}"); } }); } diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index a924e5b6c5..71dde9e126 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -196,7 +196,7 @@ struct Cli { 
ssl_cert_reload_period: humantime::Duration, /// Trusted root CA certificates to use in https APIs. #[arg(long)] - ssl_ca_file: Option, + ssl_ca_file: Option, /// Neon local specific flag. When set, ignore [`Cli::control_plane_url`] and deliver /// the compute notification directly (instead of via control plane). diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 64a8846a9d..9ffcf9b9e6 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -134,6 +134,7 @@ pub(crate) enum DatabaseOperation { UpdateTimelineImport, DeleteTimelineImport, ListTimelineImports, + IsTenantImportingTimeline, } #[must_use] @@ -1641,9 +1642,7 @@ impl Persistence { .await } - pub(crate) async fn list_complete_timeline_imports( - &self, - ) -> DatabaseResult> { + pub(crate) async fn list_timeline_imports(&self) -> DatabaseResult> { use crate::schema::timeline_imports::dsl; let persistent = self .with_measured_conn(DatabaseOperation::ListTimelineImports, move |conn| { @@ -1660,10 +1659,7 @@ impl Persistence { .map(TimelineImport::from_persistent) .collect(); match imports { - Ok(ok) => Ok(ok - .into_iter() - .filter(|import| import.is_complete()) - .collect()), + Ok(ok) => Ok(ok.into_iter().collect()), Err(err) => Err(DatabaseError::Logical(format!( "failed to deserialize import: {err}" ))), @@ -1773,6 +1769,25 @@ impl Persistence { }) .await } + + pub(crate) async fn is_tenant_importing_timeline( + &self, + tenant_id: TenantId, + ) -> DatabaseResult { + use crate::schema::timeline_imports::dsl; + self.with_measured_conn(DatabaseOperation::IsTenantImportingTimeline, move |conn| { + Box::pin(async move { + let imports: i64 = dsl::timeline_imports + .filter(dsl::tenant_id.eq(tenant_id.to_string())) + .count() + .get_result(conn) + .await?; + + Ok(imports > 0) + }) + }) + .await + } } pub(crate) fn load_certs() -> anyhow::Result> { diff --git a/storage_controller/src/service.rs 
b/storage_controller/src/service.rs index 7e5e3fd8f4..ca9b911c4d 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -97,7 +97,9 @@ use crate::tenant_shard::{ ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter, ScheduleOptimization, ScheduleOptimizationAction, TenantShard, }; -use crate::timeline_import::{ShardImportStatuses, TimelineImport, UpcallClient}; +use crate::timeline_import::{ + ShardImportStatuses, TimelineImport, TimelineImportState, UpcallClient, +}; const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500); @@ -878,15 +880,33 @@ impl Service { }); } - // Fetch the list of completed imports and attempt to finalize them in the background. - // This handles the case where the previous storage controller instance shut down - // whilst finalizing imports. - let complete_imports = self.persistence.list_complete_timeline_imports().await; - match complete_imports { - Ok(ok) => { + // Reconcile the timeline imports: + // 1. Mark each tenant shard of tenants with an importing timeline as importing. + // 2. Finalize the completed imports in the background. This handles the case where + // the previous storage controller instance shut down whilst finalizing imports. 
+ let imports = self.persistence.list_timeline_imports().await; + match imports { + Ok(mut imports) => { + { + let mut locked = self.inner.write().unwrap(); + for import in &imports { + locked + .tenants + .range_mut(TenantShardId::tenant_range(import.tenant_id)) + .for_each(|(_id, shard)| { + shard.importing = TimelineImportState::Importing + }); + } + } + + imports.retain(|import| import.is_complete()); tokio::task::spawn({ let finalize_imports_self = self.clone(); - async move { finalize_imports_self.finalize_timeline_imports(ok).await } + async move { + finalize_imports_self + .finalize_timeline_imports(imports) + .await + } }); } Err(err) => { @@ -3772,6 +3792,22 @@ impl Service { failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock"); let is_import = create_req.is_import(); + if is_import { + // Ensure that there is no split on-going. + // [`Self::tenant_shard_split`] holds the exclusive tenant lock + // for the duration of the split, but here we handle the case + // where we restarted and the split is being aborted. 
+ let locked = self.inner.read().unwrap(); + let splitting = locked + .tenants + .range(TenantShardId::tenant_range(tenant_id)) + .any(|(_id, shard)| shard.splitting != SplitState::Idle); + + if splitting { + return Err(ApiError::Conflict("Tenant is splitting shard".to_string())); + } + } + let timeline_info = self .tenant_timeline_create_pageservers(tenant_id, create_req) .await?; @@ -3809,6 +3845,14 @@ impl Service { .context("timeline import insert") .map_err(ApiError::InternalServerError)?; + // Set the importing flag on the tenant shards + self.inner + .write() + .unwrap() + .tenants + .range_mut(TenantShardId::tenant_range(tenant_id)) + .for_each(|(_id, shard)| shard.importing = TimelineImportState::Importing); + match inserted { true => { tracing::info!(%tenant_id, %timeline_id, "Inserted timeline import"); @@ -3931,6 +3975,13 @@ impl Service { tracing::warn!("Failed to delete timeline import entry from database: {err}"); } + self.inner + .write() + .unwrap() + .tenants + .range_mut(TenantShardId::tenant_range(import.tenant_id)) + .for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle); + // TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn, // so we can't create the timeline on the safekeepers. Fix by moving creation here. // https://github.com/neondatabase/neon/issues/11569 @@ -4914,6 +4965,7 @@ impl Service { is_reconciling: shard.reconciler.is_some(), is_pending_compute_notification: shard.pending_compute_notification, is_splitting: matches!(shard.splitting, SplitState::Splitting), + is_importing: shard.importing == TimelineImportState::Importing, scheduling_policy: shard.get_scheduling_policy(), preferred_az_id: shard.preferred_az().map(ToString::to_string), }) @@ -5404,6 +5456,27 @@ impl Service { .enter() .map_err(|_| ApiError::ShuttingDown)?; + // Timeline imports on the pageserver side can't handle shard-splits. + // If the tenant is importing a timeline, don't shard split it.
+ match self + .persistence + .is_tenant_importing_timeline(tenant_id) + .await + { + Ok(importing) => { + if importing { + return Err(ApiError::Conflict( + "Cannot shard split during timeline import".to_string(), + )); + } + } + Err(err) => { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Failed to check for running imports: {err}" + ))); + } + } + let new_shard_count = ShardCount::new(split_req.new_shard_count); let new_stripe_size = split_req.new_stripe_size; @@ -8076,12 +8149,25 @@ impl Service { candidates.extend(size_candidates); } - // Filter out tenants in a prohibiting scheduling mode. + // Filter out tenants in a prohibiting scheduling mode + // and tenants with an ongoing import. + // + // Note that the import check here is opportunistic. An import might start + // after the check before we actually update [`TenantShard::splitting`]. + // [`Self::tenant_shard_split`] checks the database whilst holding the exclusive + // tenant lock. Imports might take a long time, so the check here allows us + // to split something else instead of trying the same shard over and over.
{ let state = self.inner.read().unwrap(); candidates.retain(|i| { - let policy = state.tenants.get(&i.id).map(|s| s.get_scheduling_policy()); - policy == Some(ShardSchedulingPolicy::Active) + let shard = state.tenants.get(&i.id); + match shard { + Some(t) => { + t.get_scheduling_policy() == ShardSchedulingPolicy::Active + && t.importing == TimelineImportState::Idle + } + None => false, + } }); } diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index 3a75e96cb2..c7b2628ec4 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -33,6 +33,7 @@ use crate::scheduler::{ RefCountUpdate, ScheduleContext, ScheduleError, Scheduler, SecondaryShardTag, ShardTag, }; use crate::service::ReconcileResultRequest; +use crate::timeline_import::TimelineImportState; use crate::{Sequence, service}; /// Serialization helper @@ -100,6 +101,10 @@ pub(crate) struct TenantShard { /// reconciliation, and timeline creation. pub(crate) splitting: SplitState, + /// Flag indicating whether the tenant has an in-progress timeline import. + /// Used to disallow shard splits while an import is in progress. + pub(crate) importing: TimelineImportState, + /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag /// is set. This flag is cleared when the tenant is popped off the delay queue. 
pub(crate) delayed_reconcile: bool, @@ -583,6 +588,7 @@ impl TenantShard { config: TenantConfig::default(), reconciler: None, splitting: SplitState::Idle, + importing: TimelineImportState::Idle, sequence: Sequence(1), delayed_reconcile: false, waiter: Arc::new(SeqWait::new(Sequence(0))), @@ -1844,6 +1850,8 @@ impl TenantShard { config: serde_json::from_str(&tsp.config).unwrap(), reconciler: None, splitting: tsp.splitting, + // Filled in during [`Service::startup_reconcile`] + importing: TimelineImportState::Idle, waiter: Arc::new(SeqWait::new(Sequence::initial())), error_waiter: Arc::new(SeqWait::new(Sequence::initial())), last_error: Arc::default(), diff --git a/storage_controller/src/timeline_import.rs b/storage_controller/src/timeline_import.rs index b6dd4b252e..6dcc538c4b 100644 --- a/storage_controller/src/timeline_import.rs +++ b/storage_controller/src/timeline_import.rs @@ -14,6 +14,12 @@ use utils::{ use crate::{persistence::TimelineImportPersistence, service::Config}; +#[derive(Deserialize, Serialize, PartialEq, Eq)] +pub(crate) enum TimelineImportState { + Importing, + Idle, +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub(crate) struct ShardImportStatuses(pub(crate) HashMap); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 48aa739ce4..1d668d4b2d 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -501,6 +501,9 @@ class NeonEnvBuilder: # Flag to use https listener in storage controller, generate local ssl certs, # and force pageservers and neon_local to use https for storage controller api. self.use_https_storage_controller_api: bool = False + # Flag to use https listener in storage broker, generate local ssl certs, + # and force pageservers and safekeepers to use https for storage broker api. 
+ self.use_https_storage_broker_api: bool = False self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine self.pageserver_get_vectored_concurrent_io: str | None = ( @@ -1086,7 +1089,7 @@ class NeonEnv: self.safekeepers: list[Safekeeper] = [] self.pageservers: list[NeonPageserver] = [] self.num_azs = config.num_azs - self.broker = NeonBroker(self) + self.broker = NeonBroker(self, config.use_https_storage_broker_api) self.pageserver_remote_storage = config.pageserver_remote_storage self.safekeepers_remote_storage = config.safekeepers_remote_storage self.pg_version = config.pg_version @@ -1106,6 +1109,7 @@ class NeonEnv: config.use_https_pageserver_api or config.use_https_safekeeper_api or config.use_https_storage_controller_api + or config.use_https_storage_broker_api ) self.ssl_ca_file = ( self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None @@ -1178,15 +1182,18 @@ class NeonEnv: # Create the neon_local's `NeonLocalInitConf` cfg: dict[str, Any] = { "default_tenant_id": str(self.initial_tenant), - "broker": { - "listen_addr": self.broker.listen_addr(), - }, + "broker": {}, "safekeepers": [], "pageservers": [], "endpoint_storage": {"port": self.port_distributor.get_port()}, "generate_local_ssl_certs": self.generate_local_ssl_certs, } + if config.use_https_storage_broker_api: + cfg["broker"]["listen_https_addr"] = self.broker.listen_addr() + else: + cfg["broker"]["listen_addr"] = self.broker.listen_addr() + if self.control_plane_api is not None: cfg["control_plane_api"] = self.control_plane_api @@ -4933,9 +4940,10 @@ class Safekeeper(LogUtils): class NeonBroker(LogUtils): """An object managing storage_broker instance""" - def __init__(self, env: NeonEnv): - super().__init__(logfile=env.repo_dir / "storage_broker.log") + def __init__(self, env: NeonEnv, use_https: bool): + super().__init__(logfile=env.repo_dir / "storage_broker" / "storage_broker.log") self.env = env + self.scheme = "https" if use_https else 
"http" self.port: int = self.env.port_distributor.get_port() self.running = False @@ -4958,7 +4966,7 @@ class NeonBroker(LogUtils): return f"127.0.0.1:{self.port}" def client_url(self): - return f"http://{self.listen_addr()}" + return f"{self.scheme}://{self.listen_addr()}" def assert_no_errors(self): assert_no_errors(self.logfile, "storage_controller", []) diff --git a/test_runner/random_ops/test_random_ops.py b/test_runner/random_ops/test_random_ops.py index b3078ecac1..643151fa11 100644 --- a/test_runner/random_ops/test_random_ops.py +++ b/test_runner/random_ops/test_random_ops.py @@ -323,6 +323,7 @@ class NeonProject: if self.restart_pgbench_on_console_errors and ( "ERROR: Couldn't connect to compute node" in err or "ERROR: Console request failed" in err + or "ERROR: Control plane request failed" in err ): log.info("Restarting benchmark for %s", target) self.benchmarks.pop(target) diff --git a/test_runner/regress/test_ssl.py b/test_runner/regress/test_ssl.py index 39c94c05a9..62879834c3 100644 --- a/test_runner/regress/test_ssl.py +++ b/test_runner/regress/test_ssl.py @@ -6,6 +6,7 @@ import pytest import requests from fixtures.neon_fixtures import NeonEnvBuilder, StorageControllerApiException from fixtures.utils import wait_until +from fixtures.workload import Workload def test_pageserver_https_api(neon_env_builder: NeonEnvBuilder): @@ -212,3 +213,24 @@ def test_server_and_cert_metrics(neon_env_builder: NeonEnvBuilder): assert reload_error_cnt > 0 wait_until(reload_failed) + + +def test_storage_broker_https_api(neon_env_builder: NeonEnvBuilder): + """ + Test HTTPS storage broker API. + 1. Make /status request to HTTPS API to ensure it's appropriately configured. + 2. Generate simple workload to ensure that SK -> broker -> PS communication works well. + """ + neon_env_builder.use_https_storage_broker_api = True + env = neon_env_builder.init_start() + + # 1. Simple check that HTTPS is enabled and works. 
+ url = env.broker.client_url() + "/status" + assert url.startswith("https://") + requests.get(url, verify=str(env.ssl_ca_file)).raise_for_status() + + # 2. Simple workload to check that SK -> broker -> PS communication works over HTTPS. + workload = Workload(env, env.initial_tenant, env.initial_timeline) + workload.init() + workload.write_rows(10) + workload.validate()