Compare commits


1 Commit

Author: Dmitry Rodionov
SHA1: 4c1cb890db
Message: try to toggle background activity without a race condition
Date: 2022-10-20 22:30:42 +03:00
20 changed files with 215 additions and 417 deletions

View File

@@ -481,7 +481,6 @@ jobs:
neon-image:
runs-on: dev
needs: [ tag ]
container: gcr.io/kaniko-project/executor:v1.9.0-debug
steps:
@@ -495,11 +494,10 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build neon
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID
compute-tools-image:
runs-on: dev
needs: [ tag ]
container: gcr.io/kaniko-project/executor:v1.9.0-debug
steps:
@@ -510,12 +508,11 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build compute tools
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID
compute-node-image:
runs-on: dev
container: gcr.io/kaniko-project/executor:v1.9.0-debug
needs: [ tag ]
steps:
- name: Checkout
uses: actions/checkout@v1 # v3 won't work with kaniko
@@ -530,12 +527,11 @@ jobs:
# cloud repo depends on this image name, thus duplicating it
# remove compute-node when cloud repo is updated
- name: Kaniko build compute node with extensions v14 (compatibility)
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:${{needs.tag.outputs.build-tag}}
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID
compute-node-image-v14:
runs-on: dev
container: gcr.io/kaniko-project/executor:v1.9.0-debug
needs: [ tag ]
steps:
- name: Checkout
uses: actions/checkout@v1 # v3 won't work with kaniko
@@ -547,13 +543,12 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build compute node with extensions v14
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}}
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:$GITHUB_RUN_ID
compute-node-image-v15:
runs-on: dev
container: gcr.io/kaniko-project/executor:v1.9.0-debug
needs: [ tag ]
steps:
- name: Checkout
uses: actions/checkout@v1 # v3 won't work with kaniko
@@ -565,11 +560,11 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build compute node with extensions v15
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v15 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}}
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v15 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:$GITHUB_RUN_ID
promote-images:
runs-on: dev
needs: [ tag, neon-image, compute-node-image, compute-node-image-v14, compute-node-image-v15, compute-tools-image ]
needs: [ neon-image, compute-node-image, compute-node-image-v14, compute-node-image-v15, compute-tools-image ]
if: github.event_name != 'workflow_dispatch'
container: amazon/aws-cli
strategy:
@@ -582,9 +577,8 @@ jobs:
steps:
- name: Promote image to latest
run: |
export MANIFEST=$(aws ecr batch-get-image --repository-name ${{ matrix.name }} --image-ids imageTag=${{needs.tag.outputs.build-tag}} --query 'images[].imageManifest' --output text)
aws ecr put-image --repository-name ${{ matrix.name }} --image-tag latest --image-manifest "$MANIFEST"
run:
MANIFEST=$(aws ecr batch-get-image --repository-name ${{ matrix.name }} --image-ids imageTag=$GITHUB_RUN_ID --query 'images[].imageManifest' --output text) && aws ecr put-image --repository-name ${{ matrix.name }} --image-tag latest --image-manifest "$MANIFEST"
push-docker-hub:
runs-on: dev
@@ -603,19 +597,19 @@ jobs:
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
- name: Pull neon image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} neon
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:latest neon
- name: Pull compute tools image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} compute-tools
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:latest compute-tools
- name: Pull compute node image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:${{needs.tag.outputs.build-tag}} compute-node
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:latest compute-node
- name: Pull compute node v14 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} compute-node-v14
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:latest compute-node-v14
- name: Pull compute node v15 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} compute-node-v15
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:latest compute-node-v15
- name: Pull rust image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
@@ -625,11 +619,11 @@ jobs:
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
run: |
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.us-east-2.amazonaws.com/neon:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-tools:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node-v14:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node-v15:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/neon:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-tools:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node-v14:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node-v15:latest
- name: Configure Docker Hub login
run: |

View File

@@ -183,18 +183,18 @@ impl PostgresNode {
}
fn sync_safekeepers(&self, auth_token: &Option<String>, pg_version: u32) -> Result<Lsn> {
let pg_path = self.env.pg_bin_dir(pg_version)?.join("postgres");
let pg_path = self.env.pg_bin_dir(pg_version).join("postgres");
let mut cmd = Command::new(&pg_path);
cmd.arg("--sync-safekeepers")
.env_clear()
.env(
"LD_LIBRARY_PATH",
self.env.pg_lib_dir(pg_version)?.to_str().unwrap(),
self.env.pg_lib_dir(pg_version).to_str().unwrap(),
)
.env(
"DYLD_LIBRARY_PATH",
self.env.pg_lib_dir(pg_version)?.to_str().unwrap(),
self.env.pg_lib_dir(pg_version).to_str().unwrap(),
)
.env("PGDATA", self.pgdata().to_str().unwrap())
.stdout(Stdio::piped())
@@ -422,7 +422,7 @@ impl PostgresNode {
}
fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
let pg_ctl_path = self.env.pg_bin_dir(self.pg_version).join("pg_ctl");
let mut cmd = Command::new(pg_ctl_path);
cmd.args(
[
@@ -440,11 +440,11 @@ impl PostgresNode {
.env_clear()
.env(
"LD_LIBRARY_PATH",
self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
self.env.pg_lib_dir(self.pg_version).to_str().unwrap(),
)
.env(
"DYLD_LIBRARY_PATH",
self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
self.env.pg_lib_dir(self.pg_version).to_str().unwrap(),
);
if let Some(token) = auth_token {
cmd.env("ZENITH_AUTH_TOKEN", token);

View File

@@ -201,28 +201,28 @@ impl LocalEnv {
self.pg_distrib_dir.clone()
}
pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
pub fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf {
let path = self.pg_distrib_dir.clone();
match pg_version {
14 => Ok(path.join(format!("v{pg_version}"))),
15 => Ok(path.join(format!("v{pg_version}"))),
_ => bail!("Unsupported postgres version: {}", pg_version),
14 => path.join(format!("v{pg_version}")),
15 => path.join(format!("v{pg_version}")),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
15 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
_ => bail!("Unsupported postgres version: {}", pg_version),
14 => self.pg_distrib_dir(pg_version).join("bin"),
15 => self.pg_distrib_dir(pg_version).join("bin"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
pub fn pg_lib_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
15 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
_ => bail!("Unsupported postgres version: {}", pg_version),
14 => self.pg_distrib_dir(pg_version).join("lib"),
15 => self.pg_distrib_dir(pg_version).join("lib"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
@@ -422,10 +422,10 @@ impl LocalEnv {
"directory '{}' already exists. Perhaps already initialized?",
base_path.display()
);
if !self.pg_bin_dir(pg_version)?.join("postgres").exists() {
if !self.pg_bin_dir(pg_version).join("postgres").exists() {
bail!(
"Can't find postgres binary at {}",
self.pg_bin_dir(pg_version)?.display()
self.pg_bin_dir(pg_version).display()
);
}
for binary in ["pageserver", "safekeeper"] {
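The change above (mirrored later in Conf and PageServerConf) turns the path helpers from anyhow::Result into infallible accessors: an unsupported Postgres version is treated as a programmer error and panics, so call sites drop the `?`. A minimal sketch of the new shape under that design, using an illustrative Env type and a made-up distrib path rather than the real LocalEnv:

use std::path::PathBuf;

// Illustrative stand-in for LocalEnv; only the path helper is sketched.
struct Env {
    pg_distrib_dir: PathBuf,
}

impl Env {
    // Panics on unsupported versions instead of returning anyhow::Result.
    fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
        match pg_version {
            14 | 15 => self.pg_distrib_dir.join(format!("v{pg_version}")).join("bin"),
            _ => panic!("Unsupported postgres version: {pg_version}"),
        }
    }
}

fn main() {
    let env = Env { pg_distrib_dir: PathBuf::from("/usr/local/neon/pg_install") };
    // Before: env.pg_bin_dir(14)?.join("postgres"); now the lookup is infallible:
    let postgres = env.pg_bin_dir(14).join("postgres");
    println!("{}", postgres.display());
}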

View File

@@ -201,3 +201,13 @@ pub struct FailpointConfig {
pub struct TimelineGcRequest {
pub gc_horizon: Option<u64>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantSetBackgroundActivityRequest {
pub run_background_jobs: bool,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantSetBackgroundActivityResponse {
pub msg: String,
}
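For reference, a sketch of the JSON these serde derives accept and produce, assuming serde and serde_json as dependencies; the field name must match the JSON key exactly, which is why the run_background_jobs spelling matters on both sides of the API:

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct TenantSetBackgroundActivityRequest {
    run_background_jobs: bool,
}

fn main() -> serde_json::Result<()> {
    // The request body is a single boolean flag...
    let req: TenantSetBackgroundActivityRequest =
        serde_json::from_str(r#"{"run_background_jobs": false}"#)?;
    assert!(!req.run_background_jobs);
    // ...and serializing it round-trips to the same shape.
    println!("{}", serde_json::to_string(&req)?);
    Ok(())
}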

View File

@@ -37,22 +37,22 @@ pub static REQUIRED_POSTGRES_CONFIG: Lazy<Vec<&'static str>> = Lazy::new(|| {
});
impl Conf {
pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
pub fn pg_distrib_dir(&self) -> PathBuf {
let path = self.pg_distrib_dir.clone();
match self.pg_version {
14 => Ok(path.join(format!("v{}", self.pg_version))),
15 => Ok(path.join(format!("v{}", self.pg_version))),
_ => bail!("Unsupported postgres version: {}", self.pg_version),
14 => path.join(format!("v{}", self.pg_version)),
15 => path.join(format!("v{}", self.pg_version)),
_ => panic!("Unsupported postgres version: {}", self.pg_version),
}
}
fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
Ok(self.pg_distrib_dir()?.join("bin"))
fn pg_bin_dir(&self) -> PathBuf {
self.pg_distrib_dir().join("bin")
}
fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
Ok(self.pg_distrib_dir()?.join("lib"))
fn pg_lib_dir(&self) -> PathBuf {
self.pg_distrib_dir().join("lib")
}
pub fn wal_dir(&self) -> PathBuf {
@@ -60,12 +60,12 @@ impl Conf {
}
fn new_pg_command(&self, command: impl AsRef<Path>) -> Result<Command> {
let path = self.pg_bin_dir()?.join(command);
let path = self.pg_bin_dir().join(command);
ensure!(path.exists(), "Command {:?} does not exist", path);
let mut cmd = Command::new(path);
cmd.env_clear()
.env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
.env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
.env("LD_LIBRARY_PATH", self.pg_lib_dir())
.env("DYLD_LIBRARY_PATH", self.pg_lib_dir());
Ok(cmd)
}

View File

@@ -387,28 +387,28 @@ impl PageServerConf {
//
// Postgres distribution paths
//
pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
pub fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf {
let path = self.pg_distrib_dir.clone();
match pg_version {
14 => Ok(path.join(format!("v{pg_version}"))),
15 => Ok(path.join(format!("v{pg_version}"))),
_ => bail!("Unsupported postgres version: {}", pg_version),
14 => path.join(format!("v{pg_version}")),
15 => path.join(format!("v{pg_version}")),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
15 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
_ => bail!("Unsupported postgres version: {}", pg_version),
14 => self.pg_distrib_dir(pg_version).join("bin"),
15 => self.pg_distrib_dir(pg_version).join("bin"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
pub fn pg_lib_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
15 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
_ => bail!("Unsupported postgres version: {}", pg_version),
14 => self.pg_distrib_dir(pg_version).join("lib"),
15 => self.pg_distrib_dir(pg_version).join("lib"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}

View File

@@ -3,6 +3,9 @@ use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use pageserver_api::models::{
TenantSetBackgroundActivityRequest, TenantSetBackgroundActivityResponse,
};
use remote_storage::GenericRemoteStorage;
use tokio::task::JoinError;
use tracing::*;
@@ -13,11 +16,12 @@ use super::models::{
TimelineCreateRequest,
};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::storage_sync;
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
use crate::task_mgr::TaskKind;
use crate::tenant::{TenantState, Timeline};
use crate::tenant_config::TenantConfOpt;
use crate::{config::PageServerConf, tenant_mgr};
use crate::{storage_sync, task_mgr};
use utils::{
auth::JwtAuth,
http::{
@@ -570,6 +574,63 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
)
}
async fn tenant_set_background_activity(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let request: TenantSetBackgroundActivityRequest = json_request(&mut request).await?;
let tenant = tenant_mgr::get_tenant(tenant_id, false).map_err(ApiError::NotFound)?;
let modified = tenant.set_state_with(|old_state| {
let background_jobs_running = match old_state {
TenantState::Active {
background_jobs_running,
} => background_jobs_running,
_ => return None,
};
match (request.run_background_jobs, background_jobs_running) {
(true, true) => None,
(false, false) => None,
(true, false) => Some(TenantState::Active {
background_jobs_running: true,
}),
(false, true) => {
// the tasks will eventually shut down after this, but we need a guarantee
// that they have stopped, so we explicitly wait for the shutdown below
Some(TenantState::Active {
background_jobs_running: false,
})
}
}
});
if !modified {
return Ok(json_response(
StatusCode::NOT_MODIFIED,
TenantSetBackgroundActivityResponse { msg: "".to_owned() },
)?);
}
// The state was modified and the request asked to stop background jobs, which means
// we changed the state and now need to wait for the tasks to shut down.
// XXX: can the state be changed a second time here, making the `modified` flag outdated?
if modified && !request.run_background_jobs {
task_mgr::shutdown_tasks(Some(TaskKind::Compaction), Some(tenant_id), None).await;
task_mgr::shutdown_tasks(Some(TaskKind::GarbageCollector), Some(tenant_id), None).await;
}
Ok(json_response(
StatusCode::OK,
TenantSetBackgroundActivityResponse {
msg: format!("run background jobs set to {}", request.run_background_jobs),
},
)?)
}
// Helper function to standardize the error messages we produce on bad durations
//
// Intended to be used with anyhow's `with_context`, e.g.:
@@ -904,6 +965,10 @@ pub fn make_router(
.post("/v1/tenant/:tenant_id/timeline", timeline_create_handler)
.post("/v1/tenant/:tenant_id/attach", tenant_attach_handler)
.post("/v1/tenant/:tenant_id/detach", tenant_detach_handler)
.post(
"/v1/tenant/:tenant_id/set_background_activity",
tenant_set_background_activity,
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler,
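A sketch of how a client might drive the new route; the listen address, port, and tenant id are illustrative assumptions, and the snippet assumes reqwest (with its json feature) and tokio rather than any client shipped in this repo:

use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let tenant_id = "0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f"; // hypothetical tenant id
    let resp = reqwest::Client::new()
        .post(format!(
            "http://127.0.0.1:9898/v1/tenant/{tenant_id}/set_background_activity"
        ))
        .json(&json!({ "run_background_jobs": false }))
        .send()
        .await?;
    // Expect 200 OK when the state changed, 304 Not Modified when the
    // background jobs were already in the requested state.
    println!("status: {}", resp.status());
    Ok(())
}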

View File

@@ -46,8 +46,6 @@ pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
pub const LOG_FILE_NAME: &str = "pageserver.log";
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
/// Config for the Repository checkpointer
#[derive(Debug, Clone, Copy)]
pub enum CheckpointConfig {

View File

@@ -1373,17 +1373,6 @@ fn is_rel_block_key(key: Key) -> bool {
key.field1 == 0x00 && key.field4 != 0
}
pub fn is_rel_fsm_block_key(key: Key) -> bool {
key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff
}
pub fn is_rel_vm_block_key(key: Key) -> bool {
key.field1 == 0x00
&& key.field4 != 0
&& key.field5 == VISIBILITYMAP_FORKNUM
&& key.field6 != 0xffffffff
}
pub fn key_to_slru_block(key: Key) -> Result<(SlruKind, u32, BlockNumber)> {
Ok(match key.field1 {
0x01 => {

View File

@@ -191,7 +191,9 @@ impl UninitializedTimeline<'_> {
)
})?;
v.insert(Arc::clone(&new_timeline));
new_timeline.launch_wal_receiver();
new_timeline.launch_wal_receiver().with_context(|| {
format!("Failed to launch walreceiver for timeline {tenant_id}/{timeline_id}")
})?;
}
}
@@ -653,22 +655,42 @@ impl Tenant {
}
pub fn set_state(&self, new_state: TenantState) {
match (self.current_state(), new_state) {
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
debug!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
}
(TenantState::Broken, _) => {
error!("Ignoring state update {new_state:?} for broken tenant");
}
(_, new_state) => {
self.state.send_replace(new_state);
if self.should_run_tasks() {
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
crate::tenant_tasks::start_background_loops(self.tenant_id);
self.set_state_with(|_| Some(new_state));
}
pub fn set_state_with<F>(&self, f: F) -> bool
where
F: FnOnce(&mut TenantState) -> Option<TenantState>,
{
let modify = |old_state: &mut TenantState| {
let new_state = match f(old_state) {
None => return false,
Some(new_state) => new_state,
};
match (old_state, new_state) {
(equal_state_1, equal_state_2) if equal_state_1 == &equal_state_2 => {
debug!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
false
}
(TenantState::Broken, _) => {
error!("Ignoring state update {new_state:?} for broken tenant");
false
}
(old_state, new_state) => {
*old_state = new_state;
true
}
}
};
let modified = self.state.send_if_modified(modify);
if modified && self.should_run_tasks() {
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
crate::tenant_tasks::start_background_loops(self.tenant_id);
}
modified
}
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
@@ -1412,8 +1434,8 @@ fn run_initdb(
initdb_target_dir: &Path,
pg_version: u32,
) -> Result<()> {
let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb");
let initdb_lib_dir = conf.pg_lib_dir(pg_version)?;
let initdb_bin_path = conf.pg_bin_dir(pg_version).join("initdb");
let initdb_lib_dir = conf.pg_lib_dir(pg_version);
info!(
"running {} in {}, libdir: {}",
initdb_bin_path.display(),
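The race mentioned in the commit message is closed because set_state_with delegates to tokio's watch::Sender::send_if_modified, which runs the closure while holding the channel's lock: observing the old state and storing the new one happen atomically, and receivers are notified only when the closure reports a change. A minimal sketch of that primitive with a simplified stand-in for TenantState:

use tokio::sync::watch;

#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Active { background_jobs_running: bool },
    Broken,
}

fn main() {
    let (tx, _rx) = watch::channel(State::Active { background_jobs_running: true });

    // The closure runs under the channel's internal lock, so no other
    // writer can slip in between the read and the write.
    let modified = tx.send_if_modified(|state| match *state {
        State::Active { background_jobs_running: true } => {
            *state = State::Active { background_jobs_running: false };
            true // notify receivers only when we actually changed something
        }
        _ => false, // Broken tenants and no-op toggles leave the state alone
    });

    assert!(modified);
}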

View File

@@ -34,7 +34,6 @@ use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::TimelineMetrics;
use crate::pgdatadir_mapping::BlockNumber;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::reltag::RelTag;
use crate::tenant_config::TenantConfOpt;
@@ -53,7 +52,6 @@ use crate::task_mgr::TaskKind;
use crate::walreceiver::{is_etcd_client_initialized, spawn_connection_manager_task};
use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;
use crate::ZERO_PAGE;
use crate::{
page_cache,
storage_sync::{self, index::LayerFileMetadata},
@@ -602,11 +600,11 @@ impl Timeline {
result
}
pub fn launch_wal_receiver(self: &Arc<Self>) {
pub fn launch_wal_receiver(self: &Arc<Self>) -> anyhow::Result<()> {
if !is_etcd_client_initialized() {
if cfg!(test) {
info!("not launching WAL receiver because etcd client hasn't been initialized");
return;
return Ok(());
} else {
panic!("etcd client not initialized");
}
@@ -634,7 +632,9 @@ impl Timeline {
walreceiver_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
);
)?;
Ok(())
}
///
@@ -1496,32 +1496,7 @@ impl Timeline {
for range in &partition.ranges {
let mut key = range.start;
while key < range.end {
let img = match self.get(key, lsn) {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
// page without losing any actual user data. That seems better
// than failing repeatedly and getting stuck.
//
// We had a bug at one point, where we truncated the FSM and VM
// in the pageserver, but Postgres didn't know about that
// and continued to generate incremental WAL records for pages
// that didn't exist in the pageserver. Trying to replay those
// WAL records failed to find the previous image of the page.
// This special case allows us to recover from that situation.
// See https://github.com/neondatabase/neon/issues/2601.
//
// Unfortunately we cannot do this for the main fork, or for
// any metadata keys, as that would lead to actual data
// loss.
if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(err);
}
}
};
let img = self.get(key, lsn)?;
image_layer_writer.put_image(key, &img)?;
key = key.next();
}
@@ -1984,10 +1959,10 @@ impl Timeline {
new_gc_cutoff
);
write_guard.store_and_unlock(new_gc_cutoff).wait();
// Persist metadata file
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;
}
// Persist the new GC cutoff value in the metadata file, before
// we actually remove anything.
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;
info!("GC starting");
@@ -2114,12 +2089,15 @@ impl Timeline {
}
info!(
"GC completed removing {} layers, cutoff {}",
"GC completed removing {} layers, cuttof {}",
result.layers_removed, new_gc_cutoff
);
if result.layers_removed != 0 {
fail_point!("after-timeline-gc-removed-layers");
fail_point!("gc-before-save-metadata", |_| {
info!("Abnormaly terinate pageserver at gc-before-save-metadata fail point");
std::process::abort();
});
return Ok(result);
}
if self.upload_layers.load(atomic::Ordering::Relaxed) {
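The abort above is driven by the fail crate's failpoint machinery: the test at the end of this diff arms gc-before-save-metadata with the "return" action over the pageserver's HTTP API, which makes the closure passed to fail_point! run. A standalone sketch, assuming the fail crate compiled with its failpoints feature; fail::cfg stands in for the HTTP-based configuration:

use fail::fail_point;

fn save_metadata() {
    // With the failpoint armed, the closure runs and the process aborts
    // before the metadata is persisted, simulating a crash at this spot.
    fail_point!("gc-before-save-metadata", |_| {
        eprintln!("Abnormally terminating at gc-before-save-metadata fail point");
        std::process::abort();
    });
    // ...normal metadata persistence would happen here...
}

fn main() {
    // Tests arm the failpoint over the HTTP API; locally it can be set directly:
    fail::cfg("gc-before-save-metadata", "return").unwrap();
    save_metadata(); // aborts here
}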

View File

@@ -34,7 +34,6 @@ use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use crate::walrecord::*;
use crate::ZERO_PAGE;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
@@ -44,6 +43,8 @@ use postgres_ffi::TransactionId;
use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
pub struct WalIngest<'a> {
timeline: &'a Timeline,

View File

@@ -47,7 +47,7 @@ pub fn spawn_connection_manager_task(
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
) {
) -> anyhow::Result<()> {
let mut etcd_client = get_etcd_client().clone();
let tenant_id = timeline.tenant_id;
@@ -95,6 +95,7 @@ pub fn spawn_connection_manager_task(
info_span!("wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id),
),
);
Ok(())
}
/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.

View File

@@ -610,26 +610,13 @@ impl PostgresRedoProcess {
);
fs::remove_dir_all(&datadir)?;
}
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).map_err(|e| {
Error::new(
ErrorKind::Other,
format!("incorrect pg_bin_dir path: {}", e),
)
})?;
let pg_lib_dir_path = conf.pg_lib_dir(pg_version).map_err(|e| {
Error::new(
ErrorKind::Other,
format!("incorrect pg_lib_dir path: {}", e),
)
})?;
info!("running initdb in {}", datadir.display());
let initdb = Command::new(pg_bin_dir_path.join("initdb"))
let initdb = Command::new(conf.pg_bin_dir(pg_version).join("initdb"))
.args(&["-D", &datadir.to_string_lossy()])
.arg("-N")
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
.env("DYLD_LIBRARY_PATH", &pg_lib_dir_path) // macOS
.env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.close_fds()
.output()
.map_err(|e| Error::new(e.kind(), format!("failed to execute initdb: {e}")))?;
@@ -655,14 +642,14 @@ impl PostgresRedoProcess {
}
// Start postgres itself
let mut child = Command::new(pg_bin_dir_path.join("postgres"))
let mut child = Command::new(conf.pg_bin_dir(pg_version).join("postgres"))
.arg("--wal-redo")
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
.env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
.env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("PGDATA", &datadir)
// The redo process is not trusted, so it runs in seccomp mode
// (see seccomp in zenith_wal_redo.c). We have to make sure it doesn't

View File

@@ -10,9 +10,6 @@ EXTENSION = neon_test_utils
DATA = neon_test_utils--1.0.sql
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)

View File

@@ -23,11 +23,6 @@ RETURNS bytea
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION neon_seqscan_rel(rel regclass, nprefetch int DEFAULT 0)
RETURNS void
AS 'MODULE_PATHNAME', 'neon_seqscan_rel'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION neon_xlogflush(lsn pg_lsn)
RETURNS VOID
AS 'MODULE_PATHNAME', 'neon_xlogflush'

View File

@@ -23,13 +23,8 @@
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/varlena.h"
#include "utils/wait_event.h"
#include "../neon/pagestore_client.h"
#include "libpq-fe.h"
#include "libpq/pqformat.h"
#include "libpq/libpq.h"
PG_MODULE_MAGIC;
extern void _PG_init(void);
@@ -39,7 +34,6 @@ PG_FUNCTION_INFO_V1(clear_buffer_cache);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
PG_FUNCTION_INFO_V1(neon_xlogflush);
PG_FUNCTION_INFO_V1(neon_seqscan_rel);
/*
* Linkage to functions in neon module.
@@ -295,238 +289,6 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
}
}
/*
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(PGconn *conn, char **buffer)
{
int ret;
retry:
ret = PQgetCopyData(conn, buffer, 1 /* async */ );
if (ret == 0)
{
int wc;
/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(conn),
-1L, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(conn))
elog(ERROR, "could not get response from pageserver: %s",
PQerrorMessage(conn));
}
goto retry;
}
return ret;
}
static void send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn);
/*
* Fetch all pages of given relation. This simulates a sequential scan
* over the table. You can specify the number of blocks to prefetch;
* the function will try to keep that many requests "in flight" at all
* times. The fetched pages are simply discarded.
*/
Datum
neon_seqscan_rel(PG_FUNCTION_ARGS)
{
Oid relid = PG_GETARG_OID(0);
Oid nprefetch = PG_GETARG_INT32(1);
Relation rel;
char *raw_page_data;
BlockNumber nblocks;
PGconn *pageserver_conn;
XLogRecPtr read_lsn;
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to use raw page functions")));
rel = relation_open(relid, AccessShareLock);
nblocks = RelationGetNumberOfBlocks(rel);
pageserver_conn = PQconnectdb(page_server_connstring);
if (PQstatus(pageserver_conn) == CONNECTION_BAD)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
PQfinish(pageserver_conn);
ereport(ERROR,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg("could not establish connection to pageserver"),
errdetail_internal("%s", msg)));
}
PG_TRY();
{
char *query;
int ret;
StringInfoData resp_buff;
read_lsn = GetXLogInsertRecPtr();
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
ret = PQsendQuery(pageserver_conn, query);
if (ret != 1)
{
PQfinish(pageserver_conn);
pageserver_conn = NULL;
elog(ERROR, "could not send pagestream command to pageserver");
}
while (PQisBusy(pageserver_conn))
{
int wc;
/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(pageserver_conn),
-1L, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
PQfinish(pageserver_conn);
pageserver_conn = NULL;
elog(ERROR, "could not complete handshake with pageserver: %s",
msg);
}
}
}
elog(INFO, "scanning %u blocks, prefetch %u", nblocks, nprefetch);
BlockNumber nsent = 0;
for (BlockNumber blkno = 0; blkno < nblocks; blkno++)
{
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
.req.latest = true,
.req.lsn = read_lsn,
.rnode = rel->rd_node,
.forknum = MAIN_FORKNUM,
.blkno = blkno
};
NeonResponse *resp;
if (blkno % 1024 == 0)
elog(INFO, "blk %u/%u", blkno, nblocks);
if (nsent < blkno + nprefetch + 1 && nsent < nblocks)
{
while (nsent < blkno + nprefetch + 1 && nsent < nblocks)
send_getpage_request(pageserver_conn, rel->rd_node, nsent++, read_lsn);
if (PQflush(pageserver_conn))
{
char *msg = PQerrorMessage(pageserver_conn);
elog(ERROR, "failed to flush page requests: %s", msg);
}
}
/* read response */
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
resp_buff.cursor = 0;
if (resp_buff.len < 0)
{
if (resp_buff.len == -1)
elog(ERROR, "end of COPY");
else if (resp_buff.len == -2)
elog(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
}
resp = nm_unpack_response(&resp_buff);
switch (resp->tag)
{
case T_NeonGetPageResponse:
/* ok */
break;
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("could not read block %u", blkno),
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
default:
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
}
PQfreemem(resp_buff.data);
}
}
PG_CATCH();
{
PQfinish(pageserver_conn);
PG_RE_THROW();
}
PG_END_TRY();
relation_close(rel, AccessShareLock);
}
static void
send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn)
{
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
.req.latest = true,
.req.lsn = lsn,
.rnode = rnode,
.forknum = MAIN_FORKNUM,
.blkno = blkno
};
StringInfoData req_buff;
req_buff = nm_pack_request(&request.req);
/*
* Send request.
*
* In principle, this could block if the output buffer is full, and we
* should use async mode and check for interrupts while waiting. In
* practice, our requests are small enough to always fit in the output and
* TCP buffer.
*/
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{
char *msg = PQerrorMessage(pageserver_conn);
elog(ERROR, "failed to send page request: %s", msg);
}
pfree(req_buff.data);
}
/*
* Directly calls XLogFlush(lsn) to flush WAL buffers.
*/

View File

@@ -1,13 +1,14 @@
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
from performance.test_perf_pgbench import get_scales_matrix
# Test gc_cutoff
#
# This test sets fail point at the end of GC, and checks that pageserver
# normally restarts after it. Also, there should be GC ERRORs in the log,
# but the fixture checks the log for any unexpected ERRORs after every
# test anyway, so it doesn't need any special attention here.
def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
# This test sets a fail point at the end of GC and checks
# that the pageserver restarts normally after it
@pytest.mark.parametrize("scale", get_scales_matrix(10))
def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, scale: int):
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
@@ -17,23 +18,21 @@ def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"gc_period": "10 s",
"gc_horizon": f"{1024 ** 2}",
"checkpoint_distance": f"{1024 ** 2}",
"compaction_period": "5 s",
"compaction_target_size": f"{1024 ** 2}",
# set PITR interval to be small, so we can do GC
"pitr_interval": "1 s",
"compaction_threshold": "3",
"image_creation_threshold": "2",
}
)
pg = env.postgres.create_start("main", tenant_id=tenant_id)
connstr = pg.connstr(options="-csynchronous_commit=off")
pg_bin.run_capture(["pgbench", "-i", "-s10", connstr])
connstr = pg.connstr()
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pageserver_http.configure_failpoints(("after-timeline-gc-removed-layers", "exit"))
pageserver_http.configure_failpoints(("gc-before-save-metadata", "return"))
for i in range(5):
try:
pg_bin.run_capture(["pgbench", "-N", "-c5", "-T100", "-Mprepared", connstr])
pg_bin.run_capture(["pgbench", "-T100", connstr])
except Exception:
env.pageserver.stop()
env.pageserver.start()
pageserver_http.configure_failpoints(("after-timeline-gc-removed-layers", "exit"))
pageserver_http.configure_failpoints(("gc-before-save-metadata", "return"))