Compare commits

..

5 Commits

Author SHA1 Message Date
Heikki Linnakangas
6fcf0f2754 Refactor common parts of handle_client and handle_ws_client into a function.
There was a lot of duplicated code.

The resulting shared function now uses two tracing spans, one for
establishing the connections, and a separate span for forwarding the
traffic after that. This makes for nicer traces in the future, because
you can dig into how long the startup phase takes, and where the time
is spent.
2023-01-26 15:21:24 +02:00
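A minimal sketch (not the proxy's actual code) of the two-span pattern this commit message describes, assuming the tracing crate's Instrument trait and illustrative function names:

use anyhow::Result;
use tracing::{info_span, Instrument};

// Illustrative stand-ins for the real setup and proxying logic.
async fn establish_connection() -> Result<String> {
    Ok("established".to_string())
}

async fn forward_traffic(_conn: String) -> Result<()> {
    Ok(())
}

// One span covers the startup phase (handshake, auth, connecting to the
// database), a second span covers forwarding, so a trace shows how long
// startup took separately from the proxying itself.
async fn handle_connection() -> Result<()> {
    let conn = establish_connection()
        .instrument(info_span!("establish"))
        .await?;

    forward_traffic(conn)
        .instrument(info_span!("forward"))
        .await
}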
Heikki Linnakangas
d336b8b5d9 Refactor Client into EstablishedConnection.
The name "Client" was a bit ambiguous. Instead of encapsulating all
the data needed to establish the connection, change it so that it
encapsulates the streams, after the connection has been established.
With that, "EstablishedConnection" is a fitting name for it.
2023-01-26 15:21:24 +02:00
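A hypothetical shape of the renamed type, only to illustrate what it encapsulates after this change; the field names and generics are assumptions, not the proxy's actual definition:

use tokio::io::{AsyncRead, AsyncWrite};

// Owns the streams of a connection that has already been established
// (client side and database side), rather than the data needed to
// establish one.
struct EstablishedConnection<C, D> {
    client: C,
    db: D,
}

impl<C, D> EstablishedConnection<C, D>
where
    C: AsyncRead + AsyncWrite + Unpin,
    D: AsyncRead + AsyncWrite + Unpin,
{
    fn new(client: C, db: D) -> Self {
        Self { client, db }
    }
}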
Heikki Linnakangas
4d68e3108f Refactor use_cleartext_password_flow.
It's not a property of the credentials that we receive from the
client, so remove it from ClientCredentials. Instead, pass it as an
argument directly to the 'authenticate' function, where it's actually
used. All the rest of the changes are just plumbing to pass it through
the call stack to 'authenticate'.
2023-01-26 15:21:24 +02:00
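A hedged sketch of the plumbing change: the flag moves from a field on ClientCredentials to a parameter of the function that uses it. The struct fields and function body below are placeholders, not the proxy's real code:

struct ClientCredentials<'a> {
    user: &'a str,
    dbname: &'a str,
    // `use_cleartext_password_flow` no longer lives here: it is not a
    // property of the credentials the client sent.
}

fn authenticate(creds: &ClientCredentials<'_>, use_cleartext_password_flow: bool) {
    if use_cleartext_password_flow {
        // old cleartext password exchange, used for websocket connections
        // that want to minimize round trips
        println!("cleartext flow for {}", creds.user);
    } else {
        println!("regular flow for {}/{}", creds.user, creds.dbname);
    }
}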
Heikki Linnakangas
3e150419ef Add a few tracing spans, for more fine-grained tracing.
This also splits the 'connect_to_db' function, so that it only
establishes the connection, and a new 'handle_connection' function is
the equivalent of what 'connect_to_db' used to do. This made it easier
to attach a span specifically to the first part where we establish
the connection.
2023-01-26 15:21:24 +02:00
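A rough sketch of the split, assuming the tracing crate's #[instrument] attribute; the signatures and bodies are placeholders rather than the actual pageserver/proxy code:

use tracing::instrument;

// After the split, this function only establishes the connection, so the
// time spent here shows up under its own span.
#[instrument]
async fn connect_to_db(addr: &str) -> anyhow::Result<tokio::net::TcpStream> {
    Ok(tokio::net::TcpStream::connect(addr).await?)
}

// The new function does what connect_to_db used to do: establish the
// connection first, then everything that follows.
async fn handle_connection(addr: &str) -> anyhow::Result<()> {
    let _stream = connect_to_db(addr).await?;
    // ...authentication, forwarding, etc. would go here
    Ok(())
}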
Heikki Linnakangas
9e424d2f84 Simplify MeasuredStream a little.
It was generalized so that you could pass a custom function that is
called whenever data is flushed. The only use case we had for it was
to increment a prometheus counter, so let's dismantle the abstraction,
and just pass a prometheus IntCounter to it. Simplifies the code a
little bit. If we need the abstraction again in the future, we can
always add it back.
2023-01-26 15:21:24 +02:00
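A rough illustration of what "just pass a prometheus IntCounter" can look like, using a plain std::io::Write wrapper for simplicity; the real MeasuredStream wraps async streams, and the name MeasuredWriter below is made up:

use prometheus::IntCounter;
use std::io::{self, Write};

// Instead of a generic "call this closure whenever data is flushed"
// hook, the wrapper holds a concrete counter and bumps it by the number
// of bytes that pass through.
struct MeasuredWriter<W> {
    inner: W,
    bytes_counter: IntCounter,
}

impl<W: Write> Write for MeasuredWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.inner.write(buf)?;
        self.bytes_counter.inc_by(n as u64);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}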
24 changed files with 792 additions and 1360 deletions

View File

@@ -1,39 +1,37 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://console-release.local/management/api/v2"
domain: "*.cloud.neon.tech"
authBackend: "link"
authEndpoint: "https://console.neon.tech/authenticate_proxy_request/"
uri: "https://console.neon.tech/psql_session/"
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://console-release.local/billing/api/v1/usage_events"
metricCollectionInterval: "10min"
# -- Additional labels for neon-proxy pods
# -- Additional labels for zenith-proxy pods
podLabels:
zenith_service: proxy-scram
zenith_env: prod
zenith_service: proxy
zenith_env: production
zenith_region: us-west-2
zenith_region_slug: us-west-2
zenith_region_slug: oregon
service:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal
external-dns.alpha.kubernetes.io/hostname: proxy-release.local
type: LoadBalancer
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: neon-proxy-scram-legacy.eta.us-west-2.aws.neon.tech
httpsPort: 443
external-dns.alpha.kubernetes.io/hostname: connect.neon.tech,pg.neon.tech
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack
metrics:
enabled: true
serviceMonitor:
enabled: true
selector:
release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1

View File

@@ -690,37 +690,20 @@ jobs:
promote-images:
runs-on: [ self-hosted, gen3, small ]
needs: [ tag, test-images, vm-compute-node-image ]
container: golang:1.19-bullseye
if: github.event_name != 'workflow_dispatch'
container: amazon/aws-cli
strategy:
fail-fast: false
matrix:
name: [ neon, compute-node-v14, vm-compute-node-v14, compute-node-v15, vm-compute-node-v15, compute-tools]
env:
AWS_DEFAULT_REGION: eu-central-1
steps:
- name: Install Crane & ECR helper
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
- name: Promote image to latest
run: |
go install github.com/google/go-containerregistry/cmd/crane@31786c6cbb82d6ec4fb8eb79cd9387905130534e # v0.11.0
go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0
- name: Configure ECR login
run: |
mkdir /github/home/.docker/
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
- name: Add latest tag to images
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
run: |
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} latest
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
export MANIFEST=$(aws ecr batch-get-image --repository-name ${{ matrix.name }} --image-ids imageTag=${{needs.tag.outputs.build-tag}} --query 'images[].imageManifest' --output text)
aws ecr put-image --repository-name ${{ matrix.name }} --image-tag latest --image-manifest "$MANIFEST"
push-docker-hub:
runs-on: [ self-hosted, dev, x64 ]
@@ -879,15 +862,9 @@ jobs:
ANSIBLE_CONFIG=./ansible.cfg ansible-playbook deploy.yaml -i ${{ matrix.env_name }}.hosts.yaml -e CONSOLE_API_TOKEN=${{ secrets[matrix.console_api_key_secret] }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ansible/collections': Permission denied
- name: Cleanup ansible folder
run: rm -rf ~/.ansible
deploy-new:
runs-on: [ self-hosted, gen3, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
options: --user root --privileged
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
# We need both storage **and** compute images for deploy, because control plane picks the compute version based on the storage version.
# If it notices a fresh storage it may bump the compute version. And if compute image failed to build it may break things badly
needs: [ push-docker-hub, tag, regress-tests ]
@@ -925,9 +902,6 @@ jobs:
ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
- name: Cleanup ansible folder
run: rm -rf ~/.ansible
deploy-pr-test-new:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
@@ -961,9 +935,6 @@ jobs:
ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
- name: Cleanup ansible folder
run: rm -rf ~/.ansible
deploy-prod-new:
runs-on: prod
container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
@@ -1008,7 +979,7 @@ jobs:
deploy-proxy:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
# Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
needs: [ push-docker-hub, calculate-deploy-targets, tag, regress-tests ]
if: |
@@ -1031,26 +1002,29 @@ jobs:
submodules: true
fetch-depth: 0
- name: Add curl
run: apt update && apt install curl -y
- name: Store kubeconfig file
run: |
echo "${{ secrets[matrix.kubeconfig_secret] }}" | base64 --decode > ${KUBECONFIG}
chmod 0600 ${KUBECONFIG}
- name: Add neon helm chart
run: helm repo add neondatabase https://neondatabase.github.io/helm-charts
- name: Setup helm v3
run: |
curl -s https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
helm repo add neondatabase https://neondatabase.github.io/helm-charts
- name: Re-deploy proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install --atomic -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install --atomic -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-storage-broker:
name: deploy storage broker on old staging and old prod
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
# Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
needs: [ push-docker-hub, calculate-deploy-targets, tag, regress-tests ]
if: |
@@ -1073,21 +1047,23 @@ jobs:
submodules: true
fetch-depth: 0
- name: Add curl
run: apt update && apt install curl -y
- name: Store kubeconfig file
run: |
echo "${{ secrets[matrix.kubeconfig_secret] }}" | base64 --decode > ${KUBECONFIG}
chmod 0600 ${KUBECONFIG}
- name: Add neon helm chart
run: helm repo add neondatabase https://neondatabase.github.io/helm-charts
- name: Setup helm v3
run: |
curl -s https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
helm repo add neondatabase https://neondatabase.github.io/helm-charts
- name: Deploy storage-broker
run:
helm upgrade neon-storage-broker neondatabase/neon-storage-broker --namespace ${{ matrix.storage_broker_ns }} --create-namespace --install --atomic -f .github/helm-values/${{ matrix.storage_broker_config }}.yaml --set image.tag=${{ needs.tag.outputs.build-tag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-proxy-new:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
@@ -1119,14 +1095,6 @@ jobs:
submodules: true
fetch-depth: 0
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1-node16
with:
role-to-assume: arn:aws:iam::369495373322:role/github-runner
aws-region: eu-central-1
role-skip-session-tagging: true
role-duration-seconds: 1800
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
@@ -1149,9 +1117,6 @@ jobs:
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade neon-proxy-scram-legacy neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram-legacy.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-storage-broker-dev-new:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
@@ -1179,14 +1144,6 @@ jobs:
submodules: true
fetch-depth: 0
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1-node16
with:
role-to-assume: arn:aws:iam::369495373322:role/github-runner
aws-region: eu-central-1
role-skip-session-tagging: true
role-duration-seconds: 1800
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
@@ -1196,9 +1153,6 @@ jobs:
run:
helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ needs.tag.outputs.build-tag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-proxy-prod-new:
runs-on: prod
container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
@@ -1216,19 +1170,15 @@ jobs:
- target_region: us-east-2
target_cluster: prod-us-east-2-delta
deploy_link_proxy: true
deploy_legacy_scram_proxy: false
- target_region: us-west-2
target_cluster: prod-us-west-2-eta
deploy_link_proxy: false
deploy_legacy_scram_proxy: true
- target_region: eu-central-1
target_cluster: prod-eu-central-1-gamma
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
- target_region: ap-southeast-1
target_cluster: prod-ap-southeast-1-epsilon
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
environment:
name: prod-${{ matrix.target_region }}
steps:
@@ -1254,12 +1204,6 @@ jobs:
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade neon-proxy-link neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-link.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Re-deploy legacy scram proxy
if: matrix.deploy_legacy_scram_proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade neon-proxy-scram-legacy neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram-legacy.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
deploy-storage-broker-prod-new:
runs-on: prod
container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest

View File

@@ -250,7 +250,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
let signals = signals::install_shutdown_handlers()?;
// Launch broker client
WALRECEIVER_RUNTIME.block_on(pageserver::broker_client::init_broker_client(conf))?;
WALRECEIVER_RUNTIME.block_on(pageserver::walreceiver::init_broker_client(conf))?;
// Initialize authentication for incoming connections
let auth = match &conf.auth_type {

View File

@@ -1,48 +0,0 @@
//! The broker client instance of the pageserver, created during pageserver startup.
//! Used by each timelines' [`walreceiver`].
use crate::config::PageServerConf;
use anyhow::Context;
use once_cell::sync::OnceCell;
use storage_broker::BrokerClientChannel;
use tracing::*;
static BROKER_CLIENT: OnceCell<BrokerClientChannel> = OnceCell::new();
///
/// Initialize the broker client. This must be called once at page server startup.
///
pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result<()> {
let broker_endpoint = conf.broker_endpoint.clone();
// Note: we do not attempt connecting here (but validate endpoints sanity).
let broker_client =
storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context(
format!(
"Failed to create broker client to {}",
&conf.broker_endpoint
),
)?;
if BROKER_CLIENT.set(broker_client).is_err() {
panic!("broker already initialized");
}
info!(
"Initialized broker client with endpoints: {}",
broker_endpoint
);
Ok(())
}
///
/// Get a handle to the broker client
///
pub fn get_broker_client() -> &'static BrokerClientChannel {
BROKER_CLIENT.get().expect("broker client not initialized")
}
pub fn is_broker_client_initialized() -> bool {
BROKER_CLIENT.get().is_some()
}

View File

@@ -29,7 +29,7 @@ use utils::{
use crate::tenant::config::TenantConf;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{TENANT_ATTACHING_MARKER_SUFFIX, TIMELINES_SEGMENT_NAME};
use crate::tenant::{TENANT_ATTACHING_MARKER_FILENAME, TIMELINES_SEGMENT_NAME};
use crate::{
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TIMELINE_UNINIT_MARK_SUFFIX,
};
@@ -459,7 +459,8 @@ impl PageServerConf {
}
pub fn tenant_attaching_mark_file_path(&self, tenant_id: &TenantId) -> PathBuf {
path_with_suffix_extension(self.tenant_path(tenant_id), TENANT_ATTACHING_MARKER_SUFFIX)
self.tenant_path(tenant_id)
.join(TENANT_ATTACHING_MARKER_FILENAME)
}
pub fn tenant_ignore_mark_file_path(&self, tenant_id: TenantId) -> PathBuf {

View File

@@ -1,6 +1,5 @@
mod auth;
pub mod basebackup;
pub mod broker_client;
pub mod config;
pub mod consumption_metrics;
pub mod context;
@@ -17,6 +16,7 @@ pub mod tenant;
pub mod trace;
pub mod virtual_file;
pub mod walingest;
pub mod walreceiver;
pub mod walrecord;
pub mod walredo;

View File

@@ -1653,7 +1653,7 @@ mod tests {
assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
// Create a branch, check that the relation is visible there
repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x30))?;
let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
Some(timeline) => timeline,
None => panic!("Should have a local timeline"),

View File

@@ -238,17 +238,11 @@ pub enum TaskKind {
// Task that downloads a file from remote storage
RemoteDownloadTask,
// task that handles loading of a tenant during pageserver startup
TenantLoadStartup,
// task that handles loading of a tenant in response to a /load HTTP API request
TenantLoadApi,
// task that handles loading of a tenant as part of the tenant creation procedure
TenantLoadCreate,
// task that handles the initial downloading of all tenants
InitialLoad,
// task that handles attaching a tenant
TenantAttach,
Attach,
// task that handhes metrics collection
MetricsCollection,

File diff suppressed because it is too large.

View File

@@ -19,13 +19,12 @@ use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{Tenant, TenantState, TENANT_ATTACHING_LEGACY_MARKER_FILENAME};
use crate::tenant::{Tenant, TenantState};
use crate::IGNORED_TENANT_FILE_NAME;
use utils::fs_ext::PathExt;
use utils::id::{TenantId, TimelineId};
use super::{TenantLoadReasonNotAttach, TENANT_ATTACHING_MARKER_SUFFIX};
/// The tenants known to the pageserver.
/// The enum variants are used to distinguish the different states that the pageserver can be in.
enum TenantsMap {
@@ -67,11 +66,6 @@ pub async fn init_tenant_mgr(
// Scan local filesystem for attached tenants
let tenants_dir = conf.tenants_path();
// Other code in pageserver assumes new attaching markers.
// Do the migration here, abort startup if it fails.
Tenant::migrate_attaching_marker_files(&conf.tenants_path())
.context("attaching marker migration failed")?;
let mut tenants = HashMap::new();
let mut dir_entries = fs::read_dir(&tenants_dir)
@@ -98,12 +92,18 @@ pub async fn init_tenant_mgr(
);
}
} else {
if tenant_dir_path
.to_string_lossy()
.ends_with(TENANT_ATTACHING_MARKER_SUFFIX)
{
// schedule_local_tenant_processing checks for marker when it encounters a tenant dir
info!("found a tenant attaching marker {tenant_dir_path:?}, skipping");
// This case happens if we crash during attach before creating the attach marker file
let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
})?;
if is_empty {
info!("removing empty tenant directory {tenant_dir_path:?}");
if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
error!(
"Failed to remove empty tenant directory '{}': {e:#}",
tenant_dir_path.display()
)
}
continue;
}
@@ -117,7 +117,6 @@ pub async fn init_tenant_mgr(
conf,
&tenant_dir_path,
remote_storage.clone(),
TenantLoadReasonNotAttach::PageserverStartup,
&ctx,
) {
Ok(tenant) => {
@@ -148,11 +147,10 @@ pub async fn init_tenant_mgr(
Ok(())
}
pub(crate) fn schedule_local_tenant_processing(
pub fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_path: &Path,
remote_storage: Option<GenericRemoteStorage>,
load_reason: TenantLoadReasonNotAttach,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
anyhow::ensure!(
@@ -164,10 +162,10 @@ pub(crate) fn schedule_local_tenant_processing(
"Cannot load tenant from temporary path {tenant_path:?}"
);
anyhow::ensure!(
!tenant_path
.to_string_lossy()
.ends_with(TENANT_ATTACHING_MARKER_SUFFIX),
"Caller must filter these out: {tenant_path:?}"
!tenant_path.is_empty_dir().with_context(|| {
format!("Failed to check whether {tenant_path:?} is an empty dir")
})?,
"Cannot load tenant from empty directory {tenant_path:?}"
);
let tenant_id = tenant_path
@@ -185,22 +183,10 @@ pub(crate) fn schedule_local_tenant_processing(
"Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
);
let legacy_attaching_marker = tenant_path.join(TENANT_ATTACHING_LEGACY_MARKER_FILENAME);
anyhow::ensure!(
!legacy_attaching_marker.exists(),
"legacy attaching marker still present, migration code must have been not called or has a bug: {legacy_attaching_marker:?}"
);
let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
if let Some(remote_storage) = remote_storage {
match Tenant::spawn_resume_attach(conf, tenant_id, remote_storage, ctx) {
Ok(tenant) => tenant,
Err(e) => {
warn!("tenant {tenant_id} failed to resume attach operation: {e:#}");
Tenant::create_broken_tenant(conf, tenant_id)
}
}
Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx)
} else {
warn!("tenant {tenant_id} has attaching mark file, but pageserver has no remote storage configured");
Tenant::create_broken_tenant(conf, tenant_id)
@@ -208,7 +194,7 @@ pub(crate) fn schedule_local_tenant_processing(
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(conf, tenant_id, remote_storage, load_reason, ctx)
Tenant::spawn_load(conf, tenant_id, remote_storage, ctx)
};
Ok(tenant)
}
@@ -288,7 +274,7 @@ pub async fn create_tenant(
// and do the work in that state.
let tenant_directory = super::create_tenant_files(conf, tenant_conf, tenant_id)?;
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, remote_storage, TenantLoadReasonNotAttach::Create, ctx)?;
schedule_local_tenant_processing(conf, &tenant_directory, remote_storage, ctx)?;
let crated_tenant_id = created_tenant.tenant_id();
anyhow::ensure!(
tenant_id == crated_tenant_id,
@@ -305,11 +291,10 @@ pub async fn update_tenant_config(
tenant_id: TenantId,
) -> anyhow::Result<()> {
info!("configuring tenant {tenant_id}");
let tenant = get_tenant(tenant_id, true).await?;
tenant.update_tenant_config(tenant_conf);
let tenant_config_path = conf.tenant_config_path(tenant_id);
Tenant::persist_tenant_config(&tenant.tenant_id(), &tenant_config_path, tenant_conf, false)?;
get_tenant(tenant_id, true)
.await?
.update_tenant_config(tenant_conf);
Tenant::persist_tenant_config(&conf.tenant_config_path(tenant_id), tenant_conf, false)?;
Ok(())
}
@@ -375,7 +360,7 @@ pub async fn load_tenant(
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
}
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, remote_storage, TenantLoadReasonNotAttach::LoadApi, ctx)
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, remote_storage, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -435,8 +420,13 @@ pub async fn attach_tenant(
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, |vacant_entry| {
let tenant = Tenant::spawn_start_attach(conf, tenant_id, remote_storage, ctx)
.map_err(|source| anyhow::anyhow!("attach tenant {tenant_id}: {source:#}"))?;
let tenant_path = conf.tenant_path(&tenant_id);
anyhow::ensure!(
!tenant_path.exists(),
"Cannot attach tenant {tenant_id}, local tenant directory already exists"
);
let tenant = Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx);
vacant_entry.insert(tenant);
Ok(())
})

View File

@@ -157,26 +157,17 @@
//! downloading files from the remote storage. Downloads are performed immediately
//! against the `RemoteStorage`, independently of the upload queue.
//!
//! When we attach a tenant, we prepare the on-disk state based on the remote state,
//! then use the same code that's used to set up the tenant during pageserver startup:
//!
//! When we attach a tenant, we perform the following steps:
//! - create `Tenant` object in `TenantState::Attaching` state
//! - Create an attaching marker file for this tenant on disk.
//! - List timelines that are present in remote storage, and for each:
//! - download their remote [`IndexPart`]s
//! - create the local `metadata` file from the [`IndexPart`] contents
//! - Remove the attaching marker file.
//! - tell the `Tenant` object to load the prepared on-disk state.
//!
//! Loading the on-disk state performs the following steps:
//!
//! - create `Timeline` struct and a `RemoteTimelineClient`
//! - initialize the client's upload queue with the `IndexPart`
//! - for attach, we carry this over in memory
//! - during pageserver startup, we refresh the IndexParts from the remote
//! - initialize the client's upload queue with its `IndexPart`
//! - create [`RemoteLayer`] instances for layers that are referenced by `IndexPart`
//! but not present locally
//! - schedule uploads for layers that are only present locally.
//! - if the remote `IndexPart`'s metadata was newer than the metadata in
//! the local filesystem, write the remote metadata to the local filesystem
//! - After the above is done for each timeline, open the tenant for business by
//! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
//! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.

View File

@@ -1,7 +1,5 @@
//!
mod walreceiver;
use anyhow::{anyhow, bail, ensure, Context};
use bytes::Bytes;
use fail::fail_point;
@@ -25,7 +23,6 @@ use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::broker_client::is_broker_client_initialized;
use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
@@ -62,11 +59,11 @@ use crate::page_cache;
use crate::repository::GcResult;
use crate::repository::{Key, Value};
use crate::task_mgr::TaskKind;
use crate::walreceiver::{is_broker_client_initialized, spawn_connection_manager_task};
use crate::walredo::WalRedoManager;
use crate::METADATA_FILE_NAME;
use crate::ZERO_PAGE;
use crate::{is_temporary, task_mgr};
use walreceiver::spawn_connection_manager_task;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;

View File

@@ -23,15 +23,58 @@
mod connection_manager;
mod walreceiver_connection;
use crate::config::PageServerConf;
use crate::task_mgr::WALRECEIVER_RUNTIME;
use anyhow::Context;
use once_cell::sync::OnceCell;
use std::future::Future;
use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::*;
pub use connection_manager::spawn_connection_manager_task;
static BROKER_CLIENT: OnceCell<BrokerClientChannel> = OnceCell::new();
///
/// Initialize the broker client. This must be called once at page server startup.
///
pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result<()> {
let broker_endpoint = conf.broker_endpoint.clone();
// Note: we do not attempt connecting here (but validate endpoints sanity).
let broker_client =
storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context(
format!(
"Failed to create broker client to {}",
&conf.broker_endpoint
),
)?;
if BROKER_CLIENT.set(broker_client).is_err() {
panic!("broker already initialized");
}
info!(
"Initialized broker client with endpoints: {}",
broker_endpoint
);
Ok(())
}
///
/// Get a handle to the broker client
///
pub fn get_broker_client() -> &'static BrokerClientChannel {
BROKER_CLIENT.get().expect("broker client not initialized")
}
pub fn is_broker_client_initialized() -> bool {
BROKER_CLIENT.get().is_some()
}
/// A handle of an asynchronous task.
/// The task has a channel that it can use to communicate its lifecycle events in a certain form, see [`TaskEvent`]
/// and a cancellation token that it can listen to for earlier interrupts.
@@ -52,6 +95,7 @@ pub enum TaskEvent<E> {
#[derive(Debug, Clone)]
pub enum TaskStateUpdate<E> {
Init,
Started,
Progress(E),
}

View File

@@ -11,12 +11,11 @@
use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
use super::TaskStateUpdate;
use crate::broker_client::get_broker_client;
use crate::context::RequestContext;
use crate::task_mgr::TaskKind;
use crate::task_mgr::WALRECEIVER_RUNTIME;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::Timeline;
use crate::{task_mgr, walreceiver::TaskStateUpdate};
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
use pageserver_api::models::TimelineState;
@@ -29,7 +28,10 @@ use storage_broker::Streaming;
use tokio::{select, sync::watch};
use tracing::*;
use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
use crate::{
exponential_backoff, walreceiver::get_broker_client, DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
};
use postgres_connection::{parse_host_port, PgConnectionConfig};
use utils::{
id::{NodeId, TenantTimelineId},
@@ -147,7 +149,7 @@ async fn connection_manager_loop_step(
let wal_connection = walreceiver_state.wal_connection.as_mut()
.expect("Should have a connection, as checked by the corresponding select! guard");
match wal_connection_update {
TaskEvent::Update(TaskStateUpdate::Started) => {},
TaskEvent::Update(TaskStateUpdate::Init | TaskStateUpdate::Started) => {},
TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
if new_status.has_processed_wal {
// We have advanced last_record_lsn by processing the WAL received

View File

@@ -22,9 +22,8 @@ use tokio_postgres::{replication::ReplicationStream, Client};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
use super::TaskStateUpdate;
use crate::context::RequestContext;
use crate::metrics::LIVE_CONNECTIONS_COUNT;
use crate::{metrics::LIVE_CONNECTIONS_COUNT, walreceiver::TaskStateUpdate};
use crate::{
task_mgr,
task_mgr::TaskKind,

View File

@@ -22,18 +22,16 @@ use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use serde::Serialize;
use std::collections::VecDeque;
use std::fs::OpenOptions;
use std::io::prelude::*;
use std::io::{Error, ErrorKind};
use std::ops::{Deref, DerefMut};
use std::os::fd::RawFd;
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::CommandExt;
use std::path::PathBuf;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::{Mutex, MutexGuard};
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use std::{fs, io};
@@ -92,20 +90,6 @@ pub trait WalRedoManager: Send + Sync {
) -> Result<Bytes, WalRedoError>;
}
struct ProcessInput {
child: NoLeakChild,
stdin: ChildStdin,
stderr_fd: RawFd,
stdout_fd: RawFd,
n_requests: usize,
}
struct ProcessOutput {
stdout: ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
}
///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
@@ -117,9 +101,7 @@ pub struct PostgresRedoManager {
tenant_id: TenantId,
conf: &'static PageServerConf,
stdout: Mutex<Option<ProcessOutput>>,
stdin: Mutex<Option<ProcessInput>>,
stderr: Mutex<Option<ChildStderr>>,
process: Mutex<Option<PostgresRedoProcess>>,
}
/// Can this request be served by neon redo functions
@@ -227,17 +209,16 @@ impl PostgresRedoManager {
PostgresRedoManager {
tenant_id,
conf,
stdin: Mutex::new(None),
stdout: Mutex::new(None),
stderr: Mutex::new(None),
process: Mutex::new(None),
}
}
/// Launch process pre-emptively. Should not be needed except for benchmarking.
pub fn launch_process(&self, pg_version: u32) -> anyhow::Result<()> {
let mut proc = self.stdin.lock().unwrap();
if proc.is_none() {
self.launch(&mut proc, pg_version)?;
pub fn launch_process(&mut self, pg_version: u32) -> anyhow::Result<()> {
let inner = self.process.get_mut().unwrap();
if inner.is_none() {
let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?;
*inner = Some(p);
}
Ok(())
}
@@ -260,19 +241,22 @@ impl PostgresRedoManager {
let start_time = Instant::now();
let mut proc = self.stdin.lock().unwrap();
let mut process_guard = self.process.lock().unwrap();
let lock_time = Instant::now();
// launch the WAL redo process on first use
if proc.is_none() {
self.launch(&mut proc, pg_version)?;
if process_guard.is_none() {
let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
let result = self
.apply_wal_records(proc, buf_tag, base_img, records, wal_redo_timeout)
let result = process
.apply_wal_records(buf_tag, base_img, records, wal_redo_timeout)
.map_err(WalRedoError::IoError);
let end_time = Instant::now();
@@ -311,22 +295,8 @@ impl PostgresRedoManager {
base_img_lsn,
lsn
);
// self.stdin only holds stdin & stderr as_raw_fd().
// Dropping it as part of take() doesn't close them.
// The owning objects (ChildStdout and ChildStderr) are stored in
// self.stdout and self.stderr, respsectively.
// We intentionally keep them open here to avoid a race between
// currently running `apply_wal_records()` and a `launch()` call
// after we return here.
// The currently running `apply_wal_records()` must not read from
// the newly launched process.
// By keeping self.stdout and self.stderr open here, `launch()` will
// get other file descriptors for the new child's stdout and stderr,
// and hence the current `apply_wal_records()` calls will observe
// `output.stdout.as_raw_fd() != stdout_fd` .
if let Some(proc) = self.stdin.lock().unwrap().take() {
proc.child.kill_and_wait();
}
let process = process_guard.take().unwrap();
process.kill();
}
result
}
@@ -625,23 +595,32 @@ impl<C: CommandExt> CloseFileDescriptors for C {
}
}
impl PostgresRedoManager {
///
/// Handle to the Postgres WAL redo process
///
struct PostgresRedoProcess {
tenant_id: TenantId,
child: NoLeakChild,
stdin: ChildStdin,
stdout: ChildStdout,
stderr: ChildStderr,
}
impl PostgresRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(tenant_id=%self.tenant_id, pg_version=pg_version))]
#[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))]
fn launch(
&self,
input: &mut MutexGuard<Option<ProcessInput>>,
conf: &PageServerConf,
tenant_id: TenantId,
pg_version: u32,
) -> Result<(), Error> {
) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
let datadir = path_with_suffix_extension(
self.conf
.tenant_path(&self.tenant_id)
.join("wal-redo-datadir"),
conf.tenant_path(&tenant_id).join("wal-redo-datadir"),
TEMP_FILE_SUFFIX,
);
@@ -655,12 +634,10 @@ impl PostgresRedoManager {
)
})?;
}
let pg_bin_dir_path = self
.conf
let pg_bin_dir_path = conf
.pg_bin_dir(pg_version)
.map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_bin_dir path: {e}")))?;
let pg_lib_dir_path = self
.conf
let pg_lib_dir_path = conf
.pg_lib_dir(pg_version)
.map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_lib_dir path: {e}")))?;
@@ -746,31 +723,27 @@ impl PostgresRedoManager {
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
**input = Some(ProcessInput {
Ok(PostgresRedoProcess {
tenant_id,
child,
stdout_fd: stdout.as_raw_fd(),
stderr_fd: stderr.as_raw_fd(),
stdin,
n_requests: 0,
});
*self.stdout.lock().unwrap() = Some(ProcessOutput {
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
});
*self.stderr.lock().unwrap() = Some(stderr);
Ok(())
stderr,
})
}
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))]
fn kill(self) {
self.child.kill_and_wait();
}
//
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%input.as_ref().unwrap().child.id()))]
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))]
fn apply_wal_records(
&self,
mut input: MutexGuard<Option<ProcessInput>>,
&mut self,
tag: BufferTag,
base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
@@ -807,23 +780,33 @@ impl PostgresRedoManager {
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
let proc = input.as_mut().unwrap();
let mut nwrite = 0usize;
let stdout_fd = proc.stdout_fd;
// The input is now in 'writebuf'. Do a blind write first, writing as much as
// we can, before calling poll(). That skips one call to poll() if the stdin is
// already available for writing, which it almost certainly is because the
// process is idle.
let mut nwrite = self.stdin.write(&writebuf)?;
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
// Prepare for calling poll()
let mut pollfds = [
PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT),
PollFd::new(proc.stderr_fd, PollFlags::POLLIN),
PollFd::new(stdout_fd, PollFlags::POLLIN),
PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
];
// We do two things simultaneously: send the old base image and WAL records to
// the child process's stdin and forward any logging
// We do three things simultaneously: send the old base image and WAL records to
// the child process's stdin, read the result from child's stdout, and forward any logging
// information that the child writes to its stderr to the page server's log.
while nwrite < writebuf.len() {
while nresult < BLCKSZ.into() {
// If we have more data to write, wake up if 'stdin' becomes writeable or
// we have data to read. Otherwise only wake up if there's data to read.
let nfds = if nwrite < writebuf.len() { 3 } else { 2 };
let n = loop {
match nix::poll::poll(&mut pollfds[0..2], wal_redo_timeout.as_millis() as i32) {
match nix::poll::poll(&mut pollfds[0..nfds], wal_redo_timeout.as_millis() as i32) {
Err(e) if e == nix::errno::Errno::EINTR => continue,
res => break res,
}
@@ -837,16 +820,14 @@ impl PostgresRedoManager {
let err_revents = pollfds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; 16384] = [0; 16384];
let mut stderr_guard = self.stderr.lock().unwrap();
let stderr = stderr_guard.as_mut().unwrap();
let len = stderr.read(&mut errbuf)?;
let n = self.stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if len > 0 {
if n > 0 {
error!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..len])
String::from_utf8_lossy(&errbuf[0..n])
);
// To make sure we capture all log from the process if it fails, keep
@@ -860,157 +841,33 @@ impl PostgresRedoManager {
));
}
// If 'stdin' is writeable, do write.
let in_revents = pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
} else if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
// If we have more data to write and 'stdin' is writeable, do write.
if nwrite < writebuf.len() {
let in_revents = pollfds[2].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += self.stdin.write(&writebuf[nwrite..])?;
} else if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdin unexpectedly",
));
}
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += self.stdout.read(&mut resultbuf[nresult..])?;
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdin unexpectedly",
"WAL redo process closed its stdout unexpectedly",
));
}
}
let request_no = proc.n_requests;
proc.n_requests += 1;
drop(input);
// To improve walredo performance we separate sending requests and receiving
// responses. Them are protected by different mutexes (output and input).
// If thread T1, T2, T3 send requests D1, D2, D3 to walredo process
// then there is not warranty that T1 will first granted output mutex lock.
// To address this issue we maintain number of sent requests, number of processed
// responses and ring buffer with pending responses. After sending response
// (under input mutex), threads remembers request number. Then it releases
// input mutex, locks output mutex and fetch in ring buffer all responses until
// its stored request number. The it takes correspondent element from
// pending responses ring buffer and truncate all empty elements from the front,
// advancing processed responses number.
let mut output_guard = self.stdout.lock().unwrap();
let output = output_guard.as_mut().unwrap();
if output.stdout.as_raw_fd() != stdout_fd {
// If stdout file descriptor is changed then it means that walredo process is crashed and restarted.
// As far as ProcessInput and ProcessOutout are protected by different mutexes,
// it can happen that we send request to one process and waiting response from another.
// To prevent such situation we compare stdout file descriptors.
// As far as old stdout pipe is destroyed only after new one is created,
// it can not reuse the same file descriptor, so this check is safe.
//
// Cross-read this with the comment in apply_batch_postgres if result.is_err().
// That's where we kill the child process.
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
while nresult < BLCKSZ.into() {
// We do two things simultaneously: reading response from stdout
// and forward any logging information that the child writes to its stderr to the page server's log.
let n = loop {
match nix::poll::poll(&mut pollfds[1..3], wal_redo_timeout.as_millis() as i32) {
Err(e) if e == nix::errno::Errno::EINTR => continue,
res => break res,
}
}?;
if n == 0 {
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
}
// If we have some messages in stderr, forward them to the log.
let err_revents = pollfds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; 16384] = [0; 16384];
let mut stderr_guard = self.stderr.lock().unwrap();
let stderr = stderr_guard.as_mut().unwrap();
let len = stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if len > 0 {
error!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..len])
);
// To make sure we capture all log from the process if it fails, keep
// reading from the stderr, before checking the stdout.
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = pollfds[2].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
}
output
.pending_responses
.push_back(Some(Bytes::from(resultbuf)));
}
// Replace our request's response with None in `pending_responses`.
// Then make space in the ring buffer by clearing out any seqence of contiguous
// `None`'s from the front of `pending_responses`.
// NB: We can't pop_front() because other requests' responses because another
// requester might have grabbed the output mutex before us:
// T1: grab input mutex
// T1: send request_no 23
// T1: release input mutex
// T2: grab input mutex
// T2: send request_no 24
// T2: release input mutex
// T2: grab output mutex
// T2: n_processed_responses + output.pending_responses.len() <= request_no
// 23 0 24
// T2: enters poll loop that reads stdout
// T2: put response for 23 into pending_responses
// T2: put response for 24 into pending_resposnes
// pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
// T2: takes its response_24
// pending_responses now looks like this: Front Some(response_23) None Back
// T2: does the while loop below
// pending_responses now looks like this: Front Some(response_23) None Back
// T2: releases output mutex
// T1: grabs output mutex
// T1: n_processed_responses + output.pending_responses.len() > request_no
// 23 2 23
// T1: skips poll loop that reads stdout
// T1: takes its response_23
// pending_responses now looks like this: Front None None Back
// T2: does the while loop below
// pending_responses now looks like this: Front Back
// n_processed_responses now has value 25
let res = output.pending_responses[request_no - n_processed_responses]
.take()
.expect("we own this request_no, nobody else is supposed to take it");
while let Some(front) = output.pending_responses.front() {
if front.is_none() {
output.pending_responses.pop_front();
output.n_processed_responses += 1;
} else {
break;
}
}
Ok(res)
Ok(Bytes::from(resultbuf))
}
}

View File

@@ -16,7 +16,7 @@ use crate::{
use once_cell::sync::Lazy;
use std::borrow::Cow;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, warn};
use tracing::{info, instrument, warn};
static CPLANE_WAITERS: Lazy<Waiters<mgmt::ComputeReady>> = Lazy::new(Default::default);
@@ -143,6 +143,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
&mut self,
extra: &ConsoleReqExtra<'_>,
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin + Send>,
use_cleartext_password_flow: bool,
) -> auth::Result<Option<AuthSuccess<NodeInfo>>> {
use BackendType::*;
@@ -190,7 +191,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
(node, payload)
}
Console(endpoint, creds) if creds.use_cleartext_password_flow => {
Console(endpoint, creds) if use_cleartext_password_flow => {
// This is a hack to allow cleartext password in secure connections (wss).
let payload = fetch_plaintext_password(client).await?;
let creds = creds.as_ref();
@@ -220,16 +221,25 @@ impl BackendType<'_, ClientCredentials<'_>> {
}
/// Authenticate the client via the requested backend, possibly using credentials.
///
/// If `use_cleartext_password_flow` is true, we use the old cleartext password
/// flow. It is used for websocket connections, which want to minimize the number
/// of round trips.
#[instrument(skip_all)]
pub async fn authenticate(
mut self,
extra: &ConsoleReqExtra<'_>,
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin + Send>,
use_cleartext_password_flow: bool,
) -> auth::Result<AuthSuccess<NodeInfo>> {
use BackendType::*;
// Handle cases when `project` is missing in `creds`.
// TODO: type safety: return `creds` with irrefutable `project`.
if let Some(res) = self.try_password_hack(extra, client).await? {
if let Some(res) = self
.try_password_hack(extra, client, use_cleartext_password_flow)
.await?
{
info!("user successfully authenticated (using the password hack)");
return Ok(res);
}

View File

@@ -34,9 +34,6 @@ pub struct ClientCredentials<'a> {
pub user: &'a str,
pub dbname: &'a str,
pub project: Option<Cow<'a, str>>,
/// If `True`, we'll use the old cleartext password flow. This is used for
/// websocket connections, which want to minimize the number of round trips.
pub use_cleartext_password_flow: bool,
}
impl ClientCredentials<'_> {
@@ -53,7 +50,6 @@ impl<'a> ClientCredentials<'a> {
user: self.user,
dbname: self.dbname,
project: self.project().map(Cow::Borrowed),
use_cleartext_password_flow: self.use_cleartext_password_flow,
}
}
}
@@ -63,7 +59,6 @@ impl<'a> ClientCredentials<'a> {
params: &'a StartupMessageParams,
sni: Option<&str>,
common_name: Option<&str>,
use_cleartext_password_flow: bool,
) -> Result<Self, ClientCredsParseError> {
use ClientCredsParseError::*;
@@ -113,7 +108,6 @@ impl<'a> ClientCredentials<'a> {
user = user,
dbname = dbname,
project = project.as_deref(),
use_cleartext_password_flow = use_cleartext_password_flow,
"credentials"
);
@@ -121,7 +115,6 @@ impl<'a> ClientCredentials<'a> {
user,
dbname,
project,
use_cleartext_password_flow,
})
}
}
@@ -148,7 +141,7 @@ mod tests {
let options = StartupMessageParams::new([("user", "john_doe")]);
// TODO: check that `creds.dbname` is None.
let creds = ClientCredentials::parse(&options, None, None, false)?;
let creds = ClientCredentials::parse(&options, None, None)?;
assert_eq!(creds.user, "john_doe");
Ok(())
@@ -158,7 +151,7 @@ mod tests {
fn parse_missing_project() -> anyhow::Result<()> {
let options = StartupMessageParams::new([("user", "john_doe"), ("database", "world")]);
let creds = ClientCredentials::parse(&options, None, None, false)?;
let creds = ClientCredentials::parse(&options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.dbname, "world");
assert_eq!(creds.project, None);
@@ -173,7 +166,7 @@ mod tests {
let sni = Some("foo.localhost");
let common_name = Some("localhost");
let creds = ClientCredentials::parse(&options, sni, common_name, false)?;
let creds = ClientCredentials::parse(&options, sni, common_name)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.dbname, "world");
assert_eq!(creds.project.as_deref(), Some("foo"));
@@ -189,7 +182,7 @@ mod tests {
("options", "-ckey=1 project=bar -c geqo=off"),
]);
let creds = ClientCredentials::parse(&options, None, None, false)?;
let creds = ClientCredentials::parse(&options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.dbname, "world");
assert_eq!(creds.project.as_deref(), Some("bar"));
@@ -208,7 +201,7 @@ mod tests {
let sni = Some("baz.localhost");
let common_name = Some("localhost");
let creds = ClientCredentials::parse(&options, sni, common_name, false)?;
let creds = ClientCredentials::parse(&options, sni, common_name)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.dbname, "world");
assert_eq!(creds.project.as_deref(), Some("baz"));
@@ -227,8 +220,7 @@ mod tests {
let sni = Some("second.localhost");
let common_name = Some("localhost");
let err =
ClientCredentials::parse(&options, sni, common_name, false).expect_err("should fail");
let err = ClientCredentials::parse(&options, sni, common_name).expect_err("should fail");
match err {
InconsistentProjectNames { domain, option } => {
assert_eq!(option, "first");
@@ -245,8 +237,7 @@ mod tests {
let sni = Some("project.localhost");
let common_name = Some("example.com");
let err =
ClientCredentials::parse(&options, sni, common_name, false).expect_err("should fail");
let err = ClientCredentials::parse(&options, sni, common_name).expect_err("should fail");
match err {
InconsistentSni { sni, cn } => {
assert_eq!(sni, "project.localhost");

View File

@@ -25,12 +25,11 @@ impl CancelMap {
cancel_closure.try_cancel_query().await
}
/// Run async action within an ephemeral session identified by [`CancelKeyData`].
pub async fn with_session<'a, F, R, V>(&'a self, f: F) -> anyhow::Result<V>
where
F: FnOnce(Session<'a>) -> R,
R: std::future::Future<Output = anyhow::Result<V>>,
{
/// Create a new session, with a new client-facing random cancellation key.
///
/// Use `enable_query_cancellation` to register a database cancellation
/// key with it, and to get the client-facing key.
pub fn new_session<'a>(&'a self) -> anyhow::Result<Session<'a>> {
// HACK: We'd rather get the real backend_pid but tokio_postgres doesn't
// expose it and we don't want to do another roundtrip to query
// for it. The client will be able to notice that this is not the
@@ -44,17 +43,9 @@ impl CancelMap {
.lock()
.try_insert(key, None)
.map_err(|_| anyhow!("query cancellation key already exists: {key}"))?;
// This will guarantee that the session gets dropped
// as soon as the future is finished.
scopeguard::defer! {
self.0.lock().remove(&key);
info!("dropped query cancellation key {key}");
}
info!("registered new query cancellation key {key}");
let session = Session::new(key, self);
f(session).await
Ok(Session::new(key, self))
}
#[cfg(test)]
@@ -111,7 +102,7 @@ impl<'a> Session<'a> {
impl Session<'_> {
/// Store the cancel token for the given session.
/// This enables query cancellation in [`crate::proxy::handshake`].
pub fn enable_query_cancellation(self, cancel_closure: CancelClosure) -> CancelKeyData {
pub fn enable_query_cancellation(&self, cancel_closure: CancelClosure) -> CancelKeyData {
info!("enabling query cancellation for this session");
self.cancel_map
.0
@@ -122,6 +113,14 @@ impl Session<'_> {
}
}
impl<'a> Drop for Session<'a> {
fn drop(&mut self) {
let key = &self.key;
self.cancel_map.0.lock().remove(key);
info!("dropped query cancellation key {key}");
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -132,14 +131,14 @@ mod tests {
static CANCEL_MAP: Lazy<CancelMap> = Lazy::new(Default::default);
let (tx, rx) = tokio::sync::oneshot::channel();
let task = tokio::spawn(CANCEL_MAP.with_session(|session| async move {
let session = CANCEL_MAP.new_session()?;
let task = tokio::spawn(async move {
assert!(CANCEL_MAP.contains(&session));
tx.send(()).expect("failed to send");
futures::future::pending::<()>().await; // sleep forever
Ok(())
}));
});
// Wait until the task has been spawned.
rx.await.context("failed to hear from the task")?;

View File

@@ -14,7 +14,7 @@ use once_cell::sync::Lazy;
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{error, info, info_span, Instrument};
use tracing::{error, info, info_span, instrument, Instrument};
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
@@ -71,17 +71,35 @@ pub async fn task_main(
.set_nodelay(true)
.context("failed to set socket option")?;
handle_client(config, &cancel_map, session_id, socket).await
handle_postgres_client(config, &cancel_map, session_id, socket).await
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
error!("per-client task finished with an error: {e:#}");
})
.instrument(info_span!("client", session = format_args!("{session_id}"))),
}),
);
}
}
/// Handle an incoming PostgreSQL connection
pub async fn handle_postgres_client(
config: &ProxyConfig,
cancel_map: &CancelMap,
session_id: uuid::Uuid,
stream: impl AsyncRead + AsyncWrite + Unpin + Send,
) -> anyhow::Result<()> {
handle_client(
config,
cancel_map,
session_id,
stream,
HostnameMethod::Sni,
false,
)
.await
}
/// Handle an incoming Postgres connection that's wrapped in a websocket
pub async fn handle_ws_client(
config: &ProxyConfig,
cancel_map: &CancelMap,
@@ -89,45 +107,32 @@ pub async fn handle_ws_client(
stream: impl AsyncRead + AsyncWrite + Unpin + Send,
hostname: Option<String>,
) -> anyhow::Result<()> {
// The `closed` counter will increase when this future is destroyed.
NUM_CONNECTIONS_ACCEPTED_COUNTER.inc();
scopeguard::defer! {
NUM_CONNECTIONS_CLOSED_COUNTER.inc();
}
let tls = config.tls_config.as_ref();
let hostname = hostname.as_deref();
// TLS is None here, because the connection is already encrypted.
let do_handshake = handshake(stream, None, cancel_map).instrument(info_span!("handshake"));
let (mut stream, params) = match do_handshake.await? {
Some(x) => x,
None => return Ok(()), // it's a cancellation request
};
// Extract credentials which we're going to use for auth.
let creds = {
let common_name = tls.and_then(|tls| tls.common_name.as_deref());
let result = config
.auth_backend
.as_ref()
.map(|_| auth::ClientCredentials::parse(&params, hostname, common_name, true))
.transpose();
async { result }.or_else(|e| stream.throw_error(e)).await?
};
let client = Client::new(stream, creds, &params, session_id);
cancel_map
.with_session(|session| client.connect_to_db(session))
.await
handle_client(
config,
cancel_map,
session_id,
stream,
HostnameMethod::Param(hostname),
true,
)
.await
}
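// A hypothetical caller on the websocket side (the websocket server itself is not part
// of this diff): any AsyncRead + AsyncWrite adapter around the websocket can be passed
// straight through, together with the hostname taken from the HTTP request.
async fn serve_ws_connection(
    config: &ProxyConfig,
    cancel_map: &CancelMap,
    ws_stream: impl AsyncRead + AsyncWrite + Unpin + Send,
    host_header: Option<String>,
) -> anyhow::Result<()> {
    handle_ws_client(config, cancel_map, uuid::Uuid::new_v4(), ws_stream, host_header).await
}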
enum HostnameMethod {
Param(Option<String>),
Sni,
}
/// Handle an incoming client connection: perform the handshake and authentication.
/// After that, keep forwarding all the data. This doesn't return until the
/// connection is lost.
async fn handle_client(
config: &ProxyConfig,
cancel_map: &CancelMap,
session_id: uuid::Uuid,
stream: impl AsyncRead + AsyncWrite + Unpin + Send,
raw_stream: impl AsyncRead + AsyncWrite + Unpin + Send,
hostname_method: HostnameMethod,
use_cleartext_password_flow: bool,
) -> anyhow::Result<()> {
// The `closed` counter will increase when this future is destroyed.
NUM_CONNECTIONS_ACCEPTED_COUNTER.inc();
@@ -135,36 +140,73 @@ async fn handle_client(
NUM_CONNECTIONS_CLOSED_COUNTER.inc();
}
let tls = config.tls_config.as_ref();
let do_handshake = handshake(stream, tls, cancel_map).instrument(info_span!("handshake"));
let (mut stream, params) = match do_handshake.await? {
Some(x) => x,
None => return Ok(()), // it's a cancellation request
};
// Accept the connection from the client, authenticate it, and establish a
// connection to the database.
//
// We cover all these activities in one tracing span, so that they are
// traced as one request. That makes it convenient to investigate where
// the time is spent when establishing a new connection. After the
// connection has been established, we exit the span and use a separate
// span for the (rest of the) duration of the connection.
let conn = async {
// Process postgres startup packet and upgrade to TLS (if applicable)
let tls = config.tls_config.as_ref();
let (mut stream, params) = match handshake(raw_stream, tls, cancel_map).await? {
Some(x) => x,
None => return Ok::<_, anyhow::Error>(None), // it's a cancellation request
};
// Extract credentials which we're going to use for auth.
let creds = {
let sni = stream.get_ref().sni_hostname();
let common_name = tls.and_then(|tls| tls.common_name.as_deref());
let result = config
.auth_backend
.as_ref()
.map(|_| auth::ClientCredentials::parse(&params, sni, common_name, false))
.transpose();
// Extract credentials which we're going to use for auth.
let creds = {
let sni = match &hostname_method {
HostnameMethod::Param(hostname) => hostname.as_deref(),
HostnameMethod::Sni => stream.get_ref().sni_hostname(),
};
let common_name = tls.and_then(|tls| tls.common_name.as_deref());
let result = config
.auth_backend
.as_ref()
.map(|_| auth::ClientCredentials::parse(&params, sni, common_name))
.transpose();
async { result }.or_else(|e| stream.throw_error(e)).await?
};
async { result }.or_else(|e| stream.throw_error(e)).await?
};
let client = Client::new(stream, creds, &params, session_id);
cancel_map
.with_session(|session| client.connect_to_db(session))
.await
Ok(Some(
EstablishedConnection::connect_to_db(
stream,
creds,
&params,
session_id,
use_cleartext_password_flow,
cancel_map,
)
.await?,
))
}
.instrument(info_span!("establish_connection", session_id=%session_id))
.await?;
match conn {
Some(conn) => {
// Connection established in both directions. Forward all traffic until
// either connection is lost.
conn.handle_connection()
.instrument(info_span!("forward", session_id=%session_id))
.await
}
None => {
// It was a cancellation request. It was handled in 'handshake' already.
Ok(())
}
}
}
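// The span structure described in the comment above, distilled into a minimal
// standalone sketch (names and bodies are illustrative; error handling omitted):
async fn two_span_example(session_id: uuid::Uuid) -> anyhow::Result<()> {
    // Span 1: everything needed to establish the connection.
    let conn = async { Ok::<_, anyhow::Error>(Some(())) }
        .instrument(info_span!("establish_connection", session_id = %session_id))
        .await?;
    // Span 2: the rest of the connection's lifetime.
    match conn {
        Some(_) => {
            async { Ok(()) }
                .instrument(info_span!("forward", session_id = %session_id))
                .await
        }
        None => Ok(()),
    }
}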
/// Establish a (most probably secure) connection with the client.
/// For a better testing experience, `stream` can be any object satisfying the traits.
/// It's easier to work with an owned `stream` here as we need to upgrade it to TLS;
/// we also take extra care to propagate only select handshake errors to the client.
#[instrument(skip_all)]
async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
stream: S,
mut tls: Option<&TlsConfig>,
@@ -227,43 +269,36 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
}
/// Thin connection context.
struct Client<'a, S> {
/// The underlying libpq protocol stream.
stream: PqStream<S>,
/// Client credentials that we care about.
creds: auth::BackendType<'a, auth::ClientCredentials<'a>>,
/// KV-dictionary with PostgreSQL connection params.
params: &'a StartupMessageParams,
/// Unique connection ID.
session_id: uuid::Uuid,
struct EstablishedConnection<'a, S> {
client_stream: MeasuredStream<S>,
db_stream: MeasuredStream<tokio::net::TcpStream>,
/// Hold on to the Session for as long as the connection is alive, so that
/// it can be cancelled.
_session: cancellation::Session<'a>,
}
impl<'a, S> Client<'a, S> {
/// Construct a new connection context.
fn new(
stream: PqStream<S>,
creds: auth::BackendType<'a, auth::ClientCredentials<'a>>,
params: &'a StartupMessageParams,
session_id: uuid::Uuid,
) -> Self {
Self {
stream,
creds,
params,
session_id,
}
impl<S: AsyncRead + AsyncWrite + Unpin + Send> EstablishedConnection<'_, S> {
async fn handle_connection(mut self) -> anyhow::Result<()> {
// Starting from here we only proxy the client's traffic.
info!("performing the proxy pass...");
let _ = tokio::io::copy_bidirectional(&mut self.client_stream, &mut self.db_stream).await?;
Ok(())
}
}
impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
/// Let the client authenticate and connect to the designated compute node.
async fn connect_to_db(self, session: cancellation::Session<'_>) -> anyhow::Result<()> {
let Self {
mut stream,
creds,
params,
session_id,
} = self;
/// On return, the connection is fully established in both directions, and we can start
/// forwarding the bytes.
#[instrument(skip_all)]
async fn connect_to_db<'a>(
mut stream: PqStream<S>,
creds: auth::BackendType<'a, auth::ClientCredentials<'_>>,
params: &'_ StartupMessageParams,
session_id: uuid::Uuid,
use_cleartext_password_flow: bool,
cancel_map: &'a CancelMap,
) -> anyhow::Result<EstablishedConnection<'a, S>> {
let session = cancel_map.new_session()?;
let extra = auth::ConsoleReqExtra {
session_id, // aka this connection's id
@@ -272,10 +307,11 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
let auth_result = async {
// `&mut stream` doesn't let us merge those 2 lines.
let res = creds.authenticate(&extra, &mut stream).await;
let res = creds
.authenticate(&extra, &mut stream, use_cleartext_password_flow)
.await;
async { res }.or_else(|e| stream.throw_error(e)).await
}
.instrument(info_span!("auth"))
.await?;
let node = auth_result.value;
@@ -311,21 +347,15 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
.await?;
let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&node.aux.traffic_labels("tx"));
let mut client = MeasuredStream::new(stream.into_inner(), |cnt| {
// Number of bytes we sent to the client (outbound).
m_sent.inc_by(cnt as u64);
});
let client_stream = MeasuredStream::new(stream.into_inner(), m_sent);
let m_recv = NUM_BYTES_PROXIED_COUNTER.with_label_values(&node.aux.traffic_labels("rx"));
let mut db = MeasuredStream::new(db.stream, |cnt| {
// Number of bytes the client sent to the compute node (inbound).
m_recv.inc_by(cnt as u64);
});
let db_stream = MeasuredStream::new(db.stream, m_recv);
// Starting from here we only proxy the client's traffic.
info!("performing the proxy pass...");
let _ = tokio::io::copy_bidirectional(&mut client, &mut db).await?;
Ok(())
Ok(EstablishedConnection {
client_stream,
db_stream,
_session: session,
})
}
}

View File

@@ -10,6 +10,7 @@ use std::{io, task};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio_rustls::server::TlsStream;
use tracing::instrument;
pin_project! {
/// Stream wrapper which implements libpq's protocol.
@@ -105,6 +106,7 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
/// Write the error message using [`Self::write_message`], then re-throw it.
/// Allowing string literals is safe under the assumption that they don't contain any runtime info.
/// This method exists due to `&str` not implementing `Into<anyhow::Error>`.
#[instrument(skip_all)]
pub async fn throw_error_str<T>(&mut self, error: &'static str) -> anyhow::Result<T> {
tracing::info!("forwarding error to user: {error}");
self.write_message(&BeMessage::ErrorResponse(error, None))
@@ -114,6 +116,7 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
/// Write the error message using [`Self::write_message`], then re-throw it.
/// Trait [`UserFacingError`] acts as an allowlist for error types.
#[instrument(skip_all)]
pub async fn throw_error<T, E>(&mut self, error: E) -> anyhow::Result<T>
where
E: UserFacingError + Into<anyhow::Error>,
@@ -228,27 +231,27 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for Stream<S> {
}
pin_project! {
/// This stream tracks all writes and calls user provided
/// callback when the underlying stream is flushed.
pub struct MeasuredStream<S, W> {
/// This stream tracks all writes, and whenever the stream is flushed,
/// increments the user-provided counter by the number of bytes flushed.
pub struct MeasuredStream<S> {
#[pin]
stream: S,
write_count: usize,
inc_write_count: W,
write_counter: prometheus::IntCounter,
}
}
impl<S, W> MeasuredStream<S, W> {
pub fn new(stream: S, inc_write_count: W) -> Self {
impl<S> MeasuredStream<S> {
pub fn new(stream: S, write_counter: prometheus::IntCounter) -> Self {
Self {
stream,
write_count: 0,
inc_write_count,
write_counter,
}
}
}
impl<S: AsyncRead + Unpin, W> AsyncRead for MeasuredStream<S, W> {
impl<S: AsyncRead + Unpin> AsyncRead for MeasuredStream<S> {
fn poll_read(
self: Pin<&mut Self>,
context: &mut task::Context<'_>,
@@ -258,7 +261,7 @@ impl<S: AsyncRead + Unpin, W> AsyncRead for MeasuredStream<S, W> {
}
}
impl<S: AsyncWrite + Unpin, W: FnMut(usize)> AsyncWrite for MeasuredStream<S, W> {
impl<S: AsyncWrite + Unpin> AsyncWrite for MeasuredStream<S> {
fn poll_write(
self: Pin<&mut Self>,
context: &mut task::Context<'_>,
@@ -279,7 +282,7 @@ impl<S: AsyncWrite + Unpin, W: FnMut(usize)> AsyncWrite for MeasuredStream<S, W>
let this = self.project();
this.stream.poll_flush(context).map_ok(|()| {
// Increment the user-provided counter and reset the write count.
(this.inc_write_count)(*this.write_count);
this.write_counter.inc_by(*this.write_count as u64);
*this.write_count = 0;
})
}
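// A minimal sketch of how the simplified MeasuredStream is constructed (the counter
// name and standalone registration are illustrative; the proxy itself pulls labeled
// counters out of NUM_BYTES_PROXIED_COUNTER, as shown in the proxy diff above):
fn wrap_with_counter<S>(stream: S) -> MeasuredStream<S> {
    let bytes_flushed = prometheus::IntCounter::new("example_bytes_flushed_total", "bytes flushed")
        .expect("failed to create counter");
    // Every flush of the wrapped stream now bumps the counter by the number of bytes written.
    MeasuredStream::new(stream, bytes_flushed)
}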

View File

@@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
".*failed to load metadata.*",
".*could not load tenant.*load timeline.*",
".*could not load tenant.*load local timeline.*",
]
)

View File

@@ -2,15 +2,7 @@ from contextlib import closing
import psycopg2.extras
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
LocalFsStorage,
NeonEnvBuilder,
RemoteStorageKind,
assert_tenant_status,
wait_for_upload,
)
from fixtures.types import Lsn
from fixtures.utils import wait_until
from fixtures.neon_fixtures import NeonEnvBuilder
def test_tenant_config(neon_env_builder: NeonEnvBuilder):
@@ -166,46 +158,3 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
"pitr_interval": 60,
}.items()
)
def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
test_name="test_creating_tenant_conf_after_attach",
)
env = neon_env_builder.init_start()
assert isinstance(env.remote_storage, LocalFsStorage)
# tenant is created with defaults, as in without config file
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
config_path = env.repo_dir / "tenants" / str(tenant_id) / "config"
assert config_path.exists(), "config file is always initially created"
http_client = env.pageserver.http_client()
detail = http_client.timeline_detail(tenant_id, timeline_id)
last_record_lsn = Lsn(detail["last_record_lsn"])
assert last_record_lsn.lsn_int != 0, "initdb must have executed"
wait_for_upload(http_client, tenant_id, timeline_id, last_record_lsn)
http_client.tenant_detach(tenant_id)
assert not config_path.exists(), "detach did not remove config file"
http_client.tenant_attach(tenant_id)
wait_until(
number_of_iterations=5,
interval=1,
func=lambda: assert_tenant_status(http_client, tenant_id, "Active"),
)
env.neon_cli.config_tenant(tenant_id, {"gc_horizon": "1000000"})
contents_first = config_path.read_text()
env.neon_cli.config_tenant(tenant_id, {"gc_horizon": "0"})
contents_later = config_path.read_text()
# don't test applying the setting here; we have another test case to show that
# we just care about being able to create the file
assert len(contents_first) > len(contents_later)

View File

@@ -579,11 +579,12 @@ def test_load_attach_negatives(
pageserver_http.tenant_ignore(tenant_id)
expect_error_msg = f".*attach tenant {tenant_id}: some local filesystem state already exists:"
env.pageserver.allowed_errors.append(expect_error_msg)
env.pageserver.allowed_errors.append(
".*Cannot attach tenant .*?, local tenant directory already exists.*"
)
with pytest.raises(
expected_exception=PageserverApiException,
match=expect_error_msg,
match=f"Cannot attach tenant {tenant_id}, local tenant directory already exists",
):
pageserver_http.tenant_attach(tenant_id)
@@ -627,11 +628,12 @@ def test_ignore_while_attaching(
pageserver_http.tenant_ignore(tenant_id)
# Cannot attach it due to some local files existing
expect_error_msg = f".*attach tenant {tenant_id}: some local filesystem state already exists:"
env.pageserver.allowed_errors.append(expect_error_msg)
env.pageserver.allowed_errors.append(
".*Cannot attach tenant .*?, local tenant directory already exists.*"
)
with pytest.raises(
expected_exception=PageserverApiException,
match=expect_error_msg,
match=f"Cannot attach tenant {tenant_id}, local tenant directory already exists",
):
pageserver_http.tenant_attach(tenant_id)