Mirror of https://github.com/neondatabase/neon.git (synced 2026-05-17 05:00:38 +00:00)

Compare commits (5 commits): problame/t... → proxy-refa...
| Author | SHA1 | Date |
|---|---|---|
| | 6fcf0f2754 | |
| | d336b8b5d9 | |
| | 4d68e3108f | |
| | 3e150419ef | |
| | 9e424d2f84 | |
@@ -1,39 +1,37 @@
 # Helm chart values for neon-proxy-scram.
 # This is a YAML-formatted file.

 image:
   repository: neondatabase/neon

 settings:
-  authBackend: "console"
-  authEndpoint: "http://console-release.local/management/api/v2"
-  domain: "*.cloud.neon.tech"
+  authBackend: "link"
+  authEndpoint: "https://console.neon.tech/authenticate_proxy_request/"
+  uri: "https://console.neon.tech/psql_session/"
   sentryEnvironment: "production"
   wssPort: 8443
   metricCollectionEndpoint: "http://console-release.local/billing/api/v1/usage_events"
   metricCollectionInterval: "10min"

-# -- Additional labels for neon-proxy pods
+# -- Additional labels for zenith-proxy pods
 podLabels:
-  zenith_service: proxy-scram
-  zenith_env: prod
+  zenith_service: proxy
+  zenith_env: production
   zenith_region: us-west-2
-  zenith_region_slug: us-west-2
+  zenith_region_slug: oregon

 service:
   annotations:
     service.beta.kubernetes.io/aws-load-balancer-type: external
     service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
     service.beta.kubernetes.io/aws-load-balancer-scheme: internal
     external-dns.alpha.kubernetes.io/hostname: proxy-release.local
   type: LoadBalancer

 exposedService:
   annotations:
     service.beta.kubernetes.io/aws-load-balancer-type: external
     service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
     service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
-    external-dns.alpha.kubernetes.io/hostname: neon-proxy-scram-legacy.eta.us-west-2.aws.neon.tech
-  httpsPort: 443
+    external-dns.alpha.kubernetes.io/hostname: connect.neon.tech,pg.neon.tech

-#metrics:
-#  enabled: true
-#  serviceMonitor:
-#    enabled: true
-#    selector:
-#      release: kube-prometheus-stack
+metrics:
+  enabled: true
+  serviceMonitor:
+    enabled: true
+    selector:
+      release: kube-prometheus-stack

 extraManifests:
   - apiVersion: operator.victoriametrics.com/v1beta1
.github/workflows/build_and_test.yml (vendored, 112 changed lines)
@@ -690,37 +690,20 @@ jobs:
   promote-images:
     runs-on: [ self-hosted, gen3, small ]
     needs: [ tag, test-images, vm-compute-node-image ]
-    container: golang:1.19-bullseye
+    if: github.event_name != 'workflow_dispatch'
+    container: amazon/aws-cli
+    strategy:
+      fail-fast: false
+      matrix:
+        name: [ neon, compute-node-v14, vm-compute-node-v14, compute-node-v15, vm-compute-node-v15, compute-tools]
+    env:
+      AWS_DEFAULT_REGION: eu-central-1

     steps:
-      - name: Install Crane & ECR helper
-        if: |
-          (github.ref_name == 'main' || github.ref_name == 'release') &&
-          github.event_name != 'workflow_dispatch'
-        run: |
-          go install github.com/google/go-containerregistry/cmd/crane@31786c6cbb82d6ec4fb8eb79cd9387905130534e # v0.11.0
-          go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0
-
-      - name: Configure ECR login
-        run: |
-          mkdir /github/home/.docker/
-          echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
-
-      - name: Add latest tag to images
-        if: |
-          (github.ref_name == 'main' || github.ref_name == 'release') &&
-          github.event_name != 'workflow_dispatch'
-        run: |
-          crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} latest
-          crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} latest
-          crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
-          crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} latest
-          crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
-          crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} latest
-
-      - name: Cleanup ECR folder
-        run: rm -rf ~/.ecr
+      - name: Promote image to latest
+        if: |
+          (github.ref_name == 'main' || github.ref_name == 'release') &&
+          github.event_name != 'workflow_dispatch'
+        run: |
+          export MANIFEST=$(aws ecr batch-get-image --repository-name ${{ matrix.name }} --image-ids imageTag=${{needs.tag.outputs.build-tag}} --query 'images[].imageManifest' --output text)
+          aws ecr put-image --repository-name ${{ matrix.name }} --image-tag latest --image-manifest "$MANIFEST"

   push-docker-hub:
     runs-on: [ self-hosted, dev, x64 ]
@@ -879,15 +862,9 @@ jobs:
           ANSIBLE_CONFIG=./ansible.cfg ansible-playbook deploy.yaml -i ${{ matrix.env_name }}.hosts.yaml -e CONSOLE_API_TOKEN=${{ secrets[matrix.console_api_key_secret] }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
           rm -f neon_install.tar.gz .neon_current_version

-      # Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ansible/collections': Permission denied
-      - name: Cleanup ansible folder
-        run: rm -rf ~/.ansible
-
   deploy-new:
     runs-on: [ self-hosted, gen3, small ]
-    container:
-      image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
-      options: --user root --privileged
+    container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
     # We need both storage **and** compute images for deploy, because control plane picks the compute version based on the storage version.
     # If it notices a fresh storage it may bump the compute version. And if compute image failed to build it may break things badly
     needs: [ push-docker-hub, tag, regress-tests ]
@@ -925,9 +902,6 @@ jobs:
           ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
           rm -f neon_install.tar.gz .neon_current_version

-      - name: Cleanup ansible folder
-        run: rm -rf ~/.ansible
-
   deploy-pr-test-new:
     runs-on: [ self-hosted, gen3, small ]
     container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
@@ -961,9 +935,6 @@ jobs:
           ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
          rm -f neon_install.tar.gz .neon_current_version

-      - name: Cleanup ansible folder
-        run: rm -rf ~/.ansible
-
   deploy-prod-new:
     runs-on: prod
     container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
@@ -1008,7 +979,7 @@ jobs:

   deploy-proxy:
     runs-on: [ self-hosted, gen3, small ]
-    container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
+    container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
     # Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
     needs: [ push-docker-hub, calculate-deploy-targets, tag, regress-tests ]
     if: |
@@ -1031,26 +1002,29 @@ jobs:
           submodules: true
           fetch-depth: 0

+      - name: Add curl
+        run: apt update && apt install curl -y
+
       - name: Store kubeconfig file
         run: |
           echo "${{ secrets[matrix.kubeconfig_secret] }}" | base64 --decode > ${KUBECONFIG}
           chmod 0600 ${KUBECONFIG}

-      - name: Add neon helm chart
-        run: helm repo add neondatabase https://neondatabase.github.io/helm-charts
+      - name: Setup helm v3
+        run: |
+          curl -s https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
+          helm repo add neondatabase https://neondatabase.github.io/helm-charts

       - name: Re-deploy proxy
         run: |
           DOCKER_TAG=${{needs.tag.outputs.build-tag}}
           helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install --atomic -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
           helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install --atomic -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s

       - name: Cleanup helm folder
         run: rm -rf ~/.cache

   deploy-storage-broker:
     name: deploy storage broker on old staging and old prod
     runs-on: [ self-hosted, gen3, small ]
-    container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
+    container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
     # Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
     needs: [ push-docker-hub, calculate-deploy-targets, tag, regress-tests ]
     if: |
@@ -1073,21 +1047,23 @@ jobs:
           submodules: true
           fetch-depth: 0

+      - name: Add curl
+        run: apt update && apt install curl -y
+
       - name: Store kubeconfig file
         run: |
           echo "${{ secrets[matrix.kubeconfig_secret] }}" | base64 --decode > ${KUBECONFIG}
           chmod 0600 ${KUBECONFIG}

-      - name: Add neon helm chart
-        run: helm repo add neondatabase https://neondatabase.github.io/helm-charts
+      - name: Setup helm v3
+        run: |
+          curl -s https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
+          helm repo add neondatabase https://neondatabase.github.io/helm-charts

       - name: Deploy storage-broker
         run:
           helm upgrade neon-storage-broker neondatabase/neon-storage-broker --namespace ${{ matrix.storage_broker_ns }} --create-namespace --install --atomic -f .github/helm-values/${{ matrix.storage_broker_config }}.yaml --set image.tag=${{ needs.tag.outputs.build-tag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s

       - name: Cleanup helm folder
         run: rm -rf ~/.cache

   deploy-proxy-new:
     runs-on: [ self-hosted, gen3, small ]
     container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
@@ -1119,14 +1095,6 @@ jobs:
           submodules: true
           fetch-depth: 0

-      - name: Configure AWS Credentials
-        uses: aws-actions/configure-aws-credentials@v1-node16
-        with:
-          role-to-assume: arn:aws:iam::369495373322:role/github-runner
-          aws-region: eu-central-1
-          role-skip-session-tagging: true
-          role-duration-seconds: 1800
-
       - name: Configure environment
         run: |
           helm repo add neondatabase https://neondatabase.github.io/helm-charts
@@ -1149,9 +1117,6 @@ jobs:
           DOCKER_TAG=${{needs.tag.outputs.build-tag}}
           helm upgrade neon-proxy-scram-legacy neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram-legacy.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s

-      - name: Cleanup helm folder
-        run: rm -rf ~/.cache
-
   deploy-storage-broker-dev-new:
     runs-on: [ self-hosted, gen3, small ]
     container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
@@ -1179,14 +1144,6 @@ jobs:
           submodules: true
           fetch-depth: 0

-      - name: Configure AWS Credentials
-        uses: aws-actions/configure-aws-credentials@v1-node16
-        with:
-          role-to-assume: arn:aws:iam::369495373322:role/github-runner
-          aws-region: eu-central-1
-          role-skip-session-tagging: true
-          role-duration-seconds: 1800
-
       - name: Configure environment
         run: |
           helm repo add neondatabase https://neondatabase.github.io/helm-charts
@@ -1196,9 +1153,6 @@ jobs:
         run:
           helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ needs.tag.outputs.build-tag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s

-      - name: Cleanup helm folder
-        run: rm -rf ~/.cache
-
   deploy-proxy-prod-new:
     runs-on: prod
     container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
@@ -1216,19 +1170,15 @@ jobs:
           - target_region: us-east-2
             target_cluster: prod-us-east-2-delta
             deploy_link_proxy: true
-            deploy_legacy_scram_proxy: false
           - target_region: us-west-2
             target_cluster: prod-us-west-2-eta
             deploy_link_proxy: false
-            deploy_legacy_scram_proxy: true
           - target_region: eu-central-1
             target_cluster: prod-eu-central-1-gamma
             deploy_link_proxy: false
-            deploy_legacy_scram_proxy: false
           - target_region: ap-southeast-1
             target_cluster: prod-ap-southeast-1-epsilon
             deploy_link_proxy: false
-            deploy_legacy_scram_proxy: false
       environment:
         name: prod-${{ matrix.target_region }}
       steps:
@@ -1254,12 +1204,6 @@ jobs:
           DOCKER_TAG=${{needs.tag.outputs.build-tag}}
           helm upgrade neon-proxy-link neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-link.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s

-      - name: Re-deploy legacy scram proxy
-        if: matrix.deploy_legacy_scram_proxy
-        run: |
-          DOCKER_TAG=${{needs.tag.outputs.build-tag}}
-          helm upgrade neon-proxy-scram-legacy neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram-legacy.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
-
   deploy-storage-broker-prod-new:
     runs-on: prod
     container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
@@ -250,7 +250,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
     let signals = signals::install_shutdown_handlers()?;

     // Launch broker client
-    WALRECEIVER_RUNTIME.block_on(pageserver::broker_client::init_broker_client(conf))?;
+    WALRECEIVER_RUNTIME.block_on(pageserver::walreceiver::init_broker_client(conf))?;

     // Initialize authentication for incoming connections
     let auth = match &conf.auth_type {
@@ -1,48 +0,0 @@
-//! The broker client instance of the pageserver, created during pageserver startup.
-//! Used by each timelines' [`walreceiver`].
-
-use crate::config::PageServerConf;
-
-use anyhow::Context;
-use once_cell::sync::OnceCell;
-use storage_broker::BrokerClientChannel;
-use tracing::*;
-
-static BROKER_CLIENT: OnceCell<BrokerClientChannel> = OnceCell::new();
-
-///
-/// Initialize the broker client. This must be called once at page server startup.
-///
-pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result<()> {
-    let broker_endpoint = conf.broker_endpoint.clone();
-
-    // Note: we do not attempt connecting here (but validate endpoints sanity).
-    let broker_client =
-        storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context(
-            format!(
-                "Failed to create broker client to {}",
-                &conf.broker_endpoint
-            ),
-        )?;
-
-    if BROKER_CLIENT.set(broker_client).is_err() {
-        panic!("broker already initialized");
-    }
-
-    info!(
-        "Initialized broker client with endpoints: {}",
-        broker_endpoint
-    );
-    Ok(())
-}
-
-///
-/// Get a handle to the broker client
-///
-pub fn get_broker_client() -> &'static BrokerClientChannel {
-    BROKER_CLIENT.get().expect("broker client not initialized")
-}
-
-pub fn is_broker_client_initialized() -> bool {
-    BROKER_CLIENT.get().is_some()
-}
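Aside: the once-initialization idiom this deleted module is built on (and which reappears in `walreceiver` below) can be exercised in isolation. A minimal sketch using the same `once_cell` crate the diff relies on, with a plain `String` standing in for `BrokerClientChannel`:

```rust
use once_cell::sync::OnceCell;

// `String` stands in for `BrokerClientChannel`. The semantics are the ones
// the module depends on: `set` succeeds exactly once, and `get` returns a
// 'static reference to the stored value for the rest of the process lifetime.
static CLIENT: OnceCell<String> = OnceCell::new();

fn init_client(endpoint: &str) {
    if CLIENT.set(format!("client for {endpoint}")).is_err() {
        panic!("client already initialized");
    }
}

fn get_client() -> &'static String {
    CLIENT.get().expect("client not initialized")
}

fn main() {
    assert!(CLIENT.get().is_none());
    init_client("http://broker.local");
    assert_eq!(get_client(), "client for http://broker.local");
}
```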
@@ -29,7 +29,7 @@ use utils::{

-use crate::tenant::config::TenantConf;
+use crate::tenant::config::TenantConfOpt;
-use crate::tenant::{TENANT_ATTACHING_MARKER_SUFFIX, TIMELINES_SEGMENT_NAME};
+use crate::tenant::{TENANT_ATTACHING_MARKER_FILENAME, TIMELINES_SEGMENT_NAME};
 use crate::{
     IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TIMELINE_UNINIT_MARK_SUFFIX,
 };
@@ -459,7 +459,8 @@ impl PageServerConf {
     }

     pub fn tenant_attaching_mark_file_path(&self, tenant_id: &TenantId) -> PathBuf {
-        path_with_suffix_extension(self.tenant_path(tenant_id), TENANT_ATTACHING_MARKER_SUFFIX)
+        self.tenant_path(tenant_id)
+            .join(TENANT_ATTACHING_MARKER_FILENAME)
     }

     pub fn tenant_ignore_mark_file_path(&self, tenant_id: TenantId) -> PathBuf {
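The attach marker therefore moves from a sibling path (a suffix appended to the tenant directory's name) to a file inside the tenant directory itself. A small sketch of the two constructions; the helper body and the literal marker name "attaching" are illustrative assumptions, not the repository's real constants:

```rust
use std::path::{Path, PathBuf};

// Hypothetical stand-in for the `path_with_suffix_extension` utility:
// append `suffix` as an extra extension to the final path component.
fn path_with_suffix_extension(path: PathBuf, suffix: &str) -> PathBuf {
    let mut s = path.into_os_string();
    s.push(".");
    s.push(suffix);
    PathBuf::from(s)
}

fn main() {
    let tenant_path = Path::new("/data/tenants/1234").to_path_buf();
    // Old scheme: a sibling marker next to the tenant directory.
    assert_eq!(
        path_with_suffix_extension(tenant_path.clone(), "attaching"),
        Path::new("/data/tenants/1234.attaching")
    );
    // New scheme: a marker file inside the tenant directory.
    assert_eq!(
        tenant_path.join("attaching"),
        Path::new("/data/tenants/1234/attaching")
    );
}
```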
@@ -1,6 +1,5 @@
 mod auth;
 pub mod basebackup;
-pub mod broker_client;
 pub mod config;
 pub mod consumption_metrics;
 pub mod context;
@@ -17,6 +16,7 @@ pub mod tenant;
 pub mod trace;
 pub mod virtual_file;
 pub mod walingest;
+pub mod walreceiver;
 pub mod walrecord;
 pub mod walredo;
@@ -1653,7 +1653,7 @@ mod tests {
     assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));

     // Create a branch, check that the relation is visible there
-    repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
+    repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x30))?;
     let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
         Some(timeline) => timeline,
         None => panic!("Should have a local timeline"),
@@ -238,17 +238,11 @@ pub enum TaskKind {
     // Task that downloads a file from remote storage
     RemoteDownloadTask,

-    // task that handles loading of a tenant during pageserver startup
-    TenantLoadStartup,
-
-    // task that handles loading of a tenant in response to a /load HTTP API request
-    TenantLoadApi,
-
-    // task that handles loading of a tenant as part of the tenant creation procedure
-    TenantLoadCreate,
+    // task that handles the initial downloading of all tenants
+    InitialLoad,

     // task that handles attaching a tenant
-    TenantAttach,
+    Attach,

     // task that handhes metrics collection
     MetricsCollection,
(File diff suppressed because it is too large)
@@ -19,13 +19,12 @@ use crate::config::PageServerConf;
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::task_mgr::{self, TaskKind};
 use crate::tenant::config::TenantConfOpt;
-use crate::tenant::{Tenant, TenantState, TENANT_ATTACHING_LEGACY_MARKER_FILENAME};
+use crate::tenant::{Tenant, TenantState};
 use crate::IGNORED_TENANT_FILE_NAME;

+use utils::fs_ext::PathExt;
 use utils::id::{TenantId, TimelineId};

-use super::{TenantLoadReasonNotAttach, TENANT_ATTACHING_MARKER_SUFFIX};
-
 /// The tenants known to the pageserver.
 /// The enum variants are used to distinguish the different states that the pageserver can be in.
 enum TenantsMap {
@@ -67,11 +66,6 @@ pub async fn init_tenant_mgr(
     // Scan local filesystem for attached tenants
     let tenants_dir = conf.tenants_path();

-    // Other code in pageserver assumes new attaching markers.
-    // Do the migration here, abort startup if it fails.
-    Tenant::migrate_attaching_marker_files(&conf.tenants_path())
-        .context("attaching marker migration failed")?;
-
     let mut tenants = HashMap::new();

     let mut dir_entries = fs::read_dir(&tenants_dir)
@@ -98,12 +92,18 @@ pub async fn init_tenant_mgr(
                 );
             }
         } else {
-            if tenant_dir_path
-                .to_string_lossy()
-                .ends_with(TENANT_ATTACHING_MARKER_SUFFIX)
-            {
-                // schedule_local_tenant_processing checks for marker when it encounters a tenant dir
-                info!("found a tenant attaching marker {tenant_dir_path:?}, skipping");
+            // This case happens if we crash during attach before creating the attach marker file
+            let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
+                format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
+            })?;
+            if is_empty {
+                info!("removing empty tenant directory {tenant_dir_path:?}");
+                if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
+                    error!(
+                        "Failed to remove empty tenant directory '{}': {e:#}",
+                        tenant_dir_path.display()
+                    )
+                }
                 continue;
             }
@@ -117,7 +117,6 @@ pub async fn init_tenant_mgr(
             conf,
             &tenant_dir_path,
             remote_storage.clone(),
-            TenantLoadReasonNotAttach::PageserverStartup,
             &ctx,
         ) {
             Ok(tenant) => {
@@ -148,11 +147,10 @@ pub async fn init_tenant_mgr(
     Ok(())
 }

-pub(crate) fn schedule_local_tenant_processing(
+pub fn schedule_local_tenant_processing(
     conf: &'static PageServerConf,
     tenant_path: &Path,
     remote_storage: Option<GenericRemoteStorage>,
-    load_reason: TenantLoadReasonNotAttach,
     ctx: &RequestContext,
 ) -> anyhow::Result<Arc<Tenant>> {
     anyhow::ensure!(
@@ -164,10 +162,10 @@ pub(crate) fn schedule_local_tenant_processing(
         "Cannot load tenant from temporary path {tenant_path:?}"
     );
     anyhow::ensure!(
-        !tenant_path
-            .to_string_lossy()
-            .ends_with(TENANT_ATTACHING_MARKER_SUFFIX),
-        "Caller must filter these out: {tenant_path:?}"
+        !tenant_path.is_empty_dir().with_context(|| {
+            format!("Failed to check whether {tenant_path:?} is an empty dir")
+        })?,
+        "Cannot load tenant from empty directory {tenant_path:?}"
     );

     let tenant_id = tenant_path
@@ -185,22 +183,10 @@ pub(crate) fn schedule_local_tenant_processing(
         "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
     );

-    let legacy_attaching_marker = tenant_path.join(TENANT_ATTACHING_LEGACY_MARKER_FILENAME);
-    anyhow::ensure!(
-        !legacy_attaching_marker.exists(),
-        "legacy attaching marker still present, migration code must have been not called or has a bug: {legacy_attaching_marker:?}"
-    );
-
     let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
         info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
         if let Some(remote_storage) = remote_storage {
-            match Tenant::spawn_resume_attach(conf, tenant_id, remote_storage, ctx) {
-                Ok(tenant) => tenant,
-                Err(e) => {
-                    warn!("tenant {tenant_id} failed to resume attach operation: {e:#}");
-                    Tenant::create_broken_tenant(conf, tenant_id)
-                }
-            }
+            Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx)
         } else {
             warn!("tenant {tenant_id} has attaching mark file, but pageserver has no remote storage configured");
             Tenant::create_broken_tenant(conf, tenant_id)
@@ -208,7 +194,7 @@ pub(crate) fn schedule_local_tenant_processing(
     } else {
         info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
         // Start loading the tenant into memory. It will initially be in Loading state.
-        Tenant::spawn_load(conf, tenant_id, remote_storage, load_reason, ctx)
+        Tenant::spawn_load(conf, tenant_id, remote_storage, ctx)
     };
     Ok(tenant)
 }
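A compact sketch of the dispatch rule this function implements after the change (marker present plus remote storage → attach; marker without remote storage → broken; no marker → load). The types and the marker name are simplified stand-ins for the real pageserver types:

```rust
use std::path::Path;

#[derive(Debug, PartialEq)]
enum Spawn {
    Attach,
    Load,
    Broken,
}

// `has_remote_storage` stands in for the `Option<GenericRemoteStorage>` check.
fn decide(tenant_path: &Path, marker_name: &str, has_remote_storage: bool) -> Spawn {
    if tenant_path.join(marker_name).exists() {
        if has_remote_storage {
            Spawn::Attach // resume the interrupted attach
        } else {
            Spawn::Broken // marker present but nowhere to fetch data from
        }
    } else {
        Spawn::Load // normal startup path
    }
}

fn main() {
    // A path that does not exist has no marker, so the tenant is just loaded.
    assert_eq!(decide(Path::new("/nonexistent"), "attaching", true), Spawn::Load);
}
```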
@@ -288,7 +274,7 @@ pub async fn create_tenant(
             // and do the work in that state.
             let tenant_directory = super::create_tenant_files(conf, tenant_conf, tenant_id)?;
             let created_tenant =
-                schedule_local_tenant_processing(conf, &tenant_directory, remote_storage, TenantLoadReasonNotAttach::Create, ctx)?;
+                schedule_local_tenant_processing(conf, &tenant_directory, remote_storage, ctx)?;
             let crated_tenant_id = created_tenant.tenant_id();
             anyhow::ensure!(
                 tenant_id == crated_tenant_id,
@@ -305,11 +291,10 @@ pub async fn update_tenant_config(
     tenant_id: TenantId,
 ) -> anyhow::Result<()> {
     info!("configuring tenant {tenant_id}");
-    let tenant = get_tenant(tenant_id, true).await?;
-
-    tenant.update_tenant_config(tenant_conf);
-    let tenant_config_path = conf.tenant_config_path(tenant_id);
-    Tenant::persist_tenant_config(&tenant.tenant_id(), &tenant_config_path, tenant_conf, false)?;
+    get_tenant(tenant_id, true)
+        .await?
+        .update_tenant_config(tenant_conf);
+    Tenant::persist_tenant_config(&conf.tenant_config_path(tenant_id), tenant_conf, false)?;
     Ok(())
 }
@@ -375,7 +360,7 @@ pub async fn load_tenant(
             .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
     }

-    let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, remote_storage, TenantLoadReasonNotAttach::LoadApi, ctx)
+    let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, remote_storage, ctx)
         .with_context(|| {
             format!("Failed to schedule tenant processing in path {tenant_path:?}")
         })?;
@@ -435,8 +420,13 @@ pub async fn attach_tenant(
     ctx: &RequestContext,
 ) -> Result<(), TenantMapInsertError> {
     tenant_map_insert(tenant_id, |vacant_entry| {
-        let tenant = Tenant::spawn_start_attach(conf, tenant_id, remote_storage, ctx)
-            .map_err(|source| anyhow::anyhow!("attach tenant {tenant_id}: {source:#}"))?;
+        let tenant_path = conf.tenant_path(&tenant_id);
+        anyhow::ensure!(
+            !tenant_path.exists(),
+            "Cannot attach tenant {tenant_id}, local tenant directory already exists"
+        );
+
+        let tenant = Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx);
         vacant_entry.insert(tenant);
         Ok(())
     })
@@ -157,26 +157,17 @@
 //! downloading files from the remote storage. Downloads are performed immediately
 //! against the `RemoteStorage`, independently of the upload queue.
 //!
-//! When we attach a tenant, we prepare the on-disk state based on the remote state,
-//! then use the same code that's used to set up the tenant during pageserver startup:
-//!
+//! When we attach a tenant, we perform the following steps:
 //! - create `Tenant` object in `TenantState::Attaching` state
-//! - Create an attaching marker file for this tenant on disk.
 //! - List timelines that are present in remote storage, and for each:
 //!   - download their remote [`IndexPart`]s
 //!   - create the local `metadata` file from the [`IndexPart`] contents
-//! - Remove the attaching marker file.
-//! - tell the `Tenant` object to load the prepared on-disk state.
-//!
-//! Loading the on-disk state performs the following steps:
-//!
 //! - create `Timeline` struct and a `RemoteTimelineClient`
-//! - initialize the client's upload queue with the `IndexPart`
-//!   - for attach, we carry this over in memory
-//!   - during pageserver startup, we refresh the IndexParts from the remote
+//! - initialize the client's upload queue with its `IndexPart`
 //! - create [`RemoteLayer`] instances for layers that are referenced by `IndexPart`
 //!   but not present locally
 //! - schedule uploads for layers that are only present locally.
 //! - if the remote `IndexPart`'s metadata was newer than the metadata in
 //!   the local filesystem, write the remote metadata to the local filesystem
 //! - After the above is done for each timeline, open the tenant for business by
 //!   transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
 //!   This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
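A minimal sketch of the attach sequence the doc comment describes; every type and function here is hypothetical scaffolding for illustration, not the real pageserver API:

```rust
#[derive(Debug, PartialEq)]
enum TenantState {
    Attaching,
    Active,
}

struct IndexPart; // stand-in for the remote index of a timeline

struct RemoteTimeline {
    index_part: IndexPart,
}

// Hypothetical helper: list the timelines present in remote storage.
fn fetch_remote_timelines() -> Vec<RemoteTimeline> {
    vec![RemoteTimeline { index_part: IndexPart }]
}

fn attach_tenant() -> TenantState {
    // 1. The tenant starts out in Attaching state.
    let state = TenantState::Attaching;
    assert_eq!(state, TenantState::Attaching);

    // 2. For each timeline in remote storage: download its IndexPart and
    //    create local state (metadata file, upload queue, RemoteLayers).
    for timeline in fetch_remote_timelines() {
        let _index_part = timeline.index_part;
        // ... create local metadata, seed the upload queue, schedule uploads ...
    }

    // 3. Open the tenant for business: this is where WAL receivers and the
    //    GC/compaction loops start.
    TenantState::Active
}

fn main() {
    assert_eq!(attach_tenant(), TenantState::Active);
}
```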
@@ -1,7 +1,5 @@
 //!
-
-mod walreceiver;

 use anyhow::{anyhow, bail, ensure, Context};
 use bytes::Bytes;
 use fail::fail_point;
@@ -25,7 +23,6 @@ use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
 use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
 use std::time::{Duration, Instant, SystemTime};

-use crate::broker_client::is_broker_client_initialized;
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
 use crate::tenant::storage_layer::{
@@ -62,11 +59,11 @@ use crate::page_cache;
 use crate::repository::GcResult;
 use crate::repository::{Key, Value};
 use crate::task_mgr::TaskKind;
+use crate::walreceiver::{is_broker_client_initialized, spawn_connection_manager_task};
 use crate::walredo::WalRedoManager;
 use crate::METADATA_FILE_NAME;
 use crate::ZERO_PAGE;
 use crate::{is_temporary, task_mgr};
-use walreceiver::spawn_connection_manager_task;

 use super::remote_timeline_client::index::IndexPart;
 use super::remote_timeline_client::RemoteTimelineClient;
@@ -23,15 +23,58 @@
 mod connection_manager;
 mod walreceiver_connection;

 use crate::config::PageServerConf;
 use crate::task_mgr::WALRECEIVER_RUNTIME;

+use anyhow::Context;
+use once_cell::sync::OnceCell;
 use std::future::Future;
+use storage_broker::BrokerClientChannel;
 use tokio::sync::watch;
 use tokio_util::sync::CancellationToken;
 use tracing::*;

 pub use connection_manager::spawn_connection_manager_task;

+static BROKER_CLIENT: OnceCell<BrokerClientChannel> = OnceCell::new();
+
+///
+/// Initialize the broker client. This must be called once at page server startup.
+///
+pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result<()> {
+    let broker_endpoint = conf.broker_endpoint.clone();
+
+    // Note: we do not attempt connecting here (but validate endpoints sanity).
+    let broker_client =
+        storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context(
+            format!(
+                "Failed to create broker client to {}",
+                &conf.broker_endpoint
+            ),
+        )?;
+
+    if BROKER_CLIENT.set(broker_client).is_err() {
+        panic!("broker already initialized");
+    }
+
+    info!(
+        "Initialized broker client with endpoints: {}",
+        broker_endpoint
+    );
+    Ok(())
+}
+
+///
+/// Get a handle to the broker client
+///
+pub fn get_broker_client() -> &'static BrokerClientChannel {
+    BROKER_CLIENT.get().expect("broker client not initialized")
+}
+
+pub fn is_broker_client_initialized() -> bool {
+    BROKER_CLIENT.get().is_some()
+}
+
 /// A handle of an asynchronous task.
 /// The task has a channel that it can use to communicate its lifecycle events in a certain form, see [`TaskEvent`]
 /// and a cancellation token that it can listen to for earlier interrupts.
@@ -52,6 +95,7 @@ pub enum TaskEvent<E> {

 #[derive(Debug, Clone)]
 pub enum TaskStateUpdate<E> {
+    Init,
     Started,
     Progress(E),
 }
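A self-contained sketch of the lifecycle-update pattern described above, and of why a consumer can treat the new `Init` variant and `Started` alike. This assumes tokio with the `sync`, `rt-multi-thread`, and `macros` features; `u32` stands in for the real progress type:

```rust
use tokio::sync::watch;

#[derive(Debug, Clone, PartialEq)]
enum TaskStateUpdate<E> {
    Init,
    Started,
    Progress(E),
}

#[tokio::main]
async fn main() {
    // `Init` is what the channel holds before the task has reported anything.
    let (tx, mut rx) = watch::channel(TaskStateUpdate::<u32>::Init);

    tokio::spawn(async move {
        tx.send(TaskStateUpdate::Started).ok();
        tx.send(TaskStateUpdate::Progress(42)).ok();
    });

    // The consumer treats Init and Started alike, as in the match arm above.
    while rx.changed().await.is_ok() {
        match &*rx.borrow() {
            TaskStateUpdate::Init | TaskStateUpdate::Started => {}
            TaskStateUpdate::Progress(n) => {
                println!("progress: {n}");
                break;
            }
        }
    }
}
```

Note that a watch channel coalesces updates, so intermediate states may be skipped; that is why the consumer only has to handle whichever state it currently observes.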
@@ -11,12 +11,11 @@

 use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};

-use super::TaskStateUpdate;
-use crate::broker_client::get_broker_client;
 use crate::context::RequestContext;
+use crate::task_mgr::TaskKind;
 use crate::task_mgr::WALRECEIVER_RUNTIME;
-use crate::task_mgr::{self, TaskKind};
 use crate::tenant::Timeline;
+use crate::{task_mgr, walreceiver::TaskStateUpdate};
 use anyhow::Context;
 use chrono::{NaiveDateTime, Utc};
 use pageserver_api::models::TimelineState;
@@ -29,7 +28,10 @@ use storage_broker::Streaming;
 use tokio::{select, sync::watch};
 use tracing::*;

-use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
+use crate::{
+    exponential_backoff, walreceiver::get_broker_client, DEFAULT_BASE_BACKOFF_SECONDS,
+    DEFAULT_MAX_BACKOFF_SECONDS,
+};
 use postgres_connection::{parse_host_port, PgConnectionConfig};
 use utils::{
     id::{NodeId, TenantTimelineId},
@@ -147,7 +149,7 @@ async fn connection_manager_loop_step(
             let wal_connection = walreceiver_state.wal_connection.as_mut()
                 .expect("Should have a connection, as checked by the corresponding select! guard");
             match wal_connection_update {
-                TaskEvent::Update(TaskStateUpdate::Started) => {},
+                TaskEvent::Update(TaskStateUpdate::Init | TaskStateUpdate::Started) => {},
                 TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
                     if new_status.has_processed_wal {
                         // We have advanced last_record_lsn by processing the WAL received
@@ -22,9 +22,8 @@ use tokio_postgres::{replication::ReplicationStream, Client};
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, trace, warn};

-use super::TaskStateUpdate;
 use crate::context::RequestContext;
-use crate::metrics::LIVE_CONNECTIONS_COUNT;
+use crate::{metrics::LIVE_CONNECTIONS_COUNT, walreceiver::TaskStateUpdate};
 use crate::{
     task_mgr,
     task_mgr::TaskKind,
@@ -22,18 +22,16 @@ use byteorder::{ByteOrder, LittleEndian};
 use bytes::{BufMut, Bytes, BytesMut};
 use nix::poll::*;
 use serde::Serialize;
-use std::collections::VecDeque;
 use std::fs::OpenOptions;
 use std::io::prelude::*;
 use std::io::{Error, ErrorKind};
 use std::ops::{Deref, DerefMut};
-use std::os::fd::RawFd;
 use std::os::unix::io::AsRawFd;
 use std::os::unix::prelude::CommandExt;
 use std::path::PathBuf;
 use std::process::Stdio;
 use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
-use std::sync::{Mutex, MutexGuard};
+use std::sync::Mutex;
 use std::time::Duration;
 use std::time::Instant;
 use std::{fs, io};
@@ -92,20 +90,6 @@ pub trait WalRedoManager: Send + Sync {
     ) -> Result<Bytes, WalRedoError>;
 }

-struct ProcessInput {
-    child: NoLeakChild,
-    stdin: ChildStdin,
-    stderr_fd: RawFd,
-    stdout_fd: RawFd,
-    n_requests: usize,
-}
-
-struct ProcessOutput {
-    stdout: ChildStdout,
-    pending_responses: VecDeque<Option<Bytes>>,
-    n_processed_responses: usize,
-}
-
 ///
 /// This is the real implementation that uses a Postgres process to
 /// perform WAL replay. Only one thread can use the process at a time,
@@ -117,9 +101,7 @@ pub struct PostgresRedoManager {
     tenant_id: TenantId,
     conf: &'static PageServerConf,

-    stdout: Mutex<Option<ProcessOutput>>,
-    stdin: Mutex<Option<ProcessInput>>,
-    stderr: Mutex<Option<ChildStderr>>,
+    process: Mutex<Option<PostgresRedoProcess>>,
 }

 /// Can this request be served by neon redo functions
@@ -227,17 +209,16 @@ impl PostgresRedoManager {
         PostgresRedoManager {
             tenant_id,
             conf,
-            stdin: Mutex::new(None),
-            stdout: Mutex::new(None),
-            stderr: Mutex::new(None),
+            process: Mutex::new(None),
         }
     }

     /// Launch process pre-emptively. Should not be needed except for benchmarking.
-    pub fn launch_process(&self, pg_version: u32) -> anyhow::Result<()> {
-        let mut proc = self.stdin.lock().unwrap();
-        if proc.is_none() {
-            self.launch(&mut proc, pg_version)?;
+    pub fn launch_process(&mut self, pg_version: u32) -> anyhow::Result<()> {
+        let inner = self.process.get_mut().unwrap();
+        if inner.is_none() {
+            let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?;
+            *inner = Some(p);
         }
         Ok(())
     }
@@ -260,19 +241,22 @@ impl PostgresRedoManager {

         let start_time = Instant::now();

-        let mut proc = self.stdin.lock().unwrap();
+        let mut process_guard = self.process.lock().unwrap();
         let lock_time = Instant::now();

         // launch the WAL redo process on first use
-        if proc.is_none() {
-            self.launch(&mut proc, pg_version)?;
+        if process_guard.is_none() {
+            let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?;
+            *process_guard = Some(p);
         }
+        let process = process_guard.as_mut().unwrap();

         WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());

         // Relational WAL records are applied using wal-redo-postgres
         let buf_tag = BufferTag { rel, blknum };
-        let result = self
-            .apply_wal_records(proc, buf_tag, base_img, records, wal_redo_timeout)
+        let result = process
+            .apply_wal_records(buf_tag, base_img, records, wal_redo_timeout)
             .map_err(WalRedoError::IoError);

         let end_time = Instant::now();
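A sketch of the `Mutex<Option<_>>` launch-on-first-use pattern the refactor converges on, in isolation; `Process` and its payload are stand-ins for `PostgresRedoProcess`:

```rust
use std::sync::Mutex;

struct Process(u32); // stand-in for the real child-process handle

struct Manager {
    process: Mutex<Option<Process>>,
}

impl Manager {
    // The process is spawned under the lock on first use, then reused; a
    // single mutex now guards stdin, stdout, and stderr together.
    fn with_process<R>(&self, f: impl FnOnce(&mut Process) -> R) -> R {
        let mut guard = self.process.lock().unwrap();
        if guard.is_none() {
            *guard = Some(Process(42)); // launch on first use
        }
        f(guard.as_mut().unwrap())
    }
}

fn main() {
    let mgr = Manager { process: Mutex::new(None) };
    assert_eq!(mgr.with_process(|p| p.0), 42);
}
```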
@@ -311,22 +295,8 @@ impl PostgresRedoManager {
                 base_img_lsn,
                 lsn
             );
-            // self.stdin only holds stdin & stderr as_raw_fd().
-            // Dropping it as part of take() doesn't close them.
-            // The owning objects (ChildStdout and ChildStderr) are stored in
-            // self.stdout and self.stderr, respsectively.
-            // We intentionally keep them open here to avoid a race between
-            // currently running `apply_wal_records()` and a `launch()` call
-            // after we return here.
-            // The currently running `apply_wal_records()` must not read from
-            // the newly launched process.
-            // By keeping self.stdout and self.stderr open here, `launch()` will
-            // get other file descriptors for the new child's stdout and stderr,
-            // and hence the current `apply_wal_records()` calls will observe
-            // `output.stdout.as_raw_fd() != stdout_fd` .
-            if let Some(proc) = self.stdin.lock().unwrap().take() {
-                proc.child.kill_and_wait();
-            }
+            let process = process_guard.take().unwrap();
+            process.kill();
         }
         result
     }
@@ -625,23 +595,32 @@ impl<C: CommandExt> CloseFileDescriptors for C {
     }
 }

-impl PostgresRedoManager {
+///
+/// Handle to the Postgres WAL redo process
+///
+struct PostgresRedoProcess {
+    tenant_id: TenantId,
+    child: NoLeakChild,
+    stdin: ChildStdin,
+    stdout: ChildStdout,
+    stderr: ChildStderr,
+}
+
+impl PostgresRedoProcess {
     //
     // Start postgres binary in special WAL redo mode.
     //
-    #[instrument(skip_all,fields(tenant_id=%self.tenant_id, pg_version=pg_version))]
+    #[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))]
     fn launch(
-        &self,
-        input: &mut MutexGuard<Option<ProcessInput>>,
+        conf: &PageServerConf,
+        tenant_id: TenantId,
         pg_version: u32,
-    ) -> Result<(), Error> {
+    ) -> Result<PostgresRedoProcess, Error> {
         // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
         // just create one with constant name. That fails if you try to launch more than
         // one WAL redo manager concurrently.
         let datadir = path_with_suffix_extension(
-            self.conf
-                .tenant_path(&self.tenant_id)
-                .join("wal-redo-datadir"),
+            conf.tenant_path(&tenant_id).join("wal-redo-datadir"),
             TEMP_FILE_SUFFIX,
         );
@@ -655,12 +634,10 @@ impl PostgresRedoManager {
             )
         })?;
         }
-        let pg_bin_dir_path = self
-            .conf
+        let pg_bin_dir_path = conf
             .pg_bin_dir(pg_version)
             .map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_bin_dir path: {e}")))?;
-        let pg_lib_dir_path = self
-            .conf
+        let pg_lib_dir_path = conf
             .pg_lib_dir(pg_version)
             .map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_lib_dir path: {e}")))?;
@@ -746,31 +723,27 @@ impl PostgresRedoManager {
         // all fallible operations post-spawn are complete, so get rid of the guard
         let child = scopeguard::ScopeGuard::into_inner(child);

-        **input = Some(ProcessInput {
+        Ok(PostgresRedoProcess {
+            tenant_id,
             child,
-            stdout_fd: stdout.as_raw_fd(),
-            stderr_fd: stderr.as_raw_fd(),
             stdin,
-            n_requests: 0,
-        });
-
-        *self.stdout.lock().unwrap() = Some(ProcessOutput {
             stdout,
-            pending_responses: VecDeque::new(),
-            n_processed_responses: 0,
-        });
-        *self.stderr.lock().unwrap() = Some(stderr);
-
-        Ok(())
+            stderr,
+        })
     }

+    #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))]
+    fn kill(self) {
+        self.child.kill_and_wait();
+    }
+
     //
     // Apply given WAL records ('records') over an old page image. Returns
     // new page image.
     //
-    #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%input.as_ref().unwrap().child.id()))]
+    #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))]
     fn apply_wal_records(
-        &self,
-        mut input: MutexGuard<Option<ProcessInput>>,
+        &mut self,
         tag: BufferTag,
         base_img: Option<Bytes>,
         records: &[(Lsn, NeonWalRecord)],
@@ -807,23 +780,33 @@ impl PostgresRedoManager {
         build_get_page_msg(tag, &mut writebuf);
         WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);

-        let proc = input.as_mut().unwrap();
-        let mut nwrite = 0usize;
-        let stdout_fd = proc.stdout_fd;
+        // The input is now in 'writebuf'. Do a blind write first, writing as much as
+        // we can, before calling poll(). That skips one call to poll() if the stdin is
+        // already available for writing, which it almost certainly is because the
+        // process is idle.
+        let mut nwrite = self.stdin.write(&writebuf)?;
+
+        // We expect the WAL redo process to respond with an 8k page image. We read it
+        // into this buffer.
+        let mut resultbuf = vec![0; BLCKSZ.into()];
+        let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far

         // Prepare for calling poll()
         let mut pollfds = [
-            PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT),
-            PollFd::new(proc.stderr_fd, PollFlags::POLLIN),
-            PollFd::new(stdout_fd, PollFlags::POLLIN),
+            PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
+            PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
+            PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
         ];

-        // We do two things simultaneously: send the old base image and WAL records to
-        // the child process's stdin and forward any logging
+        // We do three things simultaneously: send the old base image and WAL records to
+        // the child process's stdin, read the result from child's stdout, and forward any logging
         // information that the child writes to its stderr to the page server's log.
-        while nwrite < writebuf.len() {
+        while nresult < BLCKSZ.into() {
+            // If we have more data to write, wake up if 'stdin' becomes writeable or
+            // we have data to read. Otherwise only wake up if there's data to read.
+            let nfds = if nwrite < writebuf.len() { 3 } else { 2 };
             let n = loop {
-                match nix::poll::poll(&mut pollfds[0..2], wal_redo_timeout.as_millis() as i32) {
+                match nix::poll::poll(&mut pollfds[0..nfds], wal_redo_timeout.as_millis() as i32) {
                     Err(e) if e == nix::errno::Errno::EINTR => continue,
                     res => break res,
                 }
@@ -837,16 +820,14 @@ impl PostgresRedoManager {
             let err_revents = pollfds[1].revents().unwrap();
             if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
                 let mut errbuf: [u8; 16384] = [0; 16384];
-                let mut stderr_guard = self.stderr.lock().unwrap();
-                let stderr = stderr_guard.as_mut().unwrap();
-                let len = stderr.read(&mut errbuf)?;
+                let n = self.stderr.read(&mut errbuf)?;

                 // The message might not be split correctly into lines here. But this is
                 // good enough, the important thing is to get the message to the log.
-                if len > 0 {
+                if n > 0 {
                     error!(
                         "wal-redo-postgres: {}",
-                        String::from_utf8_lossy(&errbuf[0..len])
+                        String::from_utf8_lossy(&errbuf[0..n])
                     );

                     // To make sure we capture all log from the process if it fails, keep
@@ -860,157 +841,33 @@ impl PostgresRedoManager {
                 ));
             }

-            // If 'stdin' is writeable, do write.
-            let in_revents = pollfds[0].revents().unwrap();
-            if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
-                nwrite += proc.stdin.write(&writebuf[nwrite..])?;
-            } else if in_revents.contains(PollFlags::POLLHUP) {
-                // We still have more data to write, but the process closed the pipe.
-                return Err(Error::new(
-                    ErrorKind::BrokenPipe,
-                    "WAL redo process closed its stdin unexpectedly",
-                ));
-            }
-        }
-        let request_no = proc.n_requests;
-        proc.n_requests += 1;
-        drop(input);
-
-        // To improve walredo performance we separate sending requests and receiving
-        // responses. Them are protected by different mutexes (output and input).
-        // If thread T1, T2, T3 send requests D1, D2, D3 to walredo process
-        // then there is not warranty that T1 will first granted output mutex lock.
-        // To address this issue we maintain number of sent requests, number of processed
-        // responses and ring buffer with pending responses. After sending response
-        // (under input mutex), threads remembers request number. Then it releases
-        // input mutex, locks output mutex and fetch in ring buffer all responses until
-        // its stored request number. The it takes correspondent element from
-        // pending responses ring buffer and truncate all empty elements from the front,
-        // advancing processed responses number.
-
-        let mut output_guard = self.stdout.lock().unwrap();
-        let output = output_guard.as_mut().unwrap();
-        if output.stdout.as_raw_fd() != stdout_fd {
-            // If stdout file descriptor is changed then it means that walredo process is crashed and restarted.
-            // As far as ProcessInput and ProcessOutout are protected by different mutexes,
-            // it can happen that we send request to one process and waiting response from another.
-            // To prevent such situation we compare stdout file descriptors.
-            // As far as old stdout pipe is destroyed only after new one is created,
-            // it can not reuse the same file descriptor, so this check is safe.
-            //
-            // Cross-read this with the comment in apply_batch_postgres if result.is_err().
-            // That's where we kill the child process.
-            return Err(Error::new(
-                ErrorKind::BrokenPipe,
-                "WAL redo process closed its stdout unexpectedly",
-            ));
-        }
-        let n_processed_responses = output.n_processed_responses;
-        while n_processed_responses + output.pending_responses.len() <= request_no {
-            // We expect the WAL redo process to respond with an 8k page image. We read it
-            // into this buffer.
-            let mut resultbuf = vec![0; BLCKSZ.into()];
-            let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
-            while nresult < BLCKSZ.into() {
-                // We do two things simultaneously: reading response from stdout
-                // and forward any logging information that the child writes to its stderr to the page server's log.
-                let n = loop {
-                    match nix::poll::poll(&mut pollfds[1..3], wal_redo_timeout.as_millis() as i32) {
-                        Err(e) if e == nix::errno::Errno::EINTR => continue,
-                        res => break res,
-                    }
-                }?;
-
-                if n == 0 {
-                    return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
-                }
-
-                // If we have some messages in stderr, forward them to the log.
-                let err_revents = pollfds[1].revents().unwrap();
-                if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
-                    let mut errbuf: [u8; 16384] = [0; 16384];
-                    let mut stderr_guard = self.stderr.lock().unwrap();
-                    let stderr = stderr_guard.as_mut().unwrap();
-                    let len = stderr.read(&mut errbuf)?;
-
-                    // The message might not be split correctly into lines here. But this is
-                    // good enough, the important thing is to get the message to the log.
-                    if len > 0 {
-                        error!(
-                            "wal-redo-postgres: {}",
-                            String::from_utf8_lossy(&errbuf[0..len])
-                        );
-
-                        // To make sure we capture all log from the process if it fails, keep
-                        // reading from the stderr, before checking the stdout.
-                        continue;
-                    }
-                } else if err_revents.contains(PollFlags::POLLHUP) {
-                    return Err(Error::new(
-                        ErrorKind::BrokenPipe,
-                        "WAL redo process closed its stderr unexpectedly",
-                    ));
-                }
-
-                // If we have some data in stdout, read it to the result buffer.
-                let out_revents = pollfds[2].revents().unwrap();
-                if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
-                    nresult += output.stdout.read(&mut resultbuf[nresult..])?;
-                } else if out_revents.contains(PollFlags::POLLHUP) {
-                    return Err(Error::new(
-                        ErrorKind::BrokenPipe,
-                        "WAL redo process closed its stdout unexpectedly",
-                    ));
-                }
-            }
-            output
-                .pending_responses
-                .push_back(Some(Bytes::from(resultbuf)));
-        }
-        // Replace our request's response with None in `pending_responses`.
-        // Then make space in the ring buffer by clearing out any seqence of contiguous
-        // `None`'s from the front of `pending_responses`.
-        // NB: We can't pop_front() because other requests' responses because another
-        // requester might have grabbed the output mutex before us:
-        // T1: grab input mutex
-        // T1: send request_no 23
-        // T1: release input mutex
-        // T2: grab input mutex
-        // T2: send request_no 24
-        // T2: release input mutex
-        // T2: grab output mutex
-        // T2: n_processed_responses + output.pending_responses.len() <= request_no
-        //                23         +                0               <=     24
-        // T2: enters poll loop that reads stdout
-        // T2: put response for 23 into pending_responses
-        // T2: put response for 24 into pending_resposnes
-        //     pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
-        // T2: takes its response_24
-        //     pending_responses now looks like this: Front Some(response_23) None Back
-        // T2: does the while loop below
-        //     pending_responses now looks like this: Front Some(response_23) None Back
-        // T2: releases output mutex
-        // T1: grabs output mutex
-        // T1: n_processed_responses + output.pending_responses.len() > request_no
-        //                23         +                2               >      23
-        // T1: skips poll loop that reads stdout
-        // T1: takes its response_23
-        //     pending_responses now looks like this: Front None None Back
-        // T2: does the while loop below
-        //     pending_responses now looks like this: Front Back
-        //     n_processed_responses now has value 25
-        let res = output.pending_responses[request_no - n_processed_responses]
-            .take()
-            .expect("we own this request_no, nobody else is supposed to take it");
-        while let Some(front) = output.pending_responses.front() {
-            if front.is_none() {
-                output.pending_responses.pop_front();
-                output.n_processed_responses += 1;
-            } else {
-                break;
-            }
-        }
-        Ok(res)
+            // If we have more data to write and 'stdin' is writeable, do write.
+            if nwrite < writebuf.len() {
+                let in_revents = pollfds[2].revents().unwrap();
+                if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
+                    nwrite += self.stdin.write(&writebuf[nwrite..])?;
+                } else if in_revents.contains(PollFlags::POLLHUP) {
+                    // We still have more data to write, but the process closed the pipe.
+                    return Err(Error::new(
+                        ErrorKind::BrokenPipe,
+                        "WAL redo process closed its stdin unexpectedly",
+                    ));
+                }
+            }
+
+            // If we have some data in stdout, read it to the result buffer.
+            let out_revents = pollfds[0].revents().unwrap();
+            if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
+                nresult += self.stdout.read(&mut resultbuf[nresult..])?;
+            } else if out_revents.contains(PollFlags::POLLHUP) {
+                return Err(Error::new(
+                    ErrorKind::BrokenPipe,
+                    "WAL redo process closed its stdout unexpectedly",
+                ));
+            }
+        }
+        Ok(Bytes::from(resultbuf))
     }
 }
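For context, the response bookkeeping that the removed comment describes can be isolated to a few lines. A sketch of the take-and-trim step on the pending-responses ring buffer, with `Vec<u8>` standing in for `Bytes`:

```rust
use std::collections::VecDeque;

// Each requester takes its own slot out of the ring buffer, then trims any
// run of leading `None`s and advances the processed counter, exactly as in
// the deleted comment's T1/T2 walkthrough.
struct Output {
    pending_responses: VecDeque<Option<Vec<u8>>>,
    n_processed_responses: usize,
}

impl Output {
    fn take_response(&mut self, request_no: usize) -> Vec<u8> {
        let res = self.pending_responses[request_no - self.n_processed_responses]
            .take()
            .expect("we own this request_no, nobody else takes it");
        // Trim contiguous `None`s from the front, advancing the counter.
        while let Some(None) = self.pending_responses.front() {
            self.pending_responses.pop_front();
            self.n_processed_responses += 1;
        }
        res
    }
}

fn main() {
    let mut out = Output {
        pending_responses: VecDeque::from(vec![Some(b"r23".to_vec()), Some(b"r24".to_vec())]),
        n_processed_responses: 23,
    };
    assert_eq!(out.take_response(24), b"r24"); // out-of-order taker leaves a hole
    assert_eq!(out.pending_responses.len(), 2); // front is still Some(r23)
    assert_eq!(out.take_response(23), b"r23");
    assert!(out.pending_responses.is_empty());
    assert_eq!(out.n_processed_responses, 25);
}
```

The diff deletes this machinery entirely: with one process handle behind one mutex, responses can no longer arrive out of order, so the simple `Ok(Bytes::from(resultbuf))` return suffices.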
@@ -16,7 +16,7 @@ use crate::{
 use once_cell::sync::Lazy;
 use std::borrow::Cow;
 use tokio::io::{AsyncRead, AsyncWrite};
-use tracing::{info, warn};
+use tracing::{info, instrument, warn};

 static CPLANE_WAITERS: Lazy<Waiters<mgmt::ComputeReady>> = Lazy::new(Default::default);
@@ -143,6 +143,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
         &mut self,
         extra: &ConsoleReqExtra<'_>,
         client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin + Send>,
+        use_cleartext_password_flow: bool,
     ) -> auth::Result<Option<AuthSuccess<NodeInfo>>> {
         use BackendType::*;
@@ -190,7 +191,7 @@ impl BackendType<'_, ClientCredentials<'_>> {

                 (node, payload)
             }
-            Console(endpoint, creds) if creds.use_cleartext_password_flow => {
+            Console(endpoint, creds) if use_cleartext_password_flow => {
                 // This is a hack to allow cleartext password in secure connections (wss).
                 let payload = fetch_plaintext_password(client).await?;
                 let creds = creds.as_ref();
@@ -220,16 +221,25 @@ impl BackendType<'_, ClientCredentials<'_>> {
|
||||
}
|
||||
|
||||
/// Authenticate the client via the requested backend, possibly using credentials.
|
||||
///
|
||||
/// If `use_cleartext_password_flow` is true, we use the old cleartext password
|
||||
/// flow. It is used for websocket connections, which want to minimize the number
|
||||
/// of round trips.
|
||||
#[instrument(skip_all)]
|
||||
pub async fn authenticate(
|
||||
mut self,
|
||||
extra: &ConsoleReqExtra<'_>,
|
||||
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin + Send>,
|
||||
use_cleartext_password_flow: bool,
|
||||
) -> auth::Result<AuthSuccess<NodeInfo>> {
|
||||
use BackendType::*;
|
||||
|
||||
// Handle cases when `project` is missing in `creds`.
|
||||
// TODO: type safety: return `creds` with irrefutable `project`.
|
||||
if let Some(res) = self.try_password_hack(extra, client).await? {
|
||||
if let Some(res) = self
|
||||
.try_password_hack(extra, client, use_cleartext_password_flow)
|
||||
.await?
|
||||
{
|
||||
info!("user successfully authenticated (using the password hack)");
|
||||
return Ok(res);
|
||||
}
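
As a rough illustration (hypothetical, std-only stand-ins, not this crate's API), the flag simply selects between two flows at the call sites shown later in this diff: `handle_ws_client` passes `true`, `handle_postgres_client` passes `false`:

fn authenticate(use_cleartext_password_flow: bool) -> &'static str {
    if use_cleartext_password_flow {
        // wss: the stream is already TLS-encrypted and round trips are
        // expensive, so just ask for the password directly.
        "cleartext password flow"
    } else {
        // plain TCP: run the regular exchange (e.g. SCRAM).
        "standard auth flow"
    }
}

fn main() {
    assert_eq!(authenticate(true), "cleartext password flow"); // handle_ws_client path
    assert_eq!(authenticate(false), "standard auth flow"); // handle_postgres_client path
}
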
@@ -34,9 +34,6 @@ pub struct ClientCredentials<'a> {
    pub user: &'a str,
    pub dbname: &'a str,
    pub project: Option<Cow<'a, str>>,
    /// If `True`, we'll use the old cleartext password flow. This is used for
    /// websocket connections, which want to minimize the number of round trips.
    pub use_cleartext_password_flow: bool,
}

impl ClientCredentials<'_> {
@@ -53,7 +50,6 @@ impl<'a> ClientCredentials<'a> {
            user: self.user,
            dbname: self.dbname,
            project: self.project().map(Cow::Borrowed),
            use_cleartext_password_flow: self.use_cleartext_password_flow,
        }
    }
}
@@ -63,7 +59,6 @@ impl<'a> ClientCredentials<'a> {
        params: &'a StartupMessageParams,
        sni: Option<&str>,
        common_name: Option<&str>,
        use_cleartext_password_flow: bool,
    ) -> Result<Self, ClientCredsParseError> {
        use ClientCredsParseError::*;

@@ -113,7 +108,6 @@ impl<'a> ClientCredentials<'a> {
            user = user,
            dbname = dbname,
            project = project.as_deref(),
            use_cleartext_password_flow = use_cleartext_password_flow,
            "credentials"
        );

@@ -121,7 +115,6 @@ impl<'a> ClientCredentials<'a> {
            user,
            dbname,
            project,
            use_cleartext_password_flow,
        })
    }
}
@@ -148,7 +141,7 @@ mod tests {
        let options = StartupMessageParams::new([("user", "john_doe")]);

        // TODO: check that `creds.dbname` is None.
        let creds = ClientCredentials::parse(&options, None, None, false)?;
        let creds = ClientCredentials::parse(&options, None, None)?;
        assert_eq!(creds.user, "john_doe");

        Ok(())
@@ -158,7 +151,7 @@ mod tests {
    fn parse_missing_project() -> anyhow::Result<()> {
        let options = StartupMessageParams::new([("user", "john_doe"), ("database", "world")]);

        let creds = ClientCredentials::parse(&options, None, None, false)?;
        let creds = ClientCredentials::parse(&options, None, None)?;
        assert_eq!(creds.user, "john_doe");
        assert_eq!(creds.dbname, "world");
        assert_eq!(creds.project, None);
@@ -173,7 +166,7 @@ mod tests {
        let sni = Some("foo.localhost");
        let common_name = Some("localhost");

        let creds = ClientCredentials::parse(&options, sni, common_name, false)?;
        let creds = ClientCredentials::parse(&options, sni, common_name)?;
        assert_eq!(creds.user, "john_doe");
        assert_eq!(creds.dbname, "world");
        assert_eq!(creds.project.as_deref(), Some("foo"));
@@ -189,7 +182,7 @@ mod tests {
            ("options", "-ckey=1 project=bar -c geqo=off"),
        ]);

        let creds = ClientCredentials::parse(&options, None, None, false)?;
        let creds = ClientCredentials::parse(&options, None, None)?;
        assert_eq!(creds.user, "john_doe");
        assert_eq!(creds.dbname, "world");
        assert_eq!(creds.project.as_deref(), Some("bar"));
@@ -208,7 +201,7 @@ mod tests {
        let sni = Some("baz.localhost");
        let common_name = Some("localhost");

        let creds = ClientCredentials::parse(&options, sni, common_name, false)?;
        let creds = ClientCredentials::parse(&options, sni, common_name)?;
        assert_eq!(creds.user, "john_doe");
        assert_eq!(creds.dbname, "world");
        assert_eq!(creds.project.as_deref(), Some("baz"));
@@ -227,8 +220,7 @@ mod tests {
        let sni = Some("second.localhost");
        let common_name = Some("localhost");

        let err =
            ClientCredentials::parse(&options, sni, common_name, false).expect_err("should fail");
        let err = ClientCredentials::parse(&options, sni, common_name).expect_err("should fail");
        match err {
            InconsistentProjectNames { domain, option } => {
                assert_eq!(option, "first");
@@ -245,8 +237,7 @@ mod tests {
        let sni = Some("project.localhost");
        let common_name = Some("example.com");

        let err =
            ClientCredentials::parse(&options, sni, common_name, false).expect_err("should fail");
        let err = ClientCredentials::parse(&options, sni, common_name).expect_err("should fail");
        match err {
            InconsistentSni { sni, cn } => {
                assert_eq!(sni, "project.localhost");
@@ -25,12 +25,11 @@ impl CancelMap {
        cancel_closure.try_cancel_query().await
    }

    /// Run async action within an ephemeral session identified by [`CancelKeyData`].
    pub async fn with_session<'a, F, R, V>(&'a self, f: F) -> anyhow::Result<V>
    where
        F: FnOnce(Session<'a>) -> R,
        R: std::future::Future<Output = anyhow::Result<V>>,
    {
    /// Create a new session, with a new client-facing random cancellation key.
    ///
    /// Use `enable_query_cancellation` to register a database cancellation
    /// key with it, and to get the client-facing key.
    pub fn new_session<'a>(&'a self) -> anyhow::Result<Session<'a>> {
        // HACK: We'd rather get the real backend_pid but tokio_postgres doesn't
        // expose it and we don't want to do another roundtrip to query
        // for it. The client will be able to notice that this is not the
@@ -44,17 +43,9 @@ impl CancelMap {
            .lock()
            .try_insert(key, None)
            .map_err(|_| anyhow!("query cancellation key already exists: {key}"))?;

        // This will guarantee that the session gets dropped
        // as soon as the future is finished.
        scopeguard::defer! {
            self.0.lock().remove(&key);
            info!("dropped query cancellation key {key}");
        }

        info!("registered new query cancellation key {key}");
        let session = Session::new(key, self);
        f(session).await

        Ok(Session::new(key, self))
    }

    #[cfg(test)]
@@ -111,7 +102,7 @@ impl<'a> Session<'a> {
impl Session<'_> {
    /// Store the cancel token for the given session.
    /// This enables query cancellation in [`crate::proxy::handshake`].
    pub fn enable_query_cancellation(self, cancel_closure: CancelClosure) -> CancelKeyData {
    pub fn enable_query_cancellation(&self, cancel_closure: CancelClosure) -> CancelKeyData {
        info!("enabling query cancellation for this session");
        self.cancel_map
            .0
@@ -122,6 +113,14 @@ impl Session<'_> {
    }
}

impl<'a> Drop for Session<'a> {
    fn drop(&mut self) {
        let key = &self.key;
        self.cancel_map.0.lock().remove(key);
        info!("dropped query cancellation key {key}");
    }
}

#[cfg(test)]
mod tests {
    use super::*;
@@ -132,14 +131,14 @@ mod tests {
    static CANCEL_MAP: Lazy<CancelMap> = Lazy::new(Default::default);

    let (tx, rx) = tokio::sync::oneshot::channel();
    let task = tokio::spawn(CANCEL_MAP.with_session(|session| async move {

    let session = CANCEL_MAP.new_session()?;
    let task = tokio::spawn(async move {
        assert!(CANCEL_MAP.contains(&session));

        tx.send(()).expect("failed to send");
        futures::future::pending::<()>().await; // sleep forever

        Ok(())
    }));
    });

    // Wait until the task has been spawned.
    rx.await.context("failed to hear from the task")?;
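
The change above swaps the closure-scoped `with_session` for a guard object whose `Drop` impl unregisters the key. A minimal sketch of that pattern (std-only stand-ins, not the actual `CancelMap`):

use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Default)]
struct CancelMap(Mutex<HashMap<u64, ()>>);

struct Session<'a> {
    key: u64,
    map: &'a CancelMap,
}

impl CancelMap {
    fn new_session(&self) -> Session<'_> {
        let key = 42; // the real code picks an unused random key
        self.0.lock().unwrap().insert(key, ());
        Session { key, map: self }
    }
}

impl Drop for Session<'_> {
    fn drop(&mut self) {
        // The key is removed when the guard goes out of scope, so the
        // session lives exactly as long as the owner of the `Session`.
        self.map.0.lock().unwrap().remove(&self.key);
    }
}

fn main() {
    let map = CancelMap::default();
    {
        let _session = map.new_session();
        assert_eq!(map.0.lock().unwrap().len(), 1);
    } // `_session` dropped here, removing the key
    assert!(map.0.lock().unwrap().is_empty());
}
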
@@ -14,7 +14,7 @@ use once_cell::sync::Lazy;
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{error, info, info_span, Instrument};
use tracing::{error, info, info_span, instrument, Instrument};

const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
@@ -71,17 +71,35 @@ pub async fn task_main(
            .set_nodelay(true)
            .context("failed to set socket option")?;

        handle_client(config, &cancel_map, session_id, socket).await
        handle_postgres_client(config, &cancel_map, session_id, socket).await
            }
            .unwrap_or_else(|e| {
                // Acknowledge that the task has finished with an error.
                error!("per-client task finished with an error: {e:#}");
            })
            .instrument(info_span!("client", session = format_args!("{session_id}"))),
        }),
    );
    }
}

/// Handle an incoming PostgreSQL connection
pub async fn handle_postgres_client(
    config: &ProxyConfig,
    cancel_map: &CancelMap,
    session_id: uuid::Uuid,
    stream: impl AsyncRead + AsyncWrite + Unpin + Send,
) -> anyhow::Result<()> {
    handle_client(
        config,
        cancel_map,
        session_id,
        stream,
        HostnameMethod::Sni,
        false,
    )
    .await
}

/// Handle an incoming Postgres connection that's wrapped in websocket
pub async fn handle_ws_client(
    config: &ProxyConfig,
    cancel_map: &CancelMap,
@@ -89,45 +107,32 @@ pub async fn handle_ws_client(
    stream: impl AsyncRead + AsyncWrite + Unpin + Send,
    hostname: Option<String>,
) -> anyhow::Result<()> {
    // The `closed` counter will increase when this future is destroyed.
    NUM_CONNECTIONS_ACCEPTED_COUNTER.inc();
    scopeguard::defer! {
        NUM_CONNECTIONS_CLOSED_COUNTER.inc();
    }

    let tls = config.tls_config.as_ref();
    let hostname = hostname.as_deref();

    // TLS is None here, because the connection is already encrypted.
    let do_handshake = handshake(stream, None, cancel_map).instrument(info_span!("handshake"));
    let (mut stream, params) = match do_handshake.await? {
        Some(x) => x,
        None => return Ok(()), // it's a cancellation request
    };

    // Extract credentials which we're going to use for auth.
    let creds = {
        let common_name = tls.and_then(|tls| tls.common_name.as_deref());
        let result = config
            .auth_backend
            .as_ref()
            .map(|_| auth::ClientCredentials::parse(&params, hostname, common_name, true))
            .transpose();

        async { result }.or_else(|e| stream.throw_error(e)).await?
    };

    let client = Client::new(stream, creds, &params, session_id);
    cancel_map
        .with_session(|session| client.connect_to_db(session))
        .await
    handle_client(
        config,
        cancel_map,
        session_id,
        stream,
        HostnameMethod::Param(hostname),
        true,
    )
    .await
}

enum HostnameMethod {
    Param(Option<String>),
    Sni,
}

/// Handle an incoming client connection, handshake and authentication.
/// After that, keeps forwarding all the data. This doesn't return until the
/// connection is lost.
async fn handle_client(
    config: &ProxyConfig,
    cancel_map: &CancelMap,
    session_id: uuid::Uuid,
    stream: impl AsyncRead + AsyncWrite + Unpin + Send,
    raw_stream: impl AsyncRead + AsyncWrite + Unpin + Send,
    hostname_method: HostnameMethod,
    use_cleartext_password_flow: bool,
) -> anyhow::Result<()> {
    // The `closed` counter will increase when this future is destroyed.
    NUM_CONNECTIONS_ACCEPTED_COUNTER.inc();
@@ -135,36 +140,73 @@ async fn handle_client(
        NUM_CONNECTIONS_CLOSED_COUNTER.inc();
    }

    let tls = config.tls_config.as_ref();
    let do_handshake = handshake(stream, tls, cancel_map).instrument(info_span!("handshake"));
    let (mut stream, params) = match do_handshake.await? {
        Some(x) => x,
        None => return Ok(()), // it's a cancellation request
    };
    // Accept the connection from the client, authenticate it, and establish
    // connection to the database.
    //
    // We cover all these activities in one tracing span, so that they are
    // traced as one request. That makes it convenient to investigate where
    // the time is spent when establishing a new connection. After the
    // connection has been established, we exit the span, and use a separate
    // span for the (rest of the) duration of the connection.
    let conn = async {
        // Process postgres startup packet and upgrade to TLS (if applicable)
        let tls = config.tls_config.as_ref();
        let (mut stream, params) = match handshake(raw_stream, tls, cancel_map).await? {
            Some(x) => x,
            None => return Ok::<_, anyhow::Error>(None), // it's a cancellation request
        };

        // Extract credentials which we're going to use for auth.
        let creds = {
            let sni = stream.get_ref().sni_hostname();
            let common_name = tls.and_then(|tls| tls.common_name.as_deref());
            let result = config
                .auth_backend
                .as_ref()
                .map(|_| auth::ClientCredentials::parse(&params, sni, common_name, false))
                .transpose();
        // Extract credentials which we're going to use for auth.
        let creds = {
            let sni = match &hostname_method {
                HostnameMethod::Param(hostname) => hostname.as_deref(),
                HostnameMethod::Sni => stream.get_ref().sni_hostname(),
            };
            let common_name = tls.and_then(|tls| tls.common_name.as_deref());
            let result = config
                .auth_backend
                .as_ref()
                .map(|_| auth::ClientCredentials::parse(&params, sni, common_name))
                .transpose();

            async { result }.or_else(|e| stream.throw_error(e)).await?
        };
            async { result }.or_else(|e| stream.throw_error(e)).await?
        };

    let client = Client::new(stream, creds, &params, session_id);
    cancel_map
        .with_session(|session| client.connect_to_db(session))
        .await
        Ok(Some(
            EstablishedConnection::connect_to_db(
                stream,
                creds,
                &params,
                session_id,
                use_cleartext_password_flow,
                cancel_map,
            )
            .await?,
        ))
    }
    .instrument(info_span!("establish_connection", session_id=%session_id))
    .await?;

    match conn {
        Some(conn) => {
            // Connection established in both ways. Forward all traffic until
            // either connection is lost.
            conn.handle_connection()
                .instrument(info_span!("forward", session_id=%session_id))
                .await
        }
        None => {
            // It was a cancellation request. It was handled in 'handshake' already.
            Ok(())
        }
    }
}
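
A minimal sketch of the two-span layout described in the comment above (assumed dependencies: tokio, tracing, tracing-subscriber; the span names follow the diff, the bodies are stand-ins):

use tracing::{info, info_span, Instrument};

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    let session_id = 42u32; // stand-in for the uuid in the real code

    // Span 1: everything up to the established connection.
    let conn = async {
        info!("handshake, auth, connect to compute...");
        "established connection" // stand-in for EstablishedConnection
    }
    .instrument(info_span!("establish_connection", session_id = %session_id))
    .await;

    // Span 2: the (rest of the) lifetime of the connection.
    async {
        info!("proxying bytes for {}", conn);
    }
    .instrument(info_span!("forward", session_id = %session_id))
    .await;
}
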
/// Establish a (most probably, secure) connection with the client.
/// For better testing experience, `stream` can be any object satisfying the traits.
/// It's easier to work with owned `stream` here as we need to upgrade it to TLS;
/// we also take extra care to propagate only select handshake errors to the client.
#[instrument(skip_all)]
async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
    stream: S,
    mut tls: Option<&TlsConfig>,
@@ -227,43 +269,36 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
}

/// Thin connection context.
struct Client<'a, S> {
    /// The underlying libpq protocol stream.
    stream: PqStream<S>,
    /// Client credentials that we care about.
    creds: auth::BackendType<'a, auth::ClientCredentials<'a>>,
    /// KV-dictionary with PostgreSQL connection params.
    params: &'a StartupMessageParams,
    /// Unique connection ID.
    session_id: uuid::Uuid,
struct EstablishedConnection<'a, S> {
    client_stream: MeasuredStream<S>,
    db_stream: MeasuredStream<tokio::net::TcpStream>,

    /// Hold on to the Session for as long as the connection is alive, so that
    /// it can be cancelled.
    _session: cancellation::Session<'a>,
}

impl<'a, S> Client<'a, S> {
    /// Construct a new connection context.
    fn new(
        stream: PqStream<S>,
        creds: auth::BackendType<'a, auth::ClientCredentials<'a>>,
        params: &'a StartupMessageParams,
        session_id: uuid::Uuid,
    ) -> Self {
        Self {
            stream,
            creds,
            params,
            session_id,
        }
impl<S: AsyncRead + AsyncWrite + Unpin + Send> EstablishedConnection<'_, S> {
    async fn handle_connection(mut self) -> anyhow::Result<()> {
        // Starting from here we only proxy the client's traffic.
        info!("performing the proxy pass...");
        let _ = tokio::io::copy_bidirectional(&mut self.client_stream, &mut self.db_stream).await?;
        Ok(())
    }
}

impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
    /// Let the client authenticate and connect to the designated compute node.
    async fn connect_to_db(self, session: cancellation::Session<'_>) -> anyhow::Result<()> {
        let Self {
            mut stream,
            creds,
            params,
            session_id,
        } = self;
    /// On return, the connection is fully established in both ways, and we can start
    /// forwarding the bytes.
    #[instrument(skip_all)]
    async fn connect_to_db<'a>(
        mut stream: PqStream<S>,
        creds: auth::BackendType<'a, auth::ClientCredentials<'_>>,
        params: &'_ StartupMessageParams,
        session_id: uuid::Uuid,
        use_cleartext_password_flow: bool,
        cancel_map: &'a CancelMap,
    ) -> anyhow::Result<EstablishedConnection<'a, S>> {
        let session = cancel_map.new_session()?;

        let extra = auth::ConsoleReqExtra {
            session_id, // aka this connection's id
@@ -272,10 +307,11 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {

        let auth_result = async {
            // `&mut stream` doesn't let us merge those 2 lines.
            let res = creds.authenticate(&extra, &mut stream).await;
            let res = creds
                .authenticate(&extra, &mut stream, use_cleartext_password_flow)
                .await;
            async { res }.or_else(|e| stream.throw_error(e)).await
        }
        .instrument(info_span!("auth"))
        .await?;

        let node = auth_result.value;
@@ -311,21 +347,15 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
            .await?;

        let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&node.aux.traffic_labels("tx"));
        let mut client = MeasuredStream::new(stream.into_inner(), |cnt| {
            // Number of bytes we sent to the client (outbound).
            m_sent.inc_by(cnt as u64);
        });
        let client_stream = MeasuredStream::new(stream.into_inner(), m_sent);

        let m_recv = NUM_BYTES_PROXIED_COUNTER.with_label_values(&node.aux.traffic_labels("rx"));
        let mut db = MeasuredStream::new(db.stream, |cnt| {
            // Number of bytes the client sent to the compute node (inbound).
            m_recv.inc_by(cnt as u64);
        });
        let db_stream = MeasuredStream::new(db.stream, m_recv);

        // Starting from here we only proxy the client's traffic.
        info!("performing the proxy pass...");
        let _ = tokio::io::copy_bidirectional(&mut client, &mut db).await?;

        Ok(())
        Ok(EstablishedConnection {
            client_stream,
            db_stream,
            _session: session,
        })
    }
}
@@ -10,6 +10,7 @@ use std::{io, task};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio_rustls::server::TlsStream;
use tracing::instrument;

pin_project! {
    /// Stream wrapper which implements libpq's protocol.
@@ -105,6 +106,7 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
    /// Write the error message using [`Self::write_message`], then re-throw it.
    /// Allowing string literals is safe under the assumption they might not contain any runtime info.
    /// This method exists due to `&str` not implementing `Into<anyhow::Error>`.
    #[instrument(skip_all)]
    pub async fn throw_error_str<T>(&mut self, error: &'static str) -> anyhow::Result<T> {
        tracing::info!("forwarding error to user: {error}");
        self.write_message(&BeMessage::ErrorResponse(error, None))
@@ -114,6 +116,7 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {

    /// Write the error message using [`Self::write_message`], then re-throw it.
    /// Trait [`UserFacingError`] acts as an allowlist for error types.
    #[instrument(skip_all)]
    pub async fn throw_error<T, E>(&mut self, error: E) -> anyhow::Result<T>
    where
        E: UserFacingError + Into<anyhow::Error>,
@@ -228,27 +231,27 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for Stream<S> {
}

pin_project! {
    /// This stream tracks all writes and calls user provided
    /// callback when the underlying stream is flushed.
    pub struct MeasuredStream<S, W> {
    /// This stream tracks all writes, and whenever the stream is flushed,
    /// increments the user-provided counter by the number of bytes flushed.
    pub struct MeasuredStream<S> {
        #[pin]
        stream: S,
        write_count: usize,
        inc_write_count: W,
        write_counter: prometheus::IntCounter,
    }
}

impl<S, W> MeasuredStream<S, W> {
    pub fn new(stream: S, inc_write_count: W) -> Self {
impl<S> MeasuredStream<S> {
    pub fn new(stream: S, write_counter: prometheus::IntCounter) -> Self {
        Self {
            stream,
            write_count: 0,
            inc_write_count,
            write_counter,
        }
    }
}

impl<S: AsyncRead + Unpin, W> AsyncRead for MeasuredStream<S, W> {
impl<S: AsyncRead + Unpin> AsyncRead for MeasuredStream<S> {
    fn poll_read(
        self: Pin<&mut Self>,
        context: &mut task::Context<'_>,
@@ -258,7 +261,7 @@ impl<S: AsyncRead + Unpin, W> AsyncRead for MeasuredStream<S, W> {
    }
}

impl<S: AsyncWrite + Unpin, W: FnMut(usize)> AsyncWrite for MeasuredStream<S, W> {
impl<S: AsyncWrite + Unpin> AsyncWrite for MeasuredStream<S> {
    fn poll_write(
        self: Pin<&mut Self>,
        context: &mut task::Context<'_>,
@@ -279,7 +282,7 @@ impl<S: AsyncWrite + Unpin, W: FnMut(usize)> AsyncWrite for MeasuredStream<S, W>
        let this = self.project();
        this.stream.poll_flush(context).map_ok(|()| {
            // Call the user provided callback and reset the write count.
            (this.inc_write_count)(*this.write_count);
            this.write_counter.inc_by(*this.write_count as u64);
            *this.write_count = 0;
        })
    }
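
A small sketch of the new `MeasuredStream` contract (assumed dependency: the prometheus crate; the counter name and struct are made up for illustration): writes are tallied locally, and the shared counter is bumped by the accumulated amount only when the stream is flushed:

use prometheus::IntCounter;

struct MeasuredSink {
    write_count: usize,
    write_counter: IntCounter,
}

impl MeasuredSink {
    fn write(&mut self, buf: &[u8]) {
        self.write_count += buf.len(); // tally, but don't touch the counter yet
    }
    fn flush(&mut self) {
        // On flush, publish the tally and reset it, as in poll_flush above.
        self.write_counter.inc_by(self.write_count as u64);
        self.write_count = 0;
    }
}

fn main() {
    let counter = IntCounter::new("proxy_io_bytes_tx", "bytes sent to the client").unwrap();
    let mut sink = MeasuredSink { write_count: 0, write_counter: counter.clone() };
    sink.write(b"hello");
    sink.write(b", world");
    assert_eq!(counter.get(), 0); // nothing published before the flush
    sink.flush();
    assert_eq!(counter.get(), 12);
}
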
@@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
            ".*is not active. Current state: Broken.*",
            ".*will not become active. Current state: Broken.*",
            ".*failed to load metadata.*",
            ".*could not load tenant.*load timeline.*",
            ".*could not load tenant.*load local timeline.*",
        ]
    )


@@ -2,15 +2,7 @@ from contextlib import closing

import psycopg2.extras
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    LocalFsStorage,
    NeonEnvBuilder,
    RemoteStorageKind,
    assert_tenant_status,
    wait_for_upload,
)
from fixtures.types import Lsn
from fixtures.utils import wait_until
from fixtures.neon_fixtures import NeonEnvBuilder


def test_tenant_config(neon_env_builder: NeonEnvBuilder):
@@ -166,46 +158,3 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
            "pitr_interval": 60,
        }.items()
    )


def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
    neon_env_builder.enable_remote_storage(
        remote_storage_kind=RemoteStorageKind.LOCAL_FS,
        test_name="test_creating_tenant_conf_after_attach",
    )

    env = neon_env_builder.init_start()
    assert isinstance(env.remote_storage, LocalFsStorage)

    # tenant is created with defaults, as in without config file
    (tenant_id, timeline_id) = env.neon_cli.create_tenant()
    config_path = env.repo_dir / "tenants" / str(tenant_id) / "config"
    assert config_path.exists(), "config file is always initially created"

    http_client = env.pageserver.http_client()

    detail = http_client.timeline_detail(tenant_id, timeline_id)
    last_record_lsn = Lsn(detail["last_record_lsn"])
    assert last_record_lsn.lsn_int != 0, "initdb must have executed"

    wait_for_upload(http_client, tenant_id, timeline_id, last_record_lsn)

    http_client.tenant_detach(tenant_id)

    assert not config_path.exists(), "detach did not remove config file"

    http_client.tenant_attach(tenant_id)
    wait_until(
        number_of_iterations=5,
        interval=1,
        func=lambda: assert_tenant_status(http_client, tenant_id, "Active"),
    )

    env.neon_cli.config_tenant(tenant_id, {"gc_horizon": "1000000"})
    contents_first = config_path.read_text()
    env.neon_cli.config_tenant(tenant_id, {"gc_horizon": "0"})
    contents_later = config_path.read_text()

    # don't test applying the setting here, we have another test case to show it
    # we just care about being able to create the file
    assert len(contents_first) > len(contents_later)

@@ -579,11 +579,12 @@ def test_load_attach_negatives(

    pageserver_http.tenant_ignore(tenant_id)

    expect_error_msg = f".*attach tenant {tenant_id}: some local filesystem state already exists:"
    env.pageserver.allowed_errors.append(expect_error_msg)
    env.pageserver.allowed_errors.append(
        ".*Cannot attach tenant .*?, local tenant directory already exists.*"
    )
    with pytest.raises(
        expected_exception=PageserverApiException,
        match=expect_error_msg,
        match=f"Cannot attach tenant {tenant_id}, local tenant directory already exists",
    ):
        pageserver_http.tenant_attach(tenant_id)

@@ -627,11 +628,12 @@ def test_ignore_while_attaching(
    pageserver_http.tenant_ignore(tenant_id)

    # Cannot attach it due to some local files existing
    expect_error_msg = f".*attach tenant {tenant_id}: some local filesystem state already exists:"
    env.pageserver.allowed_errors.append(expect_error_msg)
    env.pageserver.allowed_errors.append(
        ".*Cannot attach tenant .*?, local tenant directory already exists.*"
    )
    with pytest.raises(
        expected_exception=PageserverApiException,
        match=expect_error_msg,
        match=f"Cannot attach tenant {tenant_id}, local tenant directory already exists",
    ):
        pageserver_http.tenant_attach(tenant_id)