Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-19 11:22:56 +00:00)

Compare commits: docker-mul...chunk_load
24 Commits
| SHA1 |
|---|
| c28b6573b4 |
| cb70e63f34 |
| e6c82c9609 |
| 79ade52535 |
| 5c33095918 |
| 5e0f39cc9e |
| 0a34a592d5 |
| 19aaa91f6d |
| 404aab9373 |
| bc6db2c10e |
| 772d853dcf |
| ab4d272149 |
| f70a5cad61 |
| 7aba299dbd |
| 4b3b19f444 |
| 8ab4c8a050 |
| 7c4a653230 |
| a3cd8f0e6d |
| 65c851a451 |
| 23cf2fa984 |
| ce8d6ae958 |
| 384b2a91fa |
| 233c4811db |
| 2fd4c390cb |
@@ -443,27 +443,25 @@ jobs:
- checkout
- setup_remote_docker:
docker_layer_caching: true
- run:
name: Login to docker hub
command: echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin
- run:
name: Setup buildx
command: docker run -it --rm --privileged tonistiigi/binfmt --install all
# Build zenithdb/compute-tools:latest image and push it to Docker hub
# TODO: this should probably also use versioned tag, not just :latest.
# XXX: but should it? We build and use it only locally now.
- run:
name: Build and push compute-tools Docker image
command: docker buildx build --platform linux/amd64,linux/arm64 --push -t zenithdb/compute-tools:latest compute_tools
command: |
echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin
docker build -t zenithdb/compute-tools:latest ./compute_tools/
docker push zenithdb/compute-tools:latest
- run:
name: Init postgres submodule
command: git submodule update --init --depth 1
- run:
name: Build and push compute-node Docker image
command: |
echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin
DOCKER_TAG=$(git log --oneline|wc -l)
docker buildx build --platform linux/amd64,linux/arm64 --push -t zenithdb/compute-node:latest vendor/postgres
docker buildx build --platform linux/amd64,linux/arm64 --push -t zenithdb/compute-node:${DOCKER_TAG} vendor/postgres
docker build -t zenithdb/compute-node:latest vendor/postgres && docker push zenithdb/compute-node:latest
docker tag zenithdb/compute-node:latest zenithdb/compute-node:${DOCKER_TAG} && docker push zenithdb/compute-node:${DOCKER_TAG}

deploy-staging:
docker:

@@ -690,11 +688,10 @@ workflows:
branches:
only:
- main
- docker-multi-platform
# requires:
# - pg_regress-tests-release
# - other-tests-release
# - compute-tools-test
requires:
- pg_regress-tests-release
- other-tests-release
- compute-tools-test
- deploy-staging:
# Context gives an ability to login
context: Docker Hub
44 .github/workflows/docker-builder.yml vendored

@@ -1,44 +0,0 @@
## Build docker image zenithdb/build:buster for linux/adm64 and linux/arm64 platforms

name: docker-builder

on:
push:
branches:
- 'docker-multi-platform'
schedule:
# * is a special character in YAML so you have to quote this string
# buil daily at 5:30am
- cron: '30 5 * * *'

jobs:
docker-builder-buster:
runs-on: ubuntu-latest
steps:
-
name: Checkout
uses: actions/checkout@v2
with:
submodules: false
-
name: Set up QEMU
uses: docker/setup-qemu-action@v1
-
name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
-
name: Login to DockerHub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
-
name: Build and push zenithdb/build:buster
uses: docker/build-push-action@v2
with:
push: true
file: Dockerfile.build
platforms: linux/amd64,linux/arm64
cache-from: type=registry,ref=zenithdb/build:buster
tags: zenithdb/build:buster
3 Cargo.lock generated

@@ -1424,16 +1424,19 @@ dependencies = [
"bytes",
"clap",
"hex",
"hyper",
"lazy_static",
"md5",
"parking_lot",
"rand",
"reqwest",
"routerify",
"rustls 0.19.1",
"serde",
"serde_json",
"tokio",
"tokio-postgres",
"zenith_metrics",
"zenith_utils",
]
@@ -294,6 +294,7 @@ impl PostgresNode {
conf.append("max_replication_slots", "10");
conf.append("hot_standby", "on");
conf.append("shared_buffers", "1MB");
conf.append("zenith.file_cache_size", "4096");
conf.append("fsync", "off");
conf.append("max_connections", "100");
conf.append("wal_level", "replica");

@@ -9,6 +9,7 @@
use anyhow::{anyhow, bail, Context, Result};
use std::fs;
use std::path::Path;
use std::process::Command;

pub mod compute;
pub mod local_env;

@@ -31,3 +32,19 @@ pub fn read_pidfile(pidfile: &Path) -> Result<i32> {
}
Ok(pid)
}

fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
let cmd = cmd.env_clear().env("RUST_BACKTRACE", "1");

let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}

const RUST_LOG_KEY: &str = "RUST_LOG";
if let Ok(rust_log_value) = std::env::var(RUST_LOG_KEY) {
cmd.env(RUST_LOG_KEY, rust_log_value)
} else {
cmd
}
}
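The new `fill_rust_env_vars` helper above centralizes the env_clear-plus-allowlist pattern that was previously repeated per daemon. A minimal, self-contained sketch of the same idea (the helper name and the `env` binary below are illustrative, not the repository's API):

```rust
use std::process::Command;

// Clear the child's environment and forward only an allowlisted set of
// variables, mirroring the idea behind `fill_rust_env_vars`.
fn forward_env(cmd: &mut Command) -> &mut Command {
    let cmd = cmd.env_clear().env("RUST_BACKTRACE", "1");
    for var in ["RUST_LOG", "LLVM_PROFILE_FILE"] {
        if let Some(val) = std::env::var_os(var) {
            cmd.env(var, val);
        }
    }
    cmd
}

fn main() -> std::io::Result<()> {
    // `env` simply prints the environment it receives, so the output shows
    // that only the forwarded variables made it through.
    let status = forward_env(&mut Command::new("env")).status()?;
    assert!(status.success());
    Ok(())
}
```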
@@ -17,8 +17,8 @@ use thiserror::Error;
use zenith_utils::http::error::HttpErrorBody;

use crate::local_env::{LocalEnv, SafekeeperConf};
use crate::read_pidfile;
use crate::storage::PageServerNode;
use crate::{fill_rust_env_vars, read_pidfile};
use zenith_utils::connstring::connection_address;

#[derive(Error, Debug)]

@@ -118,22 +118,17 @@ impl SafekeeperNode {
let listen_http = format!("localhost:{}", self.conf.http_port);

let mut cmd = Command::new(self.env.safekeeper_bin()?);
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--recall", "1 second"])
.arg("--daemonize")
.env_clear()
.env("RUST_BACKTRACE", "1");
fill_rust_env_vars(
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--recall", "1 second"])
.arg("--daemonize"),
);
if !self.conf.sync {
cmd.arg("--no-sync");
}

let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}

if !cmd.status()?.success() {
bail!(
"Safekeeper failed to start. See '{}' for details.",

@@ -19,7 +19,7 @@ use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::ZTenantId;

use crate::local_env::LocalEnv;
use crate::read_pidfile;
use crate::{fill_rust_env_vars, read_pidfile};
use pageserver::branches::BranchInfo;
use pageserver::tenant_mgr::TenantInfo;
use zenith_utils::connstring::connection_address;

@@ -96,46 +96,49 @@ impl PageServerNode {
.unwrap()
}

pub fn init(&self, create_tenant: Option<&str>) -> anyhow::Result<()> {
pub fn init(
&self,
create_tenant: Option<&str>,
config_overrides: &[&str],
) -> anyhow::Result<()> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}

// FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
let mut args = vec![
"--init".to_string(),
"-D".to_string(),
self.env.base_data_dir.display().to_string(),
"-c".to_string(),
format!("pg_distrib_dir='{}'", self.env.pg_distrib_dir.display()),
"-c".to_string(),
format!("auth_type='{}'", self.env.pageserver.auth_type),
"-c".to_string(),
format!(
"listen_http_addr='{}'",
self.env.pageserver.listen_http_addr
),
"-c".to_string(),
format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr),
];
let base_data_dir_param = self.env.base_data_dir.display().to_string();
let pg_distrib_dir_param =
format!("pg_distrib_dir='{}'", self.env.pg_distrib_dir.display());
let authg_type_param = format!("auth_type='{}'", self.env.pageserver.auth_type);
let listen_http_addr_param = format!(
"listen_http_addr='{}'",
self.env.pageserver.listen_http_addr
);
let listen_pg_addr_param =
format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr);
let mut args = Vec::with_capacity(20);

args.push("--init");
args.extend(["-D", &base_data_dir_param]);
args.extend(["-c", &pg_distrib_dir_param]);
args.extend(["-c", &authg_type_param]);
args.extend(["-c", &listen_http_addr_param]);
args.extend(["-c", &listen_pg_addr_param]);

for config_override in config_overrides {
args.extend(["-c", config_override]);
}

if self.env.pageserver.auth_type != AuthType::Trust {
args.extend([
"-c".to_string(),
"auth_validation_public_key_path='auth_public_key.pem'".to_string(),
"-c",
"auth_validation_public_key_path='auth_public_key.pem'",
]);
}

if let Some(tenantid) = create_tenant {
args.extend(["--create-tenant".to_string(), tenantid.to_string()])
args.extend(["--create-tenant", tenantid])
}

let status = cmd
.args(args)
.env_clear()
.env("RUST_BACKTRACE", "1")
let status = fill_rust_env_vars(cmd.args(args))
.status()
.expect("pageserver init failed");
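The widened `init` signature forwards every entry of `config_overrides` as its own `-c <toml>` flag on the spawned `pageserver --init`. A small standalone sketch of that flag-building pattern (the override values, data directory, and the `echo` stand-in binary are made up for illustration):

```rust
use std::process::Command;

// Turn a slice of TOML override strings into repeated `-c` flags, mirroring
// how the new `init`/`start` signatures pass them through to the pageserver.
fn main() {
    let config_overrides = ["gc_period='10 s'", "max_delta_layers=10"];

    let mut args: Vec<&str> = vec!["--init", "-D", "/tmp/pageserver-repo"];
    for config_override in config_overrides {
        args.extend(["-c", config_override]);
    }

    let mut cmd = Command::new("echo"); // stand-in for the real pageserver binary
    cmd.args(&args);
    println!("{:?}", cmd); // shows: "echo" "--init" "-D" ... "-c" "gc_period='10 s'" ...
}
```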
@@ -154,7 +157,7 @@ impl PageServerNode {
self.repo_path().join("pageserver.pid")
}

pub fn start(&self) -> anyhow::Result<()> {
pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
print!(
"Starting pageserver at '{}' in '{}'",
connection_address(&self.pg_connection_config),

@@ -163,16 +166,16 @@ impl PageServerNode {
io::stdout().flush().unwrap();

let mut cmd = Command::new(self.env.pageserver_bin()?);
cmd.args(&["-D", self.repo_path().to_str().unwrap()])
.arg("--daemonize")
.env_clear()
.env("RUST_BACKTRACE", "1");

let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
let repo_path = self.repo_path();
let mut args = vec!["-D", repo_path.to_str().unwrap()];

for config_override in config_overrides {
args.extend(["-c", config_override]);
}

fill_rust_env_vars(cmd.args(&args).arg("--daemonize"));

if !cmd.status()?.success() {
bail!(
"Pageserver failed to start. See '{}' for details.",

@@ -147,6 +147,10 @@ bucket_name = 'some-sample-bucket'
# Name of the region where the bucket is located at
bucket_region = 'eu-north-1'

# A "subfolder" in the bucket, to use the same bucket separately by multiple pageservers at once.
# Optional, pageserver uses entire bucket if the prefix is not specified.
prefix_in_bucket = '/some/prefix/'

# Access key to connect to the bucket ("login" part of the credentials)
access_key_id = 'SOMEKEYAAAAASADSAH*#'

@@ -129,13 +129,13 @@ There are the following implementations present:
* local filesystem — to use in tests mainly
* AWS S3 - to use in production

Implementation details are covered in the [backup readme](./src/remote_storage/README.md) and corresponding Rust file docs.
Implementation details are covered in the [backup readme](./src/remote_storage/README.md) and corresponding Rust file docs, parameters documentation can be found at [settings docs](../docs/settings.md).

The backup service is disabled by default and can be enabled to interact with a single remote storage.

CLI examples:
* Local FS: `${PAGESERVER_BIN} -c "remote_storage={local_path='/some/local/path/'}"`
* AWS S3 : `${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1',access_key_id='SOMEKEYAAAAASADSAH*#',secret_access_key='SOMEsEcReTsd292v'}"`
* AWS S3 : `${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1', prefix_in_bucket='/test_prefix/',access_key_id='SOMEKEYAAAAASADSAH*#',secret_access_key='SOMEsEcReTsd292v'}"`

For Amazon AWS S3, a key id and secret access key could be located in `~/.aws/credentials` if awscli was ever configured to work with the desired bucket, on the AWS Settings page for a certain user. Also note, that the bucket names does not contain any protocols when used on AWS.
For local S3 installations, refer to the their documentation for name format and credentials.

@@ -154,6 +154,7 @@ or
```
[remote_storage]
bucket_name = 'some-sample-bucket'
bucket_region = 'eu-north-1'
prefix_in_bucket = '/test_prefix/'
access_key_id = 'SOMEKEYAAAAASADSAH*#'
secret_access_key = 'SOMEsEcReTsd292v'
```

@@ -53,12 +53,12 @@ fn main() -> Result<()> {
)
// See `settings.md` for more details on the extra configuration patameters pageserver can process
.arg(
Arg::with_name("config-option")
Arg::with_name("config-override")
.short("c")
.takes_value(true)
.number_of_values(1)
.multiple(true)
.help("Additional configuration options or overrides of the ones from the toml config file.
.help("Additional configuration overrides of the ones from the toml config file (or new ones to add there).
Any option has to be a valid toml document, example: `-c \"foo='hey'\"` `-c \"foo={value=1}\"`"),
)
.get_matches();

@@ -105,7 +105,7 @@ fn main() -> Result<()> {
};

// Process any extra options given with -c
if let Some(values) = arg_matches.values_of("config-option") {
if let Some(values) = arg_matches.values_of("config-override") {
for option_line in values {
let doc = toml_edit::Document::from_str(option_line).with_context(|| {
format!(
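The renamed `-c`/`--config-override` flag accepts any number of small TOML documents, each parsed on its own and layered over the config file. A self-contained sketch of that flow using clap 2.x and toml_edit, as in the diff (the demo argv and key names are made up):

```rust
// Parse a repeatable `-c` flag whose every value must be a valid TOML document.
use std::str::FromStr;

use anyhow::Context;
use clap::{App, Arg};

fn main() -> anyhow::Result<()> {
    let arg_matches = App::new("pageserver-sketch")
        .arg(
            Arg::with_name("config-override")
                .short("c")
                .takes_value(true)
                .number_of_values(1)
                .multiple(true),
        )
        .get_matches_from(vec!["pageserver-sketch", "-c", "foo='hey'", "-c", "bar={value=1}"]);

    if let Some(values) = arg_matches.values_of("config-override") {
        for option_line in values {
            // A malformed override fails early with a message naming the offending value.
            let doc = toml_edit::Document::from_str(option_line)
                .with_context(|| format!("Option '{}' is not valid toml", option_line))?;
            print!("parsed override: {}", doc);
        }
    }
    Ok(())
}
```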
@@ -195,9 +195,10 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
}

let signals = signals::install_shutdown_handlers()?;
let (async_shutdown_tx, async_shutdown_rx) = tokio::sync::watch::channel(());
let mut threads = Vec::new();

let sync_startup = remote_storage::start_local_timeline_sync(conf)
let sync_startup = remote_storage::start_local_timeline_sync(conf, async_shutdown_rx)
.context("Failed to set up local files sync with external storage")?;

if let Some(handle) = sync_startup.sync_loop_handle {

@@ -255,6 +256,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
signal.name()
);

async_shutdown_tx.send(())?;
postgres_backend::set_pgbackend_shutdown_requested();
tenant_mgr::shutdown_all_tenants()?;
endpoint::shutdown();

@@ -45,14 +45,16 @@ impl BranchInfo {
repo: &Arc<dyn Repository>,
include_non_incremental_logical_size: bool,
) -> Result<Self> {
let name = path
.as_ref()
.file_name()
.unwrap()
.to_str()
.unwrap()
.to_string();
let timeline_id = std::fs::read_to_string(path)?.parse::<ZTimelineId>()?;
let path = path.as_ref();
let name = path.file_name().unwrap().to_string_lossy().to_string();
let timeline_id = std::fs::read_to_string(path)
.with_context(|| {
format!(
"Failed to read branch file contents at path '{}'",
path.display()
)
})?
.parse::<ZTimelineId>()?;

let timeline = match repo.get_timeline(timeline_id)? {
RepositoryTimeline::Local(local_entry) => local_entry,

@@ -43,6 +43,9 @@ pub mod defaults {
pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;

pub const DEFAULT_MAX_DELTA_LAYERS: usize = 10;
pub const DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD: usize = 50;

///
/// Default built-in configuration file.
///

@@ -90,6 +93,21 @@ pub struct PageServerConf {
pub page_cache_size: usize,
pub max_file_descriptors: usize,

//
// Minimal total size of delta layeres which triggers generation of image layer by checkpointer.
// It is specified as percent of maximal sigment size (RELISH_SEG_SIZE).
// I.e. it means that checkpoint will create image layer in addition to delta layer only when total size
// of delta layers since last image layer exceeds specified percent of segment size.
//
pub image_layer_generation_threshold: usize,

//
// Maximal number of delta layers which can be stored before image layere should be generated.
// The garbage collector needs image layers in order to delete files.
// If this number is too large it can result in too many small files on disk.
//
pub max_delta_layers: usize,

// Repository directory, relative to current working directory.
// Normally, the page server changes the current working directory
// to the repository, and 'workdir' is always '.'. But we don't do

@@ -135,6 +153,8 @@ pub struct S3Config {
pub bucket_name: String,
/// The region where the bucket is located at.
pub bucket_region: String,
/// A "subfolder" in the bucket, to use the same bucket separately by multiple pageservers at once.
pub prefix_in_bucket: Option<String>,
/// "Login" to use when connecting to bucket.
/// Can be empty for cases like AWS k8s IAM
/// where we can allow certain pods to connect

@@ -149,6 +169,7 @@ impl std::fmt::Debug for S3Config {
f.debug_struct("S3Config")
.field("bucket_name", &self.bucket_name)
.field("bucket_region", &self.bucket_region)
.field("prefix_in_bucket", &self.prefix_in_bucket)
.finish()
}
}

@@ -225,6 +246,9 @@ impl PageServerConf {
page_cache_size: DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: DEFAULT_MAX_FILE_DESCRIPTORS,

max_delta_layers: DEFAULT_MAX_DELTA_LAYERS,
image_layer_generation_threshold: DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,

pg_distrib_dir: PathBuf::new(),
auth_validation_public_key_path: None,
auth_type: AuthType::Trust,

@@ -247,6 +271,10 @@ impl PageServerConf {
"max_file_descriptors" => {
conf.max_file_descriptors = parse_toml_u64(key, item)? as usize
}
"max_delta_layers" => conf.max_delta_layers = parse_toml_u64(key, item)? as usize,
"image_layer_generation_threshold" => {
conf.image_layer_generation_threshold = parse_toml_u64(key, item)? as usize
}
"pg_distrib_dir" => {
conf.pg_distrib_dir = PathBuf::from(parse_toml_string(key, item)?)
}

@@ -332,18 +360,26 @@ impl PageServerConf {
bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
}
(None, Some(bucket_name), Some(bucket_region)) => RemoteStorageKind::AwsS3(S3Config {
bucket_name: bucket_name.as_str().unwrap().to_string(),
bucket_region: bucket_region.as_str().unwrap().to_string(),
bucket_name: parse_toml_string("bucket_name", bucket_name)?,
bucket_region: parse_toml_string("bucket_region", bucket_region)?,
access_key_id: toml
.get("access_key_id")
.map(|x| x.as_str().unwrap().to_string()),
.map(|access_key_id| parse_toml_string("access_key_id", access_key_id))
.transpose()?,
secret_access_key: toml
.get("secret_access_key")
.map(|x| x.as_str().unwrap().to_string()),
.map(|secret_access_key| {
parse_toml_string("secret_access_key", secret_access_key)
})
.transpose()?,
prefix_in_bucket: toml
.get("prefix_in_bucket")
.map(|prefix_in_bucket| parse_toml_string("prefix_in_bucket", prefix_in_bucket))
.transpose()?,
}),
(Some(local_path), None, None) => {
RemoteStorageKind::LocalFs(PathBuf::from(local_path.as_str().unwrap()))
}
(Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from(
parse_toml_string("local_path", local_path)?,
)),
(Some(_), Some(_), _) => bail!("local_path and bucket_name are mutually exclusive"),
};

@@ -368,6 +404,8 @@ impl PageServerConf {
gc_period: Duration::from_secs(10),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
max_delta_layers: defaults::DEFAULT_MAX_DELTA_LAYERS,
image_layer_generation_threshold: defaults::DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
superuser: "zenith_admin".to_string(),

@@ -439,6 +477,9 @@ gc_horizon = 222
page_cache_size = 444
max_file_descriptors = 333

max_delta_layers = 10
image_layer_generation_threshold = 50

# initial superuser role name to use when creating a new tenant
initial_superuser_name = 'zzzz'

@@ -469,6 +510,9 @@ initial_superuser_name = 'zzzz'
superuser: defaults::DEFAULT_SUPERUSER.to_string(),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
max_delta_layers: defaults::DEFAULT_MAX_DELTA_LAYERS,
image_layer_generation_threshold:
defaults::DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
workdir,
pg_distrib_dir,
auth_type: AuthType::Trust,

@@ -510,6 +554,8 @@ initial_superuser_name = 'zzzz'
superuser: "zzzz".to_string(),
page_cache_size: 444,
max_file_descriptors: 333,
max_delta_layers: 10,
image_layer_generation_threshold: 50,
workdir,
pg_distrib_dir,
auth_type: AuthType::Trust,

@@ -585,6 +631,7 @@ pg_distrib_dir='{}'

let bucket_name = "some-sample-bucket".to_string();
let bucket_region = "eu-north-1".to_string();
let prefix_in_bucket = "test_prefix".to_string();
let access_key_id = "SOMEKEYAAAAASADSAH*#".to_string();
let secret_access_key = "SOMEsEcReTsd292v".to_string();
let max_concurrent_sync = NonZeroUsize::new(111).unwrap();

@@ -597,13 +644,14 @@ max_concurrent_sync = {}
max_sync_errors = {}
bucket_name = '{}'
bucket_region = '{}'
prefix_in_bucket = '{}'
access_key_id = '{}'
secret_access_key = '{}'"#,
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, access_key_id, secret_access_key
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, prefix_in_bucket, access_key_id, secret_access_key
),
format!(
"remote_storage={{max_concurrent_sync = {}, max_sync_errors = {}, bucket_name='{}', bucket_region='{}', access_key_id='{}', secret_access_key='{}'}}",
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, access_key_id, secret_access_key
"remote_storage={{max_concurrent_sync={}, max_sync_errors={}, bucket_name='{}', bucket_region='{}', prefix_in_bucket='{}', access_key_id='{}', secret_access_key='{}'}}",
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, prefix_in_bucket, access_key_id, secret_access_key
),
];

@@ -637,6 +685,7 @@ pg_distrib_dir='{}'
bucket_region: bucket_region.clone(),
access_key_id: Some(access_key_id.clone()),
secret_access_key: Some(secret_access_key.clone()),
prefix_in_bucket: Some(prefix_in_bucket.clone())
}),
},
"Remote storage config should correctly parse the S3 config"

@@ -1,6 +1,6 @@
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use anyhow::{Context, Result};
use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};

@@ -190,18 +190,27 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
}

#[derive(Debug, Serialize)]
struct TimelineInfo {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
#[serde(with = "opt_display_serde")]
ancestor_timeline_id: Option<ZTimelineId>,
last_record_lsn: Lsn,
prev_record_lsn: Lsn,
start_lsn: Lsn,
disk_consistent_lsn: Lsn,
timeline_state: Option<TimelineSyncState>,
#[serde(tag = "type")]
enum TimelineInfo {
Local {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
#[serde(with = "opt_display_serde")]
ancestor_timeline_id: Option<ZTimelineId>,
last_record_lsn: Lsn,
prev_record_lsn: Lsn,
start_lsn: Lsn,
disk_consistent_lsn: Lsn,
timeline_state: Option<TimelineSyncState>,
},
Remote {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
},
}

async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {

@@ -215,9 +224,12 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
info_span!("timeline_detail_handler", tenant = %tenant_id, timeline = %timeline_id)
.entered();
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
match repo.get_timeline(timeline_id)?.local_timeline() {
None => bail!("Timeline with id {} is not present locally", timeline_id),
Some(timeline) => Ok::<_, anyhow::Error>(TimelineInfo {
Ok::<_, anyhow::Error>(match repo.get_timeline(timeline_id)?.local_timeline() {
None => TimelineInfo::Remote {
timeline_id,
tenant_id,
},
Some(timeline) => TimelineInfo::Local {
timeline_id,
tenant_id,
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),

@@ -226,8 +238,8 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
prev_record_lsn: timeline.get_prev_record_lsn(),
start_lsn: timeline.get_start_lsn(),
timeline_state: repo.get_timeline_state(timeline_id),
}),
}
},
})
})
.await
.map_err(ApiError::from_err)??;
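Turning `TimelineInfo` into an enum with `#[serde(tag = "type")]` changes the JSON shape: each response now carries a `type` discriminator alongside the variant's fields, so a caller can tell local and remote-only timelines apart. A self-contained sketch with simplified field types (plain strings instead of the real `ZTimelineId`/`Lsn` wrappers):

```rust
use serde::Serialize;

// Simplified stand-in for the real TimelineInfo; the internally-tagged
// representation is the same even though the field types are simplified.
#[derive(Serialize)]
#[serde(tag = "type")]
enum TimelineInfo {
    Local {
        timeline_id: String,
        tenant_id: String,
        disk_consistent_lsn: u64,
    },
    Remote {
        timeline_id: String,
        tenant_id: String,
    },
}

fn main() -> serde_json::Result<()> {
    let remote = TimelineInfo::Remote {
        timeline_id: "aa".repeat(16),
        tenant_id: "bb".repeat(16),
    };
    // Prints: {"type":"Remote","timeline_id":"aaaa...","tenant_id":"bbbb..."}
    println!("{}", serde_json::to_string(&remote)?);
    Ok(())
}
```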
@@ -41,6 +41,7 @@ use crate::repository::{
TimelineWriter, ZenithWalRecord,
};
use crate::tenant_mgr;
use crate::virtual_file::VirtualFile;
use crate::walreceiver;
use crate::walreceiver::IS_WAL_RECEIVER;
use crate::walredo::WalRedoManager;

@@ -127,7 +128,13 @@ pub struct LayeredRepository {
conf: &'static PageServerConf,
tenantid: ZTenantId,
timelines: Mutex<HashMap<ZTimelineId, LayeredTimelineEntry>>,

// This mutex prevents creation of new timelines during GC.
// Adding yet another mutex (in addition to `timelines`) is needed because holding
// `timelines` mutex during all GC iteration (especially with enforced checkpoint)
// may block for a long time `get_timeline`, `get_timelines_state`,... and other operations
// with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
// timeout...
gc_cs: Mutex<()>,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
/// Makes every timeline to backup their files to remote storage.
upload_relishes: bool,

@@ -186,6 +193,8 @@ impl Repository for LayeredRepository {
// We need to hold this lock to prevent GC from starting at the same time. GC scans the directory to learn
// about timelines, so otherwise a race condition is possible, where we create new timeline and GC
// concurrently removes data that is needed by the new timeline.
let _gc_cs = self.gc_cs.lock().unwrap();

let mut timelines = self.timelines.lock().unwrap();
let src_timeline = match self.get_or_init_timeline(src, &mut timelines)? {
LayeredTimelineEntry::Local(timeline) => timeline,

@@ -359,7 +368,7 @@ fn shutdown_timeline(
timeline
.upload_relishes
.store(false, atomic::Ordering::Relaxed);
walreceiver::stop_wal_receiver(timeline_id);
walreceiver::stop_wal_receiver(tenant_id, timeline_id);
trace!("repo shutdown. checkpoint timeline {}", timeline_id);
// Do not reconstruct pages to reduce shutdown time
timeline.checkpoint(CheckpointConfig::Flush)?;

@@ -489,6 +498,7 @@ impl LayeredRepository {
tenantid,
conf,
timelines: Mutex::new(HashMap::new()),
gc_cs: Mutex::new(()),
walredo_mgr,
upload_relishes,
}

@@ -505,10 +515,10 @@ impl LayeredRepository {
let _enter = info_span!("saving metadata").entered();
let path = metadata_path(conf, timelineid, tenantid);
// use OpenOptions to ensure file presence is consistent with first_save
let mut file = OpenOptions::new()
.write(true)
.create_new(first_save)
.open(&path)?;
let mut file = VirtualFile::open_with_options(
&path,
OpenOptions::new().write(true).create_new(first_save),
)?;

let metadata_bytes = data.to_bytes().context("Failed to get metadata bytes")?;

@@ -575,7 +585,8 @@ impl LayeredRepository {
let now = Instant::now();

// grab mutex to prevent new timelines from being created here.
// TODO: We will hold it for a long time
let _gc_cs = self.gc_cs.lock().unwrap();

let mut timelines = self.timelines.lock().unwrap();

// Scan all timelines. For each timeline, remember the timeline ID and

@@ -663,6 +674,7 @@ impl LayeredRepository {
}

if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) {
drop(timelines);
let branchpoints: Vec<Lsn> = all_branchpoints
.range((
Included((timelineid, Lsn(0))),

@@ -682,6 +694,7 @@ impl LayeredRepository {
let result = timeline.gc_timeline(branchpoints, cutoff)?;

totals += result;
timelines = self.timelines.lock().unwrap();
}
}

@@ -759,6 +772,12 @@ pub struct LayeredTimeline {
/// to avoid deadlock.
write_lock: Mutex<()>,

// Prevent concurrent checkpoints.
// Checkpoints are normally performed by one thread. But checkpoint can also be manually requested by admin
// (that's used in tests), and shutdown also forces a checkpoint. These forced checkpoints run in a different thread
// and could be triggered at the same time as a normal checkpoint.
checkpoint_cs: Mutex<()>,

// Needed to ensure that we can't create a branch at a point that was already garbage collected
latest_gc_cutoff_lsn: AtomicLsn,

@@ -1118,6 +1137,7 @@ impl LayeredTimeline {
upload_relishes: AtomicBool::new(upload_relishes),

write_lock: Mutex::new(()),
checkpoint_cs: Mutex::new(()),

latest_gc_cutoff_lsn: AtomicLsn::from(metadata.latest_gc_cutoff_lsn()),
initdb_lsn: metadata.initdb_lsn(),

@@ -1435,6 +1455,9 @@ impl LayeredTimeline {
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL.
fn checkpoint_internal(&self, checkpoint_distance: u64, reconstruct_pages: bool) -> Result<()> {
// Prevent concurrent checkpoints
let _checkpoint_cs = self.checkpoint_cs.lock().unwrap();

let mut write_guard = self.write_lock.lock().unwrap();
let mut layers = self.layers.lock().unwrap();

@@ -1575,7 +1598,7 @@ impl LayeredTimeline {
Ok(())
}

fn evict_layer(&self, layer_id: LayerId, reconstruct_pages: bool) -> Result<Vec<PathBuf>> {
fn evict_layer(&self, layer_id: LayerId, mut reconstruct_pages: bool) -> Result<Vec<PathBuf>> {
// Mark the layer as no longer accepting writes and record the end_lsn.
// This happens in-place, no new layers are created now.
// We call `get_last_record_lsn` again, which may be different from the

@@ -1588,8 +1611,27 @@ impl LayeredTimeline {

let global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
if let Some(oldest_layer) = global_layer_map.get(&layer_id) {
let last_lsn = self.get_last_record_lsn();
// Avoid creation of image layers if there are not so much deltas
if reconstruct_pages
&& oldest_layer.get_seg_tag().rel.is_blocky()
&& self.conf.image_layer_generation_threshold != 0
{
let (n_delta_layers, total_delta_size) =
layers.count_delta_layers(oldest_layer.get_seg_tag(), last_lsn)?;
let logical_segment_size =
oldest_layer.get_seg_size(last_lsn)? as u64 * BLCKSZ as u64;
let physical_deltas_size = total_delta_size + oldest_layer.get_physical_size()?;
if logical_segment_size * self.conf.image_layer_generation_threshold as u64
> physical_deltas_size * 100
&& n_delta_layers < self.conf.max_delta_layers
{
reconstruct_pages = false;
}
}

drop(global_layer_map);
oldest_layer.freeze(self.get_last_record_lsn());
oldest_layer.freeze(last_lsn);

// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.
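The threshold logic above decides whether eviction should also build an image layer: it compares the segment's logical size (scaled by `image_layer_generation_threshold`, a percentage) against the accumulated physical size of the delta layers, and it stops skipping image layers once `max_delta_layers` is reached. A small standalone sketch of that predicate with made-up numbers:

```rust
// Standalone sketch of the "skip image layer?" predicate from evict_layer.
// The struct and the sample numbers are illustrative; BLCKSZ is the 8 KiB Postgres block.
const BLCKSZ: u64 = 8192;

struct Thresholds {
    image_layer_generation_threshold: u64, // percent of the segment's logical size
    max_delta_layers: usize,
}

/// Returns true when the deltas are still "cheap enough" that page
/// reconstruction (an image layer) can be skipped for this eviction.
fn skip_image_layer(
    t: &Thresholds,
    seg_size_blocks: u64,
    n_delta_layers: usize,
    physical_deltas_size: u64,
) -> bool {
    let logical_segment_size = seg_size_blocks * BLCKSZ;
    t.image_layer_generation_threshold != 0
        && logical_segment_size * t.image_layer_generation_threshold > physical_deltas_size * 100
        && n_delta_layers < t.max_delta_layers
}

fn main() {
    let t = Thresholds {
        image_layer_generation_threshold: 50, // deltas must stay under 50% of logical size
        max_delta_layers: 10,
    };
    // 1000 blocks * 8 KiB ~= 8 MiB logical; 1 MiB of deltas in 3 layers -> skip the image layer.
    assert!(skip_image_layer(&t, 1000, 3, 1 << 20));
    // 6 MiB of deltas (over 50% of logical size) -> build the image layer.
    assert!(!skip_image_layer(&t, 1000, 3, 6 << 20));
    println!("threshold predicate behaves as expected");
}
```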
@@ -161,6 +161,14 @@ pub struct DeltaLayerInner {
}

impl DeltaLayerInner {
fn get_physical_size(&self) -> Result<u64> {
Ok(if let Some(book) = &self.book {
book.chapter_reader(PAGE_VERSIONS_CHAPTER)?.len()
} else {
0
})
}

fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
// Scan the VecMap backwards, starting from the given entry.
let slice = self

@@ -289,6 +297,12 @@ impl Layer for DeltaLayer {
}
}

// Get physical size of the layer
fn get_physical_size(&self) -> Result<u64> {
// TODO: is it actually necessary to load layer to get it's size?
self.load()?.get_physical_size()
}

/// Get size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
assert!(lsn >= self.start_lsn);

@@ -41,7 +41,7 @@ pub struct EphemeralFile {
_timelineid: ZTimelineId,
file: Arc<VirtualFile>,

pos: u64,
pub pos: u64,
}

impl EphemeralFile {

@@ -201,6 +201,11 @@ impl Layer for ImageLayer {
}
}

// Get physical size of the layer
fn get_physical_size(&self) -> Result<u64> {
Ok(self.get_seg_size(Lsn(0))? as u64 * BLOCK_SIZE as u64)
}

/// Does this segment exist at given LSN?
fn get_seg_exists(&self, _lsn: Lsn) -> Result<bool> {
Ok(true)

@@ -80,6 +80,10 @@ impl InMemoryLayerInner {
assert!(self.end_lsn.is_none());
}

fn get_physical_size(&self) -> u64 {
self.page_versions.size()
}

fn get_seg_size(&self, lsn: Lsn) -> SegmentBlk {
// Scan the BTreeMap backwards, starting from the given entry.
let slice = self.seg_sizes.slice_range(..=lsn);

@@ -221,7 +225,12 @@ impl Layer for InMemoryLayer {
}
}

/// Get size of the relation at given LSN
// Get physical size of the layer
fn get_physical_size(&self) -> Result<u64> {
Ok(self.inner.read().unwrap().get_physical_size() as u64)
}

/// Get logical size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
assert!(lsn >= self.start_lsn);
ensure!(

@@ -616,7 +625,7 @@ impl InMemoryLayer {
let image_lsn: Option<Lsn>;
let delta_end_lsn: Option<Lsn>;
if self.is_dropped() || !reconstruct_pages {
// The segment was dropped. Create just a delta layer containing all the
// Create just a delta layer containing all the
// changes up to and including the drop.
delta_end_lsn = Some(end_lsn_exclusive);
image_lsn = None;

@@ -111,6 +111,14 @@
}
}

/// Iterate over all items with start bound <= 'key'
pub fn iter_older(&self, key: I::Key) -> IntervalIter<I> {
IntervalIter {
point_iter: self.points.range(..key),
elem_iter: None,
}
}

/// Iterate over all items
pub fn iter(&self) -> IntervalIter<I> {
IntervalIter {

@@ -230,6 +238,35 @@
}
}

impl<'a, I> DoubleEndedIterator for IntervalIter<'a, I>
where
I: IntervalItem + ?Sized,
{
fn next_back(&mut self) -> Option<Self::Item> {
// Iterate over all elements in all the points in 'point_iter'. To avoid
// returning the same element twice, we only return each element at its
// starting point.
loop {
// Return next remaining element from the current point
if let Some((point_key, elem_iter)) = &mut self.elem_iter {
while let Some(elem) = elem_iter.next_back() {
if elem.start_key() == *point_key {
return Some(Arc::clone(elem));
}
}
}
// No more elements at this point. Move to next point.
if let Some((point_key, point)) = self.point_iter.next_back() {
self.elem_iter = Some((*point_key, point.elements.iter()));
continue;
} else {
// No more points, all done
return None;
}
}
}
}

impl<I: ?Sized> Default for IntervalTree<I>
where
I: IntervalItem,

@@ -199,6 +199,14 @@ impl LayerMap {
}
}

pub fn count_delta_layers(&self, seg: SegmentTag, lsn: Lsn) -> Result<(usize, u64)> {
if let Some(segentry) = self.segs.get(&seg) {
segentry.count_delta_layers(lsn)
} else {
Ok((0, 0))
}
}

/// Is there any layer for given segment that is alive at the lsn?
///
/// This is a public wrapper for SegEntry fucntion,

@@ -320,6 +328,22 @@ impl SegEntry {
.any(|layer| !layer.is_incremental())
}

// Count number of delta layers preceeding specified `lsn`.
// Perform backward iteration from exclusive upper bound until image layer is reached.
pub fn count_delta_layers(&self, lsn: Lsn) -> Result<(usize, u64)> {
let mut count: usize = 0;
let mut total_size: u64 = 0;
let mut iter = self.historic.iter_older(lsn);
while let Some(layer) = iter.next_back() {
if !layer.is_incremental() {
break;
}
count += 1;
total_size += layer.get_physical_size()?;
}
Ok((count, total_size))
}

// Set new open layer for a SegEntry.
// It's ok to rewrite previous open layer,
// but only if it is not writeable anymore.

@@ -39,6 +39,10 @@ impl PageVersions {
}
}

pub fn size(&self) -> u64 {
self.file.pos
}

pub fn append_or_update_last(
&mut self,
blknum: u32,

@@ -154,12 +154,15 @@ pub trait Layer: Send + Sync {
reconstruct_data: &mut PageReconstructData,
) -> Result<PageReconstructResult>;

/// Return size of the segment at given LSN. (Only for blocky relations.)
/// Return logical size of the segment at given LSN. (Only for blocky relations.)
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk>;

/// Does the segment exist at given LSN? Or was it dropped before it.
fn get_seg_exists(&self, lsn: Lsn) -> Result<bool>;

// Get physical size of the layer
fn get_physical_size(&self) -> Result<u64>;

/// Does this layer only contain some data for the segment (incremental),
/// or does it contain a version of every page? This is important to know
/// for garbage collecting old layers: an incremental layer depends on

@@ -13,6 +13,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use postgres_ffi::pg_constants::BLCKSZ;
use regex::Regex;
use std::net::TcpListener;
use std::str;

@@ -42,6 +43,8 @@ use crate::tenant_mgr;
use crate::walreceiver;
use crate::CheckpointConfig;

const CHUNK_SIZE: u32 = 128; // 1Mb

// Wrapped in libpq CopyData
enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),

@@ -91,7 +94,8 @@ struct PagestreamNblocksResponse {

#[derive(Debug)]
struct PagestreamGetPageResponse {
page: Bytes,
n_blocks: u32,
data: Bytes,
}

#[derive(Debug)]

@@ -162,7 +166,8 @@ impl PagestreamBeMessage {

Self::GetPage(resp) => {
bytes.put_u8(102); /* tag from pagestore_client.h */
bytes.put(&resp.page[..]);
bytes.put_u32(resp.n_blocks);
bytes.put(&resp.data[..]);
}

Self::Error(resp) => {

@@ -438,11 +443,18 @@ impl PageServerHandler {
.entered();
let tag = RelishTag::Relation(req.rel);
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest)?;

let page = timeline.get_page_at_lsn(tag, req.blkno, lsn)?;

let rel_size = timeline.get_relish_size(tag, lsn)?.unwrap_or(0);
let blkno = req.blkno;
let n_blocks = u32::min(blkno + CHUNK_SIZE, rel_size) - blkno;
let mut data = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
for i in 0..n_blocks {
let page = timeline.get_page_at_lsn(tag, blkno + i, lsn)?;
data.extend_from_slice(&page);
}
assert!(data.len() == n_blocks as usize * BLCKSZ as usize);
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
n_blocks,
data: data.freeze(),
}))
}
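With `CHUNK_SIZE = 128`, a GetPage request is now answered with a run of up to 128 consecutive 8 KiB blocks (about 1 MiB), clipped at the end of the relation: `n_blocks = min(blkno + CHUNK_SIZE, rel_size) - blkno`. A tiny standalone check of that arithmetic (the sample values are made up):

```rust
// Standalone check of the chunk-size clamping used in the GetPage handler.
const CHUNK_SIZE: u32 = 128; // blocks per response, ~1 MiB of 8 KiB pages
const BLCKSZ: usize = 8192;

fn n_blocks(blkno: u32, rel_size: u32) -> u32 {
    u32::min(blkno + CHUNK_SIZE, rel_size) - blkno
}

fn main() {
    // Request in the middle of a 1000-block relation: a full 128-block chunk.
    assert_eq!(n_blocks(500, 1000), 128);
    // Request near the end of a 300-block relation: only 50 blocks remain.
    assert_eq!(n_blocks(250, 300), 50);
    // The reply buffer size matches the block count.
    assert_eq!(n_blocks(250, 300) as usize * BLCKSZ, 50 * 8192);
    println!("chunk clamping behaves as expected");
}
```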
@@ -594,7 +606,7 @@ impl postgres_backend::Handler for PageServerHandler {
tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)
.context("Failed to fetch local timeline for callmemaybe requests")?;

walreceiver::launch_wal_receiver(self.conf, timelineid, &connstr, tenantid.to_owned());
walreceiver::launch_wal_receiver(self.conf, tenantid, timelineid, &connstr);

pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("branch_create ") {

@@ -5,7 +5,7 @@
//! There are a few components the storage machinery consists of:
//! * [`RemoteStorage`] trait a CRUD-like generic abstraction to use for adapting external storages with a few implementations:
//! * [`local_fs`] allows to use local file system as an external storage
//! * [`rust_s3`] uses AWS S3 bucket entirely as an external storage
//! * [`rust_s3`] uses AWS S3 bucket as an external storage
//!
//! * synchronization logic at [`storage_sync`] module that keeps pageserver state (both runtime one and the workdir files) and storage state in sync.
//! Synchronization internals are split into submodules

@@ -93,7 +93,7 @@ use std::{
};

use anyhow::{bail, Context};
use tokio::io;
use tokio::{io, sync};
use tracing::{error, info};
use zenith_utils::zid::{ZTenantId, ZTimelineId};

@@ -135,6 +135,7 @@ pub struct SyncStartupData {
/// Along with that, scans tenant files local and remote (if the sync gets enabled) to check the initial timeline states.
pub fn start_local_timeline_sync(
config: &'static PageServerConf,
shutdown_hook: sync::watch::Receiver<()>,
) -> anyhow::Result<SyncStartupData> {
let local_timeline_files = local_tenant_timeline_files(config)
.context("Failed to collect local tenant timeline files")?;

@@ -142,6 +143,7 @@ pub fn start_local_timeline_sync(
match &config.remote_storage_config {
Some(storage_config) => match &storage_config.storage {
RemoteStorageKind::LocalFs(root) => storage_sync::spawn_storage_sync_thread(
shutdown_hook,
config,
local_timeline_files,
LocalFs::new(root.clone(), &config.workdir)?,

@@ -149,6 +151,7 @@ pub fn start_local_timeline_sync(
storage_config.max_sync_errors,
),
RemoteStorageKind::AwsS3(s3_config) => storage_sync::spawn_storage_sync_thread(
shutdown_hook,
config,
local_timeline_files,
S3::new(s3_config, &config.workdir)?,

@@ -1,6 +1,8 @@
//! AWS S3 storage wrapper around `rust_s3` library.
//! Currently does not allow multiple pageservers to use the same bucket concurrently: objects are
//! placed in the root of the bucket.
//!
//! Respects `prefix_in_bucket` property from [`S3Config`],
//! allowing multiple pageservers to independently work with the same S3 bucket, if
//! their bucket prefixes are both specified and different.

use std::path::{Path, PathBuf};

@@ -23,8 +25,26 @@ impl S3ObjectKey {
&self.0
}

fn download_destination(&self, pageserver_workdir: &Path) -> PathBuf {
pageserver_workdir.join(self.0.split(S3_FILE_SEPARATOR).collect::<PathBuf>())
fn download_destination(
&self,
pageserver_workdir: &Path,
prefix_to_strip: Option<&str>,
) -> PathBuf {
let path_without_prefix = match prefix_to_strip {
Some(prefix) => self.0.strip_prefix(prefix).unwrap_or_else(|| {
panic!(
"Could not strip prefix '{}' from S3 object key '{}'",
prefix, self.0
)
}),
None => &self.0,
};

pageserver_workdir.join(
path_without_prefix
.split(S3_FILE_SEPARATOR)
.collect::<PathBuf>(),
)
}
}

@@ -32,6 +52,7 @@ impl S3ObjectKey {
pub struct S3 {
pageserver_workdir: &'static Path,
bucket: Bucket,
prefix_in_bucket: Option<String>,
}

impl S3 {

@@ -49,6 +70,20 @@ impl S3 {
None,
)
.context("Failed to create the s3 credentials")?;

let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
let mut prefix = prefix;
while prefix.starts_with(S3_FILE_SEPARATOR) {
prefix = &prefix[1..]
}

let mut prefix = prefix.to_string();
while prefix.ends_with(S3_FILE_SEPARATOR) {
prefix.pop();
}
prefix
});

Ok(Self {
bucket: Bucket::new_with_path_style(
aws_config.bucket_name.as_str(),

@@ -57,6 +92,7 @@ impl S3 {
)
.context("Failed to create the s3 bucket")?,
pageserver_workdir,
prefix_in_bucket,
})
}
}

@@ -67,7 +103,7 @@ impl RemoteStorage for S3 {

fn storage_path(&self, local_path: &Path) -> anyhow::Result<Self::StoragePath> {
let relative_path = strip_path_prefix(self.pageserver_workdir, local_path)?;
let mut key = String::new();
let mut key = self.prefix_in_bucket.clone().unwrap_or_default();
for segment in relative_path {
key.push(S3_FILE_SEPARATOR);
key.push_str(&segment.to_string_lossy());

@@ -76,13 +112,14 @@ impl RemoteStorage for S3 {
}

fn local_path(&self, storage_path: &Self::StoragePath) -> anyhow::Result<PathBuf> {
Ok(storage_path.download_destination(self.pageserver_workdir))
Ok(storage_path
.download_destination(self.pageserver_workdir, self.prefix_in_bucket.as_deref()))
}

async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>> {
let list_response = self
.bucket
.list(String::new(), None)
.list(self.prefix_in_bucket.clone().unwrap_or_default(), None)
.await
.context("Failed to list s3 objects")?;
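The prefix handling is symmetric: `storage_path` prepends the normalized `prefix_in_bucket` when building an object key from a workdir-relative path, and `download_destination` strips the same prefix before mapping the key back onto the workdir. A self-contained sketch of that round trip, using plain strings instead of the crate's `S3ObjectKey`/`Bucket` types (paths below are made up):

```rust
use std::path::{Path, PathBuf};

const S3_FILE_SEPARATOR: char = '/';

// Build an object key from a workdir-relative path, prepending an optional prefix.
fn storage_key(prefix: Option<&str>, relative_path: &Path) -> String {
    let mut key = prefix.unwrap_or_default().to_string();
    for segment in relative_path {
        key.push(S3_FILE_SEPARATOR);
        key.push_str(&segment.to_string_lossy());
    }
    key
}

// Map a key back to a local path, stripping the prefix it was stored under.
fn download_destination(workdir: &Path, key: &str, prefix: Option<&str>) -> PathBuf {
    let without_prefix = match prefix {
        Some(p) => key.strip_prefix(p).expect("key must carry the prefix"),
        None => key,
    };
    workdir.join(without_prefix.split(S3_FILE_SEPARATOR).collect::<PathBuf>())
}

fn main() {
    let workdir = Path::new("/data/pageserver");
    let prefix = Some("some/prefix"); // already normalized: no leading or trailing '/'
    let relative = Path::new("tenants/t1/timelines/tl1/metadata");

    let key = storage_key(prefix, relative);
    assert_eq!(key, "some/prefix/tenants/t1/timelines/tl1/metadata");
    assert_eq!(
        download_destination(workdir, &key, prefix),
        workdir.join(relative)
    );
    println!("key round-trips back to {}", workdir.join(relative).display());
}
```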
@@ -225,7 +262,7 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
local_path,
|
||||
key.download_destination(&repo_harness.conf.workdir),
|
||||
key.download_destination(&repo_harness.conf.workdir, None),
|
||||
"Download destination should consist of s3 path joined with the pageserver workdir prefix"
|
||||
);
|
||||
|
||||
@@ -239,14 +276,18 @@ mod tests {
|
||||
let segment_1 = "matching";
|
||||
let segment_2 = "file";
|
||||
let local_path = &repo_harness.conf.workdir.join(segment_1).join(segment_2);
|
||||
|
||||
let storage = dummy_storage(&repo_harness.conf.workdir);
|
||||
|
||||
let expected_key = S3ObjectKey(format!(
|
||||
"{SEPARATOR}{}{SEPARATOR}{}",
|
||||
"{}{SEPARATOR}{}{SEPARATOR}{}",
|
||||
storage.prefix_in_bucket.as_deref().unwrap_or_default(),
|
||||
segment_1,
|
||||
segment_2,
|
||||
SEPARATOR = S3_FILE_SEPARATOR,
|
||||
));
|
||||
|
||||
let actual_key = dummy_storage(&repo_harness.conf.workdir)
|
||||
let actual_key = storage
|
||||
.storage_path(local_path)
|
||||
.expect("Matching path should map to S3 path normally");
|
||||
assert_eq!(
|
||||
@@ -308,18 +349,30 @@ mod tests {
|
||||
let timeline_dir = repo_harness.timeline_path(&TIMELINE_ID);
|
||||
let relative_timeline_path = timeline_dir.strip_prefix(&repo_harness.conf.workdir)?;
|
||||
|
||||
let s3_key = create_s3_key(&relative_timeline_path.join("not a metadata"));
|
||||
let s3_key = create_s3_key(
|
||||
&relative_timeline_path.join("not a metadata"),
|
||||
storage.prefix_in_bucket.as_deref(),
|
||||
);
|
||||
assert_eq!(
|
||||
s3_key.download_destination(&repo_harness.conf.workdir),
|
||||
s3_key.download_destination(
|
||||
&repo_harness.conf.workdir,
|
||||
storage.prefix_in_bucket.as_deref()
|
||||
),
|
||||
storage
|
||||
.local_path(&s3_key)
|
||||
.expect("For a valid input, valid S3 info should be parsed"),
|
||||
"Should be able to parse metadata out of the correctly named remote delta file"
|
||||
);
|
||||
|
||||
let s3_key = create_s3_key(&relative_timeline_path.join(METADATA_FILE_NAME));
|
||||
let s3_key = create_s3_key(
|
||||
&relative_timeline_path.join(METADATA_FILE_NAME),
|
||||
storage.prefix_in_bucket.as_deref(),
|
||||
);
|
||||
assert_eq!(
|
||||
s3_key.download_destination(&repo_harness.conf.workdir),
|
||||
s3_key.download_destination(
|
||||
&repo_harness.conf.workdir,
|
||||
storage.prefix_in_bucket.as_deref()
|
||||
),
|
||||
storage
|
||||
.local_path(&s3_key)
|
||||
.expect("For a valid input, valid S3 info should be parsed"),
|
||||
@@ -356,18 +409,18 @@ mod tests {
|
||||
Credentials::anonymous().unwrap(),
|
||||
)
|
||||
.unwrap(),
|
||||
prefix_in_bucket: Some("dummy_prefix/".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn create_s3_key(relative_file_path: &Path) -> S3ObjectKey {
|
||||
S3ObjectKey(
|
||||
relative_file_path
|
||||
.iter()
|
||||
.fold(String::new(), |mut path_string, segment| {
|
||||
path_string.push(S3_FILE_SEPARATOR);
|
||||
path_string.push_str(segment.to_str().unwrap());
|
||||
path_string
|
||||
}),
|
||||
)
|
||||
fn create_s3_key(relative_file_path: &Path, prefix: Option<&str>) -> S3ObjectKey {
|
||||
S3ObjectKey(relative_file_path.iter().fold(
|
||||
prefix.unwrap_or_default().to_string(),
|
||||
|mut path_string, segment| {
|
||||
path_string.push(S3_FILE_SEPARATOR);
|
||||
path_string.push_str(segment.to_str().unwrap());
|
||||
path_string
|
||||
},
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,10 +86,15 @@ use std::{
|
||||
use anyhow::{bail, Context};
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use lazy_static::lazy_static;
|
||||
use tokio::{fs, sync::RwLock};
|
||||
use tokio::{
|
||||
sync::mpsc::{self, UnboundedReceiver},
|
||||
time::Instant,
|
||||
fs,
|
||||
runtime::Runtime,
|
||||
sync::{
|
||||
mpsc::{self, UnboundedReceiver},
|
||||
watch::Receiver,
|
||||
RwLock,
|
||||
},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
@@ -346,6 +351,7 @@ pub(super) fn spawn_storage_sync_thread<
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
|
||||
>(
|
||||
shutdown_hook: Receiver<()>,
|
||||
conf: &'static PageServerConf,
|
||||
local_timeline_files: HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)>,
|
||||
storage: S,
|
||||
@@ -384,6 +390,7 @@ pub(super) fn spawn_storage_sync_thread<
|
||||
.spawn(move || {
|
||||
storage_sync_loop(
|
||||
runtime,
|
||||
shutdown_hook,
|
||||
conf,
|
||||
receiver,
|
||||
remote_index,
|
||||
@@ -399,11 +406,18 @@ pub(super) fn spawn_storage_sync_thread<
|
||||
})
|
||||
}
|
||||
|
||||
enum LoopStep {
|
||||
NewStates(HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>>),
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn storage_sync_loop<
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
|
||||
>(
|
||||
runtime: tokio::runtime::Runtime,
|
||||
runtime: Runtime,
|
||||
mut shutdown_hook: Receiver<()>,
|
||||
conf: &'static PageServerConf,
|
||||
mut receiver: UnboundedReceiver<SyncTask>,
|
||||
index: RemoteTimelineIndex,
|
||||
@@ -412,23 +426,34 @@ fn storage_sync_loop<
|
||||
max_sync_errors: NonZeroU32,
|
||||
) -> anyhow::Result<()> {
|
||||
let remote_assets = Arc::new((storage, RwLock::new(index)));
|
||||
while !crate::tenant_mgr::shutdown_requested() {
|
||||
let new_timeline_states = runtime.block_on(
|
||||
loop_step(
|
||||
conf,
|
||||
&mut receiver,
|
||||
Arc::clone(&remote_assets),
|
||||
max_concurrent_sync,
|
||||
max_sync_errors,
|
||||
)
|
||||
.instrument(debug_span!("storage_sync_loop_step")),
|
||||
);
|
||||
// Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
|
||||
set_timeline_states(conf, new_timeline_states);
|
||||
debug!("Sync loop step completed");
|
||||
loop {
|
||||
let loop_step = runtime.block_on(async {
|
||||
tokio::select! {
|
||||
new_timeline_states = loop_step(
|
||||
conf,
|
||||
&mut receiver,
|
||||
Arc::clone(&remote_assets),
|
||||
max_concurrent_sync,
|
||||
max_sync_errors,
|
||||
)
|
||||
.instrument(debug_span!("storage_sync_loop_step")) => LoopStep::NewStates(new_timeline_states),
|
||||
_ = shutdown_hook.changed() => LoopStep::Shutdown,
|
||||
}
|
||||
});
|
||||
|
||||
match loop_step {
|
||||
LoopStep::NewStates(new_timeline_states) => {
|
||||
// Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
|
||||
set_timeline_states(conf, new_timeline_states);
|
||||
debug!("Sync loop step completed");
|
||||
}
|
||||
LoopStep::Shutdown => {
|
||||
debug!("Shutdown requested, stopping");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Shutdown requested, stopping");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
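The hunk above swaps the old `while !shutdown_requested()` polling loop for a `tokio::select!` that races one sync-loop step against a `watch`-channel shutdown signal and funnels the outcome through the small `LoopStep` enum. A minimal standalone sketch of that pattern follows; the names (`do_work`, `run_loop`) and the `#[tokio::main]` entry point are illustrative rather than the pageserver's actual API, and tokio's `rt`, `macros`, `sync` and `time` features are assumed.

use tokio::sync::watch;
use tokio::time::{sleep, Duration};

enum LoopStep {
    Done(u32),
    Shutdown,
}

// One unit of "work" per loop iteration; stands in for loop_step().
async fn do_work(step: u32) -> u32 {
    sleep(Duration::from_millis(10)).await;
    step + 1
}

async fn run_loop(mut shutdown: watch::Receiver<()>) {
    let mut step = 0;
    loop {
        // Race the work against the shutdown signal; whichever finishes
        // first decides what this iteration did.
        let outcome = tokio::select! {
            next = do_work(step) => LoopStep::Done(next),
            _ = shutdown.changed() => LoopStep::Shutdown,
        };
        match outcome {
            LoopStep::Done(next) => step = next,
            LoopStep::Shutdown => break,
        }
    }
}

#[tokio::main]
async fn main() {
    let (shutdown_tx, shutdown_rx) = watch::channel(());
    let worker = tokio::spawn(run_loop(shutdown_rx));
    shutdown_tx.send(()).unwrap(); // request shutdown
    worker.await.unwrap();
}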
@@ -539,7 +564,7 @@ async fn process_task<
|
||||
"Waiting {} seconds before starting the task",
|
||||
seconds_to_wait
|
||||
);
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs_f64(seconds_to_wait)).await;
|
||||
tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await;
|
||||
}
|
||||
|
||||
let sync_start = Instant::now();
|
||||
|
||||
@@ -202,7 +202,7 @@ async fn try_download_archive<
|
||||
archive_to_download.disk_consistent_lsn(),
|
||||
local_metadata.disk_consistent_lsn()
|
||||
),
|
||||
Err(e) => warn!("Failed to read local metadata file, assuing it's safe to override its with the download. Read: {:#}", e),
|
||||
Err(e) => warn!("Failed to read local metadata file, assuming it's safe to override its with the download. Read: {:#}", e),
|
||||
}
|
||||
compression::uncompress_file_stream_with_index(
|
||||
conf.timeline_path(&timeline_id, &tenant_id),
|
||||
|
||||
@@ -198,7 +198,7 @@ pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<dyn Reposito
|
||||
|
||||
match &tenant.repo {
|
||||
Some(repo) => Ok(Arc::clone(repo)),
|
||||
None => anyhow::bail!("Repository for tenant {} is not yet valid", tenantid),
|
||||
None => bail!("Repository for tenant {} is not yet valid", tenantid),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,15 +10,46 @@
|
||||
//! This is similar to PostgreSQL's virtual file descriptor facility in
|
||||
//! src/backend/storage/file/fd.c
|
||||
//!
|
||||
use lazy_static::lazy_static;
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write};
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{RwLock, RwLockWriteGuard};
|
||||
use zenith_metrics::{register_histogram_vec, register_int_gauge_vec, HistogramVec, IntGaugeVec};
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
|
||||
// Metrics collected on disk IO operations
|
||||
const STORAGE_IO_TIME_BUCKETS: &[f64] = &[
|
||||
0.000001, // 1 usec
|
||||
0.00001, // 10 usec
|
||||
0.0001, // 100 usec
|
||||
0.001, // 1 msec
|
||||
0.01, // 10 msec
|
||||
0.1, // 100 msec
|
||||
1.0, // 1 sec
|
||||
];
|
||||
|
||||
lazy_static! {
|
||||
static ref STORAGE_IO_TIME: HistogramVec = register_histogram_vec!(
|
||||
"pageserver_io_time",
|
||||
"Time spent in IO operations",
|
||||
&["operation", "tenant_id", "timeline_id"],
|
||||
STORAGE_IO_TIME_BUCKETS.into()
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
}
|
||||
lazy_static! {
|
||||
static ref STORAGE_IO_SIZE: IntGaugeVec = register_int_gauge_vec!(
|
||||
"pageserver_io_size",
|
||||
"Amount of bytes",
|
||||
&["operation", "tenant_id", "timeline_id"]
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
}
|
||||
|
||||
///
|
||||
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
|
||||
/// the underlying file is closed if the system is low on file descriptors,
|
||||
@@ -51,6 +82,10 @@ pub struct VirtualFile {
|
||||
/// storing it here.
|
||||
pub path: PathBuf,
|
||||
open_options: OpenOptions,
|
||||
|
||||
/// For metrics
|
||||
tenantid: String,
|
||||
timelineid: String,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Clone, Copy)]
|
||||
@@ -145,7 +180,13 @@ impl OpenFiles {
|
||||
// old file.
|
||||
//
|
||||
if let Some(old_file) = slot_guard.file.take() {
|
||||
drop(old_file);
|
||||
// We do not have information about the tenantid/timelineid of the evicted file.
// It would be possible to store the path together with the file or use the filepath crate,
// but since close() is not expected to be fast, gathering precise per-tenant
// statistics here is not critical.
|
||||
STORAGE_IO_TIME
|
||||
.with_label_values(&["close", "-", "-"])
|
||||
.observe_closure_duration(|| drop(old_file));
|
||||
}
|
||||
|
||||
// Prepare the slot for reuse and return it
|
||||
@@ -185,9 +226,20 @@ impl VirtualFile {
|
||||
path: &Path,
|
||||
open_options: &OpenOptions,
|
||||
) -> Result<VirtualFile, std::io::Error> {
|
||||
let parts = path.to_str().unwrap().split('/').collect::<Vec<&str>>();
|
||||
let tenantid;
|
||||
let timelineid;
|
||||
if parts.len() > 5 && parts[parts.len() - 5] == "tenants" {
|
||||
tenantid = parts[parts.len() - 4].to_string();
|
||||
timelineid = parts[parts.len() - 2].to_string();
|
||||
} else {
|
||||
tenantid = "*".to_string();
|
||||
timelineid = "*".to_string();
|
||||
}
|
||||
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
|
||||
|
||||
let file = open_options.open(path)?;
|
||||
let file = STORAGE_IO_TIME
|
||||
.with_label_values(&["open", &tenantid, &timelineid])
|
||||
.observe_closure_duration(|| open_options.open(path))?;
|
||||
|
||||
// Strip all options other than read and write.
|
||||
//
|
||||
@@ -204,6 +256,8 @@ impl VirtualFile {
|
||||
pos: 0,
|
||||
path: path.to_path_buf(),
|
||||
open_options: reopen_options,
|
||||
tenantid,
|
||||
timelineid,
|
||||
};
|
||||
|
||||
slot_guard.file.replace(file);
|
||||
@@ -213,13 +267,13 @@ impl VirtualFile {
|
||||
|
||||
/// Call File::sync_all() on the underlying File.
|
||||
pub fn sync_all(&self) -> Result<(), Error> {
|
||||
self.with_file(|file| file.sync_all())?
|
||||
self.with_file("fsync", |file| file.sync_all())?
|
||||
}
|
||||
|
||||
/// Helper function that looks up the underlying File for this VirtualFile,
|
||||
/// opening it and evicting some other File if necessary. It calls 'func'
|
||||
/// with the physical File.
|
||||
fn with_file<F, R>(&self, mut func: F) -> Result<R, Error>
|
||||
fn with_file<F, R>(&self, op: &str, mut func: F) -> Result<R, Error>
|
||||
where
|
||||
F: FnMut(&File) -> R,
|
||||
{
|
||||
@@ -242,7 +296,9 @@ impl VirtualFile {
|
||||
if let Some(file) = &slot_guard.file {
|
||||
// Found a cached file descriptor.
|
||||
slot.recently_used.store(true, Ordering::Relaxed);
|
||||
return Ok(func(file));
|
||||
return Ok(STORAGE_IO_TIME
|
||||
.with_label_values(&[op, &self.tenantid, &self.timelineid])
|
||||
.observe_closure_duration(|| func(file)));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -267,7 +323,9 @@ impl VirtualFile {
|
||||
let (handle, mut slot_guard) = open_files.find_victim_slot();
|
||||
|
||||
// Open the physical file
|
||||
let file = self.open_options.open(&self.path)?;
|
||||
let file = STORAGE_IO_TIME
|
||||
.with_label_values(&["open", &self.tenantid, &self.timelineid])
|
||||
.observe_closure_duration(|| self.open_options.open(&self.path))?;
|
||||
|
||||
// Perform the requested operation on it
|
||||
//
|
||||
@@ -276,7 +334,9 @@ impl VirtualFile {
|
||||
// library RwLock doesn't allow downgrading without releasing the lock,
|
||||
// and that doesn't seem worth the trouble. (parking_lot RwLock would
|
||||
// allow it)
|
||||
let result = func(&file);
|
||||
let result = STORAGE_IO_TIME
|
||||
.with_label_values(&[op, &self.tenantid, &self.timelineid])
|
||||
.observe_closure_duration(|| func(&file));
|
||||
|
||||
// Store the File in the slot and update the handle in the VirtualFile
|
||||
// to point to it.
|
||||
@@ -299,7 +359,13 @@ impl Drop for VirtualFile {
|
||||
let mut slot_guard = slot.inner.write().unwrap();
|
||||
if slot_guard.tag == handle.tag {
|
||||
slot.recently_used.store(false, Ordering::Relaxed);
|
||||
slot_guard.file.take();
|
||||
// Unlike files evicted by the replacement algorithm, here
// we group close time by tenantid/timelineid.
// This allows comparing the number/time of "normal" file closes
// with file evictions.
|
||||
STORAGE_IO_TIME
|
||||
.with_label_values(&["close", &self.tenantid, &self.timelineid])
|
||||
.observe_closure_duration(|| slot_guard.file.take());
|
||||
}
|
||||
}
|
||||
}
|
||||
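Taken together, the virtual-file hunks above wrap every open/read/write/close in `STORAGE_IO_TIME.with_label_values(...).observe_closure_duration(...)`, so each operation is timed per tenant and timeline. The sketch below shows the same idea in isolation using the `prometheus` crate directly (the `zenith_metrics` wrapper used in the diff re-exports an equivalent `register_histogram_vec!` / `observe_closure_duration` API); the metric name, bucket values and the `timed_io` helper are illustrative.

use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, HistogramVec};
use std::fs::File;
use std::io::Read;

lazy_static! {
    // Histogram of IO latencies, labelled by operation and by tenant/timeline.
    static ref IO_TIME: HistogramVec = register_histogram_vec!(
        "example_io_time",
        "Time spent in IO operations",
        &["operation", "tenant_id", "timeline_id"],
        vec![0.000_001, 0.000_01, 0.000_1, 0.001, 0.01, 0.1, 1.0]
    )
    .expect("failed to define a metric");
}

// Run `func` and record its wall-clock duration under the given labels.
fn timed_io<R>(op: &str, tenant: &str, timeline: &str, func: impl FnOnce() -> R) -> R {
    IO_TIME
        .with_label_values(&[op, tenant, timeline])
        .observe_closure_duration(func)
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("io_metrics_example.txt");
    std::fs::write(&path, b"hello")?;

    let mut file = timed_io("open", "-", "-", || File::open(&path))?;
    let mut buf = Vec::new();
    timed_io("read", "-", "-", || file.read_to_end(&mut buf))?;
    Ok(())
}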
@@ -335,7 +401,7 @@ impl Seek for VirtualFile {
|
||||
self.pos = offset;
|
||||
}
|
||||
SeekFrom::End(offset) => {
|
||||
self.pos = self.with_file(|mut file| file.seek(SeekFrom::End(offset)))??
|
||||
self.pos = self.with_file("seek", |mut file| file.seek(SeekFrom::End(offset)))??
|
||||
}
|
||||
SeekFrom::Current(offset) => {
|
||||
let pos = self.pos as i128 + offset as i128;
|
||||
@@ -357,11 +423,23 @@ impl Seek for VirtualFile {
|
||||
|
||||
impl FileExt for VirtualFile {
|
||||
fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
|
||||
self.with_file(|file| file.read_at(buf, offset))?
|
||||
let result = self.with_file("read", |file| file.read_at(buf, offset))?;
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["read", &self.tenantid, &self.timelineid])
|
||||
.add(size as i64);
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
|
||||
self.with_file(|file| file.write_at(buf, offset))?
|
||||
let result = self.with_file("write", |file| file.write_at(buf, offset))?;
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["write", &self.tenantid, &self.timelineid])
|
||||
.add(size as i64);
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ struct WalReceiverEntry {
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref WAL_RECEIVERS: Mutex<HashMap<ZTimelineId, WalReceiverEntry>> =
|
||||
static ref WAL_RECEIVERS: Mutex<HashMap<(ZTenantId, ZTimelineId), WalReceiverEntry>> =
|
||||
Mutex::new(HashMap::new());
|
||||
}
|
||||
|
||||
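The key type of `WAL_RECEIVERS` changes from `ZTimelineId` to the pair `(ZTenantId, ZTimelineId)`, so two tenants that happen to carry the same timeline id no longer share one entry. A toy version of that registry, with plain integer ids standing in for the zenith id newtypes and a std `Mutex` instead of the `parking_lot` one used in the tree:

use lazy_static::lazy_static;
use std::collections::HashMap;
use std::sync::Mutex;

type TenantId = u128;
type TimelineId = u128;

struct WalReceiverEntry {
    wal_producer_connstr: String,
}

lazy_static! {
    static ref WAL_RECEIVERS: Mutex<HashMap<(TenantId, TimelineId), WalReceiverEntry>> =
        Mutex::new(HashMap::new());
}

// Register a receiver, or update the connection string of a running one.
fn launch(tenant: TenantId, timeline: TimelineId, connstr: &str) {
    let mut receivers = WAL_RECEIVERS.lock().unwrap();
    receivers
        .entry((tenant, timeline))
        .and_modify(|e| e.wal_producer_connstr = connstr.to_string())
        .or_insert_with(|| WalReceiverEntry {
            wal_producer_connstr: connstr.to_string(),
        });
}

fn main() {
    launch(1, 42, "host=safekeeper1");
    launch(2, 42, "host=safekeeper2"); // same timeline id, different tenant: separate entry

    let receivers = WAL_RECEIVERS.lock().unwrap();
    assert_eq!(receivers.len(), 2);
    for ((tenant, timeline), entry) in receivers.iter() {
        println!("tenant {} timeline {} -> {}", tenant, timeline, entry.wal_producer_connstr);
    }
}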
@@ -60,10 +60,10 @@ thread_local! {
|
||||
// In future we can make this more granular and send shutdown signals
|
||||
// per tenant/timeline to cancel inactive walreceivers.
|
||||
// TODO deal with blocking pg connections
|
||||
pub fn stop_wal_receiver(timelineid: ZTimelineId) {
|
||||
pub fn stop_wal_receiver(tenantid: ZTenantId, timelineid: ZTimelineId) {
|
||||
let mut receivers = WAL_RECEIVERS.lock();
|
||||
|
||||
if let Some(r) = receivers.get_mut(&timelineid) {
|
||||
if let Some(r) = receivers.get_mut(&(tenantid, timelineid)) {
|
||||
match r.wal_receiver_interrupt_sender.take() {
|
||||
Some(s) => {
|
||||
if s.send(()).is_err() {
|
||||
@@ -84,9 +84,9 @@ pub fn stop_wal_receiver(timelineid: ZTimelineId) {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn drop_wal_receiver(timelineid: ZTimelineId, tenantid: ZTenantId) {
|
||||
fn drop_wal_receiver(tenantid: ZTenantId, timelineid: ZTimelineId) {
|
||||
let mut receivers = WAL_RECEIVERS.lock();
|
||||
receivers.remove(&timelineid);
|
||||
receivers.remove(&(tenantid, timelineid));
|
||||
|
||||
// Check if it was the last walreceiver of the tenant.
|
||||
// TODO now we store one WalReceiverEntry per timeline,
|
||||
@@ -104,13 +104,13 @@ pub fn drop_wal_receiver(timelineid: ZTimelineId, tenantid: ZTenantId) {
|
||||
// Launch a new WAL receiver, or tell one that's running about change in connection string
|
||||
pub fn launch_wal_receiver(
|
||||
conf: &'static PageServerConf,
|
||||
tenantid: ZTenantId,
|
||||
timelineid: ZTimelineId,
|
||||
wal_producer_connstr: &str,
|
||||
tenantid: ZTenantId,
|
||||
) {
|
||||
let mut receivers = WAL_RECEIVERS.lock();
|
||||
|
||||
match receivers.get_mut(&timelineid) {
|
||||
match receivers.get_mut(&(tenantid, timelineid)) {
|
||||
Some(receiver) => {
|
||||
receiver.wal_producer_connstr = wal_producer_connstr.into();
|
||||
}
|
||||
@@ -121,7 +121,7 @@ pub fn launch_wal_receiver(
|
||||
.name("WAL receiver thread".into())
|
||||
.spawn(move || {
|
||||
IS_WAL_RECEIVER.with(|c| c.set(true));
|
||||
thread_main(conf, timelineid, tenantid, rx);
|
||||
thread_main(conf, tenantid, timelineid, rx);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
@@ -131,7 +131,7 @@ pub fn launch_wal_receiver(
|
||||
wal_receiver_interrupt_sender: Some(tx),
|
||||
tenantid,
|
||||
};
|
||||
receivers.insert(timelineid, receiver);
|
||||
receivers.insert((tenantid, timelineid), receiver);
|
||||
|
||||
// Update tenant state and start tenant threads, if they are not running yet.
|
||||
tenant_mgr::set_tenant_state(tenantid, TenantState::Active).unwrap();
|
||||
@@ -141,11 +141,11 @@ pub fn launch_wal_receiver(
|
||||
}
|
||||
|
||||
// Look up current WAL producer connection string in the hash table
|
||||
fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String {
|
||||
fn get_wal_producer_connstr(tenantid: ZTenantId, timelineid: ZTimelineId) -> String {
|
||||
let receivers = WAL_RECEIVERS.lock();
|
||||
|
||||
receivers
|
||||
.get(&timelineid)
|
||||
.get(&(tenantid, timelineid))
|
||||
.unwrap()
|
||||
.wal_producer_connstr
|
||||
.clone()
|
||||
@@ -156,15 +156,15 @@ fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String {
|
||||
//
|
||||
fn thread_main(
|
||||
conf: &'static PageServerConf,
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
timelineid: ZTimelineId,
|
||||
interrupt_receiver: oneshot::Receiver<()>,
|
||||
) {
|
||||
let _enter = info_span!("WAL receiver", timeline = %timelineid, tenant = %tenantid).entered();
|
||||
info!("WAL receiver thread started");
|
||||
|
||||
// Look up the current WAL producer address
|
||||
let wal_producer_connstr = get_wal_producer_connstr(timelineid);
|
||||
let wal_producer_connstr = get_wal_producer_connstr(tenantid, timelineid);
|
||||
|
||||
// Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server,
|
||||
// and start streaming WAL from it.
|
||||
@@ -188,7 +188,7 @@ fn thread_main(
|
||||
|
||||
// Drop it from list of active WAL_RECEIVERS
|
||||
// so that next callmemaybe request launched a new thread
|
||||
drop_wal_receiver(timelineid, tenantid);
|
||||
drop_wal_receiver(tenantid, timelineid);
|
||||
}
|
||||
|
||||
fn walreceiver_main(
|
||||
|
||||
@@ -13,6 +13,8 @@ lazy_static = "1.4.0"
|
||||
md5 = "0.7.0"
|
||||
rand = "0.8.3"
|
||||
hex = "0.4.3"
|
||||
hyper = "0.14"
|
||||
routerify = "2"
|
||||
parking_lot = "0.11.2"
|
||||
serde = "1"
|
||||
serde_json = "1"
|
||||
@@ -23,3 +25,4 @@ rustls = "0.19.1"
|
||||
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
|
||||
|
||||
zenith_utils = { path = "../zenith_utils" }
|
||||
zenith_metrics = { path = "../zenith_metrics" }
|
||||
|
||||
15
proxy/src/http.rs
Normal file
@@ -0,0 +1,15 @@
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use routerify::RouterBuilder;
|
||||
|
||||
use zenith_utils::http::endpoint;
|
||||
use zenith_utils::http::error::ApiError;
|
||||
use zenith_utils::http::json::json_response;
|
||||
|
||||
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
Ok(json_response(StatusCode::OK, "")?)
|
||||
}
|
||||
|
||||
pub fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
|
||||
let router = endpoint::make_router();
|
||||
router.get("/v1/status", status_handler)
|
||||
}
|
||||
@@ -9,15 +9,18 @@ use anyhow::bail;
|
||||
use clap::{App, Arg};
|
||||
use state::{ProxyConfig, ProxyState};
|
||||
use std::thread;
|
||||
use zenith_utils::http::endpoint;
|
||||
use zenith_utils::{tcp_listener, GIT_VERSION};
|
||||
|
||||
mod cplane_api;
|
||||
mod http;
|
||||
mod mgmt;
|
||||
mod proxy;
|
||||
mod state;
|
||||
mod waiters;
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
zenith_metrics::set_common_metrics_prefix("zenith_proxy");
|
||||
let arg_matches = App::new("Zenith proxy/router")
|
||||
.version(GIT_VERSION)
|
||||
.arg(
|
||||
@@ -36,6 +39,14 @@ fn main() -> anyhow::Result<()> {
|
||||
.help("listen for management callback connection on ip:port")
|
||||
.default_value("127.0.0.1:7000"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("http")
|
||||
.short("h")
|
||||
.long("http")
|
||||
.takes_value(true)
|
||||
.help("listen for incoming http connections (metrics, etc) on ip:port")
|
||||
.default_value("127.0.0.1:7001"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("uri")
|
||||
.short("u")
|
||||
@@ -82,6 +93,7 @@ fn main() -> anyhow::Result<()> {
|
||||
let config = ProxyConfig {
|
||||
proxy_address: arg_matches.value_of("proxy").unwrap().parse()?,
|
||||
mgmt_address: arg_matches.value_of("mgmt").unwrap().parse()?,
|
||||
http_address: arg_matches.value_of("http").unwrap().parse()?,
|
||||
redirect_uri: arg_matches.value_of("uri").unwrap().parse()?,
|
||||
auth_endpoint: arg_matches.value_of("auth-endpoint").unwrap().parse()?,
|
||||
ssl_config,
|
||||
@@ -91,6 +103,9 @@ fn main() -> anyhow::Result<()> {
|
||||
println!("Version: {}", GIT_VERSION);
|
||||
|
||||
// Check that we can bind to address before further initialization
|
||||
println!("Starting http on {}", state.conf.http_address);
|
||||
let http_listener = tcp_listener::bind(state.conf.http_address)?;
|
||||
|
||||
println!("Starting proxy on {}", state.conf.proxy_address);
|
||||
let pageserver_listener = tcp_listener::bind(state.conf.proxy_address)?;
|
||||
|
||||
@@ -98,6 +113,12 @@ fn main() -> anyhow::Result<()> {
|
||||
let mgmt_listener = tcp_listener::bind(state.conf.mgmt_address)?;
|
||||
|
||||
let threads = [
|
||||
thread::Builder::new()
|
||||
.name("Http thread".into())
|
||||
.spawn(move || {
|
||||
let router = http::make_router();
|
||||
endpoint::serve_thread_main(router, http_listener)
|
||||
})?,
|
||||
// Spawn a thread to listen for connections. It will spawn further threads
|
||||
// for each connection.
|
||||
thread::Builder::new()
|
||||
|
||||
@@ -10,8 +10,12 @@ pub struct ProxyConfig {
|
||||
/// main entrypoint for users to connect to
|
||||
pub proxy_address: SocketAddr,
|
||||
|
||||
/// http endpoint, internally used for status and prometheus metrics
|
||||
pub http_address: SocketAddr,
|
||||
|
||||
/// management endpoint. Upon user account creation control plane
|
||||
/// will notify us here, so that we can 'unfreeze' user session.
|
||||
/// TODO It uses postgres protocol over TCP but should be migrated to http.
|
||||
pub mgmt_address: SocketAddr,
|
||||
|
||||
/// send unauthenticated users to this URI
|
||||
|
||||
@@ -47,6 +47,9 @@ Useful environment variables:
|
||||
`TEST_OUTPUT`: Set the directory where test state and test output files
|
||||
should go.
|
||||
`TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests.
|
||||
`ZENITH_PAGESERVER_OVERRIDES`: add a `;`-separated set of configs that will be passed as
|
||||
`--pageserver-config-override=${value}` parameter values when zenith cli is invoked
|
||||
`RUST_LOG`: logging configuration to pass into Zenith CLI
|
||||
|
||||
Let stdout, stderr and `INFO` log messages go to the terminal instead of capturing them:
|
||||
`pytest -s --log-cli-level=INFO ...`
|
||||
|
||||
@@ -5,7 +5,7 @@ import psycopg2.extras
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.utils import print_gc_result
|
||||
from fixtures.zenith_fixtures import ZenithEnv
|
||||
from fixtures.zenith_fixtures import ZenithEnvBuilder
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
@@ -13,10 +13,18 @@ pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
#
|
||||
# Create a couple of branches off the main branch, at a historical point in time.
|
||||
#
|
||||
def test_branch_behind(zenith_simple_env: ZenithEnv):
|
||||
env = zenith_simple_env
|
||||
def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
|
||||
|
||||
# Use safekeeper in this test to avoid a subtle race condition.
|
||||
# Without safekeeper, walreceiver reconnection can get stuck
|
||||
# because of IO deadlock.
|
||||
#
|
||||
# See https://github.com/zenithdb/zenith/issues/1068
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
|
||||
# Branch at the point where only 100 rows were inserted
|
||||
env.zenith_cli(["branch", "test_branch_behind", "empty"])
|
||||
env.zenith_cli(["branch", "test_branch_behind", "main"])
|
||||
|
||||
pgmain = env.postgres.create_start('test_branch_behind')
|
||||
log.info("postgres is running on 'test_branch_behind' branch")
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import json
|
||||
from uuid import uuid4
|
||||
from uuid import uuid4, UUID
|
||||
import pytest
|
||||
import psycopg2
|
||||
import requests
|
||||
@@ -96,6 +96,15 @@ def check_client(client: ZenithPageserverHttpClient, initial_tenant: str):
|
||||
client.tenant_create(tenant_id)
|
||||
assert tenant_id.hex in {t['id'] for t in client.tenant_list()}
|
||||
|
||||
# check its timelines
|
||||
timelines = client.timeline_list(tenant_id)
|
||||
assert len(timelines) > 0
|
||||
for timeline_id_str in timelines:
|
||||
timeline_details = client.timeline_details(tenant_id.hex, timeline_id_str)
|
||||
assert timeline_details['type'] == 'Local'
|
||||
assert timeline_details['tenant_id'] == tenant_id.hex
|
||||
assert timeline_details['timeline_id'] == timeline_id_str
|
||||
|
||||
# create branch
|
||||
branch_name = uuid4().hex
|
||||
client.branch_create(tenant_id, branch_name, "main")
|
||||
|
||||
88
test_runner/batch_others/test_remote_storage.py
Normal file
@@ -0,0 +1,88 @@
|
||||
# It's possible to run any regular test with the local fs remote storage via
|
||||
# env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/zenith_zzz/'}" pipenv ......
|
||||
|
||||
import tempfile, time, shutil, os
|
||||
from contextlib import closing
|
||||
from pathlib import Path
|
||||
from fixtures.zenith_fixtures import ZenithEnvBuilder, LocalFsStorage, check_restored_datadir_content
|
||||
from fixtures.log_helper import log
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
|
||||
#
|
||||
# Tests that a piece of data is backed up and restored correctly:
|
||||
#
|
||||
# 1. Initial pageserver
|
||||
# * starts a pageserver with remote storage, stores specific data in its tables
|
||||
# * triggers a checkpoint (which produces local data scheduled for backup), gets the corresponding timeline id
|
||||
# * polls the timeline status to ensure it's copied remotely
|
||||
# * stops the pageserver, clears all local directories
|
||||
#
|
||||
# 2. Second pageserver
|
||||
# * starts another pageserver, connected to the same remote storage
|
||||
# * same timeline id is queried for status, triggering timeline's download
|
||||
# * timeline status is polled until it's downloaded
|
||||
# * queries the specific data, ensuring that it matches the one stored before
|
||||
#
|
||||
def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.rust_log_override = 'debug'
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
zenith_env_builder.enable_local_fs_remote_storage()
|
||||
|
||||
data_id = 1
|
||||
data_secret = 'very secret secret'
|
||||
|
||||
##### First start, insert secret data and upload it to the remote storage
|
||||
env = zenith_env_builder.init()
|
||||
pg = env.postgres.create_start()
|
||||
|
||||
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
|
||||
timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(f'''
|
||||
CREATE TABLE t1(id int primary key, secret text);
|
||||
INSERT INTO t1 VALUES ({data_id}, '{data_secret}');
|
||||
''')
|
||||
|
||||
# run checkpoint manually to be sure that data landed in remote storage
|
||||
with closing(env.pageserver.connect()) as psconn:
|
||||
with psconn.cursor() as pscur:
|
||||
pscur.execute(f"do_gc {tenant_id} {timeline_id}")
|
||||
log.info("waiting for upload") # TODO api to check if upload is done
|
||||
time.sleep(2)
|
||||
|
||||
##### Stop the first pageserver instance, erase all its data
|
||||
env.postgres.stop_all()
|
||||
env.pageserver.stop()
|
||||
|
||||
dir_to_clear = Path(env.repo_dir) / 'tenants'
|
||||
shutil.rmtree(dir_to_clear)
|
||||
os.mkdir(dir_to_clear)
|
||||
|
||||
##### Second start, restore the data and ensure it's the same
|
||||
env.pageserver.start()
|
||||
|
||||
log.info("waiting for timeline redownload")
|
||||
client = env.pageserver.http_client()
|
||||
attempts = 0
|
||||
while True:
|
||||
timeline_details = client.timeline_details(tenant_id, timeline_id)
|
||||
assert timeline_details['timeline_id'] == timeline_id
|
||||
assert timeline_details['tenant_id'] == tenant_id
|
||||
if timeline_details['type'] == 'Local':
|
||||
log.info("timeline downloaded, checking its data")
|
||||
break
|
||||
attempts += 1
|
||||
if attempts > 10:
|
||||
raise Exception("timeline redownload failed")
|
||||
log.debug("still waiting")
|
||||
time.sleep(1)
|
||||
|
||||
pg = env.postgres.create_start()
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(f'SELECT secret FROM t1 WHERE id = {data_id};')
|
||||
assert cur.fetchone() == (data_secret, )
|
||||
@@ -66,8 +66,8 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
|
||||
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
|
||||
row = pscur.fetchone()
|
||||
print_gc_result(row)
|
||||
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
|
||||
assert row['layer_relfiles_removed'] == 2
|
||||
assert row['layer_relfiles_total'] == layer_relfiles_remain + 1
|
||||
assert row['layer_relfiles_removed'] == 0
|
||||
assert row['layer_relfiles_dropped'] == 0
|
||||
|
||||
# Insert two more rows and run GC.
|
||||
@@ -81,7 +81,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
|
||||
row = pscur.fetchone()
|
||||
print_gc_result(row)
|
||||
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
|
||||
assert row['layer_relfiles_removed'] == 2
|
||||
assert row['layer_relfiles_removed'] == 0
|
||||
assert row['layer_relfiles_dropped'] == 0
|
||||
|
||||
# Do it again. Should again create two new layer files and remove old ones.
|
||||
@@ -92,8 +92,8 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
|
||||
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
|
||||
row = pscur.fetchone()
|
||||
print_gc_result(row)
|
||||
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
|
||||
assert row['layer_relfiles_removed'] == 2
|
||||
assert row['layer_relfiles_total'] == layer_relfiles_remain + 3
|
||||
assert row['layer_relfiles_removed'] == 0
|
||||
assert row['layer_relfiles_dropped'] == 0
|
||||
|
||||
# Run GC again, with no changes in the database. Should not remove anything.
|
||||
@@ -101,7 +101,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
|
||||
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
|
||||
row = pscur.fetchone()
|
||||
print_gc_result(row)
|
||||
assert row['layer_relfiles_total'] == layer_relfiles_remain
|
||||
assert row['layer_relfiles_total'] == layer_relfiles_remain + 3
|
||||
assert row['layer_relfiles_removed'] == 0
|
||||
assert row['layer_relfiles_dropped'] == 0
|
||||
|
||||
@@ -121,9 +121,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
|
||||
# Each relation fork is counted separately, hence 3.
|
||||
assert row['layer_relfiles_needed_as_tombstone'] == 3
|
||||
|
||||
# The catalog updates also create new layer files of the catalogs, which
|
||||
# are counted as 'removed'
|
||||
assert row['layer_relfiles_removed'] > 0
|
||||
assert row['layer_relfiles_removed'] == 0
|
||||
|
||||
# TODO Change the test to check actual GC of dropped layers.
|
||||
# Each relation fork is counted separately, hence 3.
|
||||
|
||||
@@ -25,7 +25,7 @@ from dataclasses import dataclass
|
||||
|
||||
# Type-related stuff
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast
|
||||
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast, Union
|
||||
from typing_extensions import Literal
|
||||
import pytest
|
||||
|
||||
@@ -342,10 +342,14 @@ class ZenithEnvBuilder:
|
||||
def __init__(self,
|
||||
repo_dir: Path,
|
||||
port_distributor: PortDistributor,
|
||||
pageserver_remote_storage: Optional[RemoteStorage] = None,
|
||||
num_safekeepers: int = 0,
|
||||
pageserver_auth_enabled: bool = False):
|
||||
pageserver_auth_enabled: bool = False,
|
||||
rust_log_override: Optional[str] = None):
|
||||
self.repo_dir = repo_dir
|
||||
self.rust_log_override = rust_log_override
|
||||
self.port_distributor = port_distributor
|
||||
self.pageserver_remote_storage = pageserver_remote_storage
|
||||
self.num_safekeepers = num_safekeepers
|
||||
self.pageserver_auth_enabled = pageserver_auth_enabled
|
||||
self.env: Optional[ZenithEnv] = None
|
||||
@@ -356,6 +360,11 @@ class ZenithEnvBuilder:
|
||||
self.env = ZenithEnv(self)
|
||||
return self.env
|
||||
|
||||
def enable_local_fs_remote_storage(self):
|
||||
assert self.pageserver_remote_storage is None, "remote storage is enabled already"
|
||||
self.pageserver_remote_storage = LocalFsStorage(
|
||||
Path(self.repo_dir / 'local_fs_remote_storage'))
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
@@ -404,6 +413,7 @@ class ZenithEnv:
|
||||
"""
|
||||
def __init__(self, config: ZenithEnvBuilder):
|
||||
self.repo_dir = config.repo_dir
|
||||
self.rust_log_override = config.rust_log_override
|
||||
self.port_distributor = config.port_distributor
|
||||
|
||||
self.postgres = PostgresFactory(self)
|
||||
@@ -434,7 +444,9 @@ auth_type = '{pageserver_auth_type}'
|
||||
"""
|
||||
|
||||
# Create a corresponding ZenithPageserver object
|
||||
self.pageserver = ZenithPageserver(self, port=pageserver_port)
|
||||
self.pageserver = ZenithPageserver(self,
|
||||
port=pageserver_port,
|
||||
remote_storage=config.pageserver_remote_storage)
|
||||
|
||||
# Create config and a Safekeeper object for each safekeeper
|
||||
for i in range(1, config.num_safekeepers + 1):
|
||||
@@ -465,6 +477,8 @@ sync = false # Disable fsyncs to make the tests go faster
|
||||
tmp.flush()
|
||||
|
||||
cmd = ['init', f'--config={tmp.name}']
|
||||
append_pageserver_param_overrides(cmd, config.pageserver_remote_storage)
|
||||
|
||||
self.zenith_cli(cmd)
|
||||
|
||||
# Start up the page server and all the safekeepers
|
||||
@@ -509,6 +523,9 @@ sync = false # Disable fsyncs to make the tests go faster
|
||||
env_vars['ZENITH_REPO_DIR'] = str(self.repo_dir)
|
||||
env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir)
|
||||
|
||||
if self.rust_log_override is not None:
|
||||
env_vars['RUST_LOG'] = self.rust_log_override
|
||||
|
||||
# Pass coverage settings
|
||||
var = 'LLVM_PROFILE_FILE'
|
||||
val = os.environ.get(var)
|
||||
@@ -665,6 +682,20 @@ class ZenithPageserverHttpClient(requests.Session):
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
def timeline_list(self, tenant_id: uuid.UUID) -> List[str]:
|
||||
res = self.get(f"http://localhost:{self.port}/v1/timeline/{tenant_id.hex}")
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, list)
|
||||
return res_json
|
||||
|
||||
def timeline_details(self, tenant_id: str, timeline_id: str) -> Dict[Any, Any]:
|
||||
res = self.get(f"http://localhost:{self.port}/v1/timeline/{tenant_id}/{timeline_id}")
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def get_metrics(self) -> str:
|
||||
res = self.get(f"http://localhost:{self.port}/metrics")
|
||||
res.raise_for_status()
|
||||
@@ -677,17 +708,38 @@ class PageserverPort:
|
||||
http: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class LocalFsStorage:
|
||||
root: Path
|
||||
|
||||
|
||||
@dataclass
|
||||
class S3Storage:
|
||||
bucket: str
|
||||
region: str
|
||||
access_key: str
|
||||
secret_key: str
|
||||
|
||||
|
||||
RemoteStorage = Union[LocalFsStorage, S3Storage]
|
||||
|
||||
|
||||
class ZenithPageserver(PgProtocol):
|
||||
"""
|
||||
An object representing a running pageserver.
|
||||
|
||||
Initializes the repository via `zenith init`.
|
||||
"""
|
||||
def __init__(self, env: ZenithEnv, port: PageserverPort, enable_auth=False):
|
||||
def __init__(self,
|
||||
env: ZenithEnv,
|
||||
port: PageserverPort,
|
||||
remote_storage: Optional[RemoteStorage] = None,
|
||||
enable_auth=False):
|
||||
super().__init__(host='localhost', port=port.pg)
|
||||
self.env = env
|
||||
self.running = False
|
||||
self.service_port = port # do not shadow PgProtocol.port which is just int
|
||||
self.remote_storage = remote_storage
|
||||
|
||||
def start(self) -> 'ZenithPageserver':
|
||||
"""
|
||||
@@ -696,7 +748,10 @@ class ZenithPageserver(PgProtocol):
|
||||
"""
|
||||
assert self.running == False
|
||||
|
||||
self.env.zenith_cli(['pageserver', 'start'])
|
||||
start_args = ['pageserver', 'start']
|
||||
append_pageserver_param_overrides(start_args, self.remote_storage)
|
||||
|
||||
self.env.zenith_cli(start_args)
|
||||
self.running = True
|
||||
return self
|
||||
|
||||
@@ -729,6 +784,28 @@ class ZenithPageserver(PgProtocol):
|
||||
)
|
||||
|
||||
|
||||
def append_pageserver_param_overrides(params_to_update: List[str],
|
||||
pageserver_remote_storage: Optional[RemoteStorage]):
|
||||
if pageserver_remote_storage is not None:
|
||||
if isinstance(pageserver_remote_storage, LocalFsStorage):
|
||||
pageserver_storage_override = f"local_path='{pageserver_remote_storage.root}'"
|
||||
elif isinstance(pageserver_remote_storage, S3Storage):
|
||||
pageserver_storage_override = f"bucket_name='{pageserver_remote_storage.bucket}',\
|
||||
bucket_region='{pageserver_remote_storage.region}',access_key_id='{pageserver_remote_storage.access_key}',\
|
||||
secret_access_key='{pageserver_remote_storage.secret_key}'"
|
||||
|
||||
else:
|
||||
raise Exception(f'Unknown storage configuration {pageserver_remote_storage}')
|
||||
params_to_update.append(
|
||||
f'--pageserver-config-override=remote_storage={{{pageserver_storage_override}}}')
|
||||
|
||||
env_overrides = os.getenv('ZENITH_PAGESERVER_OVERRIDES')
|
||||
if env_overrides is not None:
|
||||
params_to_update += [
|
||||
f'--pageserver-config-override={o.strip()}' for o in env_overrides.split(';')
|
||||
]
|
||||
|
||||
|
||||
class PgBin:
|
||||
""" A helper class for executing postgres binaries """
|
||||
def __init__(self, log_dir: str):
|
||||
|
||||
2
vendor/postgres
vendored
Submodule vendor/postgres updated: 14f9177a22...6309cf1b52
@@ -10,6 +10,7 @@ use std::fs::File;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::thread;
|
||||
use tracing::*;
|
||||
use walkeeper::timeline::{CreateControlFile, FileStorage};
|
||||
use zenith_utils::http::endpoint;
|
||||
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
|
||||
|
||||
@@ -86,8 +87,21 @@ fn main() -> Result<()> {
|
||||
.takes_value(false)
|
||||
.help("Do not wait for changes to be written safely to disk"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("dump-control-file")
|
||||
.long("dump-control-file")
|
||||
.takes_value(true)
|
||||
.help("Dump control file at path specifed by this argument and exit"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
if let Some(addr) = arg_matches.value_of("dump-control-file") {
|
||||
let state = FileStorage::load_control_file(Path::new(addr), CreateControlFile::False)?;
|
||||
let json = serde_json::to_string(&state)?;
|
||||
print!("{}", json);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut conf: SafeKeeperConf = Default::default();
|
||||
|
||||
if let Some(dir) = arg_matches.value_of("datadir") {
|
||||
|
||||
@@ -62,7 +62,7 @@ impl Default for SafeKeeperConf {
|
||||
daemonize: false,
|
||||
no_sync: false,
|
||||
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
listen_http_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
ttl: None,
|
||||
recall_period: defaults::DEFAULT_RECALL_PERIOD,
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ use zenith_utils::pq_proto::SystemId;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
pub const SK_FORMAT_VERSION: u32 = 2;
|
||||
pub const SK_FORMAT_VERSION: u32 = 3;
|
||||
const SK_PROTOCOL_VERSION: u32 = 1;
|
||||
const UNKNOWN_SERVER_VERSION: u32 = 0;
|
||||
|
||||
@@ -133,9 +133,11 @@ pub struct ServerInfo {
|
||||
/// Postgres server version
|
||||
pub pg_version: u32,
|
||||
pub system_id: SystemId,
|
||||
#[serde(with = "hex")]
|
||||
pub tenant_id: ZTenantId,
|
||||
/// Zenith timelineid
|
||||
pub ztli: ZTimelineId,
|
||||
#[serde(with = "hex")]
|
||||
pub timeline_id: ZTimelineId,
|
||||
pub wal_seg_size: u32,
|
||||
}
|
||||
|
||||
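The `#[serde(with = "hex")]` attributes added to `tenant_id` (and, further down, to `proposer_uuid`) make those ids serialize as hex strings, which is what keeps the new `--dump-control-file` JSON output readable. A minimal sketch of the mechanism, assuming the `hex` crate with its `serde` feature and using `serde_json` for illustration (the safekeeper itself persists a binary format, and its id types are newtypes rather than bare arrays):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Ids {
    // Serialized as a 32-character hex string instead of an array of numbers.
    #[serde(with = "hex")]
    tenant_id: [u8; 16],
    #[serde(with = "hex")]
    timeline_id: [u8; 16],
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ids = Ids {
        tenant_id: [0xab; 16],
        timeline_id: [0x01; 16],
    };
    let json = serde_json::to_string(&ids)?;
    // e.g. {"tenant_id":"abab...","timeline_id":"0101..."}
    println!("{}", json);
    assert_eq!(serde_json::from_str::<Ids>(&json)?, ids);
    Ok(())
}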
@@ -149,6 +151,7 @@ pub struct SafeKeeperState {
|
||||
pub server: ServerInfo,
|
||||
/// Unique id of the last *elected* proposer we dealt with. Not needed
|
||||
/// for correctness, exists for monitoring purposes.
|
||||
#[serde(with = "hex")]
|
||||
pub proposer_uuid: PgUuid,
|
||||
/// part of WAL acknowledged by quorum and available locally
|
||||
pub commit_lsn: Lsn,
|
||||
@@ -171,7 +174,7 @@ impl SafeKeeperState {
|
||||
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
|
||||
system_id: 0, /* Postgres system identifier */
|
||||
tenant_id: ZTenantId::from([0u8; 16]),
|
||||
ztli: ZTimelineId::from([0u8; 16]),
|
||||
timeline_id: ZTimelineId::from([0u8; 16]),
|
||||
wal_seg_size: 0,
|
||||
},
|
||||
proposer_uuid: [0; 16],
|
||||
@@ -560,13 +563,13 @@ where
|
||||
// set basic info about server, if not yet
|
||||
self.s.server.system_id = msg.system_id;
|
||||
self.s.server.tenant_id = msg.tenant_id;
|
||||
self.s.server.ztli = msg.ztli;
|
||||
self.s.server.timeline_id = msg.ztli;
|
||||
self.s.server.wal_seg_size = msg.wal_seg_size;
|
||||
self.storage
|
||||
.persist(&self.s)
|
||||
.with_context(|| "failed to persist shared state")?;
|
||||
|
||||
self.metrics = SafeKeeperMetrics::new(self.s.server.ztli);
|
||||
self.metrics = SafeKeeperMetrics::new(self.s.server.timeline_id);
|
||||
|
||||
info!(
|
||||
"processed greeting from proposer {:?}, sending term {:?}",
|
||||
|
||||
@@ -79,7 +79,7 @@ struct ReplicationConnGuard {
|
||||
|
||||
impl Drop for ReplicationConnGuard {
|
||||
fn drop(&mut self) {
|
||||
self.timeline.update_replica_state(self.replica, None);
|
||||
self.timeline.remove_replica(self.replica);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,14 +120,12 @@ impl ReplicationConn {
|
||||
/// This is spawned into the background by `handle_start_replication`.
|
||||
fn background_thread(
|
||||
mut stream_in: ReadStream,
|
||||
timeline: Arc<Timeline>,
|
||||
replica_id: usize,
|
||||
replica_guard: Arc<ReplicationConnGuard>,
|
||||
) -> Result<()> {
|
||||
let replica_id = replica_guard.replica;
|
||||
let timeline = &replica_guard.timeline;
|
||||
|
||||
let mut state = ReplicaState::new();
|
||||
let _guard = ReplicationConnGuard {
|
||||
replica: replica_id,
|
||||
timeline: timeline.clone(),
|
||||
};
|
||||
// Wait for replica's feedback.
|
||||
while let Some(msg) = FeMessage::read(&mut stream_in)? {
|
||||
match &msg {
|
||||
@@ -140,7 +138,7 @@ impl ReplicationConn {
|
||||
// Note: deserializing is on m[1..] because we skip the tag byte.
|
||||
state.hs_feedback = HotStandbyFeedback::des(&m[1..])
|
||||
.context("failed to deserialize HotStandbyFeedback")?;
|
||||
timeline.update_replica_state(replica_id, Some(state));
|
||||
timeline.update_replica_state(replica_id, state);
|
||||
}
|
||||
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
|
||||
let reply = StandbyReply::des(&m[1..])
|
||||
@@ -148,7 +146,7 @@ impl ReplicationConn {
|
||||
state.last_received_lsn = reply.write_lsn;
|
||||
state.disk_consistent_lsn = reply.flush_lsn;
|
||||
state.remote_consistent_lsn = reply.apply_lsn;
|
||||
timeline.update_replica_state(replica_id, Some(state));
|
||||
timeline.update_replica_state(replica_id, state);
|
||||
}
|
||||
_ => warn!("unexpected message {:?}", msg),
|
||||
}
|
||||
@@ -207,16 +205,23 @@ impl ReplicationConn {
|
||||
// This replica_id is used below to check if it's time to stop replication.
|
||||
let replica_id = bg_timeline.add_replica(state);
|
||||
|
||||
// Use a guard object to remove our entry from the timeline once both the
// background thread and this thread have finished using it.
|
||||
let replica_guard = Arc::new(ReplicationConnGuard {
|
||||
replica: replica_id,
|
||||
timeline: bg_timeline,
|
||||
});
|
||||
let bg_replica_guard = Arc::clone(&replica_guard);
|
||||
|
||||
// TODO: here we got two threads, one for writing WAL and one for receiving
|
||||
// feedback. If one of them fails, we should shutdown the other one too.
|
||||
let _ = thread::Builder::new()
|
||||
.name("HotStandbyFeedback thread".into())
|
||||
.spawn(move || {
|
||||
if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline, replica_id) {
|
||||
if let Err(err) = Self::background_thread(bg_stream_in, bg_replica_guard) {
|
||||
error!("Replication background thread failed: {}", err);
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
})?;
|
||||
|
||||
let mut wal_seg_size: usize;
|
||||
loop {
|
||||
|
||||
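The refactor above moves replica bookkeeping into `ReplicationConnGuard`: the WAL-sending side and the feedback thread each hold an `Arc` to one guard, and `remove_replica` runs exactly once, when the last clone is dropped. The standalone sketch below shows that RAII pattern with made-up `Registry` and `ReplicaGuard` types:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Default)]
struct Registry {
    replicas: Mutex<HashMap<usize, String>>,
}

struct ReplicaGuard {
    replica_id: usize,
    registry: Arc<Registry>,
}

impl Drop for ReplicaGuard {
    fn drop(&mut self) {
        // Runs once, when the last Arc<ReplicaGuard> clone goes away.
        self.registry.replicas.lock().unwrap().remove(&self.replica_id);
    }
}

fn main() {
    let registry = Arc::new(Registry::default());
    registry.replicas.lock().unwrap().insert(7, "replica".into());

    let guard = Arc::new(ReplicaGuard { replica_id: 7, registry: Arc::clone(&registry) });
    let bg_guard = Arc::clone(&guard);

    let handle = thread::spawn(move || {
        // The background thread accesses the registry through bg_guard.registry ...
        drop(bg_guard);
    });
    handle.join().unwrap();
    drop(guard); // last clone dropped => entry removed

    assert!(registry.replicas.lock().unwrap().is_empty());
}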
@@ -9,7 +9,7 @@ use std::cmp::{max, min};
|
||||
use std::collections::HashMap;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Read, Seek, SeekFrom, Write};
|
||||
use std::path::PathBuf;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
use std::time::Duration;
|
||||
use tracing::*;
|
||||
@@ -121,7 +121,7 @@ impl SharedState {
|
||||
}
|
||||
|
||||
/// Assign new replica ID. We choose first empty cell in the replicas vector
|
||||
/// or extend the vector if there are not free items.
|
||||
/// or extend the vector if there are no free slots.
|
||||
pub fn add_replica(&mut self, state: ReplicaState) -> usize {
|
||||
if let Some(pos) = self.replicas.iter().position(|r| r.is_none()) {
|
||||
self.replicas[pos] = Some(state);
|
||||
@@ -136,13 +136,14 @@ impl SharedState {
|
||||
/// If create=false and file doesn't exist, bails out.
|
||||
fn create_restore(
|
||||
conf: &SafeKeeperConf,
|
||||
timelineid: ZTimelineId,
|
||||
timeline_id: ZTimelineId,
|
||||
create: CreateControlFile,
|
||||
) -> Result<Self> {
|
||||
let (file_storage, state) = FileStorage::load_from_control_file(conf, timelineid, create)
|
||||
let state = FileStorage::load_control_file_conf(conf, timeline_id, create)
|
||||
.with_context(|| "failed to load from control file")?;
|
||||
let file_storage = FileStorage::new(timeline_id, conf);
|
||||
let flush_lsn = if state.server.wal_seg_size != 0 {
|
||||
let wal_dir = conf.timeline_dir(&timelineid);
|
||||
let wal_dir = conf.timeline_dir(&timeline_id);
|
||||
find_end_of_wal(
|
||||
&wal_dir,
|
||||
state.server.wal_seg_size as usize,
|
||||
@@ -297,9 +298,15 @@ impl Timeline {
|
||||
shared_state.add_replica(state)
|
||||
}
|
||||
|
||||
pub fn update_replica_state(&self, id: usize, state: Option<ReplicaState>) {
|
||||
pub fn update_replica_state(&self, id: usize, state: ReplicaState) {
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
shared_state.replicas[id] = state;
|
||||
shared_state.replicas[id] = Some(state);
|
||||
}
|
||||
|
||||
pub fn remove_replica(&self, id: usize) {
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
assert!(shared_state.replicas[id].is_some());
|
||||
shared_state.replicas[id] = None;
|
||||
}
|
||||
|
||||
pub fn get_end_of_wal(&self) -> Lsn {
|
||||
@@ -381,7 +388,7 @@ impl GlobalTimelines {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct FileStorage {
|
||||
pub struct FileStorage {
|
||||
// save timeline dir to avoid reconstructing it every time
|
||||
timeline_dir: PathBuf,
|
||||
conf: SafeKeeperConf,
|
||||
@@ -389,6 +396,17 @@ struct FileStorage {
|
||||
}
|
||||
|
||||
impl FileStorage {
|
||||
fn new(timeline_id: ZTimelineId, conf: &SafeKeeperConf) -> FileStorage {
|
||||
let timeline_dir = conf.timeline_dir(&timeline_id);
|
||||
let timelineid_str = format!("{}", timeline_id);
|
||||
FileStorage {
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
|
||||
.with_label_values(&[&timelineid_str]),
|
||||
}
|
||||
}
|
||||
|
||||
// Check the magic/version in the on-disk data and deserialize it, if possible.
|
||||
fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
|
||||
// Read the version independent part
|
||||
@@ -409,20 +427,24 @@ impl FileStorage {
|
||||
upgrade_control_file(buf, version)
|
||||
}
|
||||
|
||||
/// Fetch and lock control file (prevent running more than one instance of safekeeper)
|
||||
/// If create=false and file doesn't exist, bails out.
|
||||
fn load_from_control_file(
|
||||
fn load_control_file_conf(
|
||||
conf: &SafeKeeperConf,
|
||||
timelineid: ZTimelineId,
|
||||
timeline_id: ZTimelineId,
|
||||
create: CreateControlFile,
|
||||
) -> Result<(FileStorage, SafeKeeperState)> {
|
||||
let timeline_dir = conf.timeline_dir(&timelineid);
|
||||
|
||||
let control_file_path = timeline_dir.join(CONTROL_FILE_NAME);
|
||||
) -> Result<SafeKeeperState> {
|
||||
let path = conf.timeline_dir(&timeline_id).join(CONTROL_FILE_NAME);
|
||||
Self::load_control_file(path, create)
|
||||
}
|
||||
|
||||
/// Read in the control file.
|
||||
/// If create=false and file doesn't exist, bails out.
|
||||
pub fn load_control_file<P: AsRef<Path>>(
|
||||
control_file_path: P,
|
||||
create: CreateControlFile,
|
||||
) -> Result<SafeKeeperState> {
|
||||
info!(
|
||||
"loading control file {}, create={:?}",
|
||||
control_file_path.display(),
|
||||
control_file_path.as_ref().display(),
|
||||
create,
|
||||
);
|
||||
|
||||
@@ -434,7 +456,7 @@ impl FileStorage {
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to open control file at {}",
|
||||
control_file_path.display(),
|
||||
control_file_path.as_ref().display(),
|
||||
)
|
||||
})?;
|
||||
|
||||
@@ -465,21 +487,15 @@ impl FileStorage {
|
||||
);
|
||||
|
||||
FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE]).with_context(
|
||||
|| format!("while reading control file {}", control_file_path.display(),),
|
||||
|| {
|
||||
format!(
|
||||
"while reading control file {}",
|
||||
control_file_path.as_ref().display(),
|
||||
)
|
||||
},
|
||||
)?
|
||||
};
|
||||
|
||||
let timelineid_str = format!("{}", timelineid);
|
||||
|
||||
Ok((
|
||||
FileStorage {
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
|
||||
.with_label_values(&[&timelineid_str]),
|
||||
},
|
||||
state,
|
||||
))
|
||||
Ok(state)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -549,7 +565,7 @@ impl Storage for FileStorage {
|
||||
let mut start_pos = startpos;
|
||||
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
|
||||
let wal_seg_size = server.wal_seg_size as usize;
|
||||
let ztli = server.ztli;
|
||||
let ztli = server.timeline_id;
|
||||
|
||||
/* Extract WAL location for this block */
|
||||
let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize;
|
||||
@@ -637,7 +653,7 @@ impl Storage for FileStorage {
|
||||
let partial;
|
||||
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
|
||||
let wal_seg_size = server.wal_seg_size as usize;
|
||||
let ztli = server.ztli;
|
||||
let ztli = server.timeline_id;
|
||||
|
||||
/* Extract WAL location for this block */
|
||||
let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
|
||||
@@ -737,7 +753,10 @@ mod test {
|
||||
) -> Result<(FileStorage, SafeKeeperState)> {
|
||||
fs::create_dir_all(&conf.timeline_dir(&timeline_id))
|
||||
.expect("failed to create timeline dir");
|
||||
FileStorage::load_from_control_file(conf, timeline_id, create)
|
||||
Ok((
|
||||
FileStorage::new(timeline_id, conf),
|
||||
FileStorage::load_control_file_conf(conf, timeline_id, create)?,
|
||||
))
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -5,7 +5,12 @@ use crate::safekeeper::{
|
||||
use anyhow::{bail, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::*;
|
||||
use zenith_utils::{bin_ser::LeSer, lsn::Lsn};
|
||||
use zenith_utils::{
|
||||
bin_ser::LeSer,
|
||||
lsn::Lsn,
|
||||
pq_proto::SystemId,
|
||||
zid::{ZTenantId, ZTimelineId},
|
||||
};
|
||||
|
||||
/// Persistent consensus state of the acceptor.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
@@ -35,6 +40,36 @@ struct SafeKeeperStateV1 {
|
||||
wal_start_lsn: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct ServerInfoV2 {
|
||||
/// Postgres server version
|
||||
pub pg_version: u32,
|
||||
pub system_id: SystemId,
|
||||
pub tenant_id: ZTenantId,
|
||||
/// Zenith timelineid
|
||||
pub ztli: ZTimelineId,
|
||||
pub wal_seg_size: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SafeKeeperStateV2 {
|
||||
/// persistent acceptor state
|
||||
pub acceptor_state: AcceptorState,
|
||||
/// information about server
|
||||
pub server: ServerInfoV2,
|
||||
/// Unique id of the last *elected* proposer we dealt with. Not needed
|
||||
/// for correctness, exists for monitoring purposes.
|
||||
pub proposer_uuid: PgUuid,
|
||||
/// part of WAL acknowledged by quorum and available locally
|
||||
pub commit_lsn: Lsn,
|
||||
/// minimal LSN which may be needed for recovery of some safekeeper (end_lsn
|
||||
/// of last record streamed to everyone)
|
||||
pub truncate_lsn: Lsn,
|
||||
// Safekeeper starts receiving WAL from this LSN, zeros before it ought to
|
||||
// be skipped during decoding.
|
||||
pub wal_start_lsn: Lsn,
|
||||
}
|
||||
|
||||
pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
|
||||
// migrate to storing full term history
|
||||
if version == 1 {
|
||||
@@ -55,6 +90,25 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
|
||||
truncate_lsn: oldstate.truncate_lsn,
|
||||
wal_start_lsn: oldstate.wal_start_lsn,
|
||||
});
|
||||
// migrate to hexing some zids
|
||||
} else if version == 2 {
|
||||
info!("reading safekeeper control file version {}", version);
|
||||
let oldstate = SafeKeeperStateV2::des(&buf[..buf.len()])?;
|
||||
let server = ServerInfo {
|
||||
pg_version: oldstate.server.pg_version,
|
||||
system_id: oldstate.server.system_id,
|
||||
tenant_id: oldstate.server.tenant_id,
|
||||
timeline_id: oldstate.server.ztli,
|
||||
wal_seg_size: oldstate.server.wal_seg_size,
|
||||
};
|
||||
return Ok(SafeKeeperState {
|
||||
acceptor_state: oldstate.acceptor_state,
|
||||
server,
|
||||
proposer_uuid: oldstate.proposer_uuid,
|
||||
commit_lsn: oldstate.commit_lsn,
|
||||
truncate_lsn: oldstate.truncate_lsn,
|
||||
wal_start_lsn: oldstate.wal_start_lsn,
|
||||
});
|
||||
}
|
||||
bail!("unsupported safekeeper control file version {}", version)
|
||||
}
|
||||
|
||||
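`upgrade_control_file` now also handles version 2 files: it deserializes the old `SafeKeeperStateV2` layout and maps it onto the current struct, which is why `SK_FORMAT_VERSION` is bumped to 3. A stripped-down sketch of that version-gated upgrade path; the field sets are invented and JSON stands in for the safekeeper's real binary (`LeSer`) encoding:

use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};

const FORMAT_VERSION: u32 = 3;

// Old on-disk layout (hypothetical fields).
#[derive(Serialize, Deserialize)]
struct StateV2 {
    timeline_id: [u8; 16],
    commit_lsn: u64,
}

// Current layout; any new fields would get defaults during the upgrade.
#[derive(Serialize, Deserialize)]
struct State {
    timeline_id: [u8; 16],
    commit_lsn: u64,
}

fn upgrade(buf: &[u8], version: u32) -> Result<State> {
    match version {
        2 => {
            // Read the old layout and map it onto the current struct.
            let old: StateV2 = serde_json::from_slice(buf)?;
            Ok(State { timeline_id: old.timeline_id, commit_lsn: old.commit_lsn })
        }
        FORMAT_VERSION => Ok(serde_json::from_slice(buf)?),
        other => bail!("unsupported control file version {}", other),
    }
}

fn main() -> Result<()> {
    let v2_bytes = serde_json::to_vec(&StateV2 { timeline_id: [0; 16], commit_lsn: 42 })?;
    assert_eq!(upgrade(&v2_bytes, 2)?.commit_lsn, 42);
    Ok(())
}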
@@ -102,12 +102,21 @@ fn main() -> Result<()> {
|
||||
.required(false)
|
||||
.value_name("stop-mode");
|
||||
|
||||
let pageserver_config_args = Arg::with_name("pageserver-config-override")
|
||||
.long("pageserver-config-override")
|
||||
.takes_value(true)
|
||||
.number_of_values(1)
|
||||
.multiple(true)
|
||||
.help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more")
|
||||
.required(false);
|
||||
|
||||
let matches = App::new("Zenith CLI")
|
||||
.setting(AppSettings::ArgRequiredElseHelp)
|
||||
.version(GIT_VERSION)
|
||||
.subcommand(
|
||||
SubCommand::with_name("init")
|
||||
.about("Initialize a new Zenith repository")
|
||||
.arg(pageserver_config_args.clone())
|
||||
.arg(
|
||||
Arg::with_name("config")
|
||||
.long("config")
|
||||
@@ -133,10 +142,10 @@ fn main() -> Result<()> {
|
||||
.setting(AppSettings::ArgRequiredElseHelp)
|
||||
.about("Manage pageserver")
|
||||
.subcommand(SubCommand::with_name("status"))
|
||||
.subcommand(SubCommand::with_name("start").about("Start local pageserver"))
|
||||
.subcommand(SubCommand::with_name("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
|
||||
.subcommand(SubCommand::with_name("stop").about("Stop local pageserver")
|
||||
.arg(stop_mode_arg.clone()))
|
||||
.subcommand(SubCommand::with_name("restart").about("Restart local pageserver"))
|
||||
.subcommand(SubCommand::with_name("restart").about("Restart local pageserver").arg(pageserver_config_args))
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("safekeeper")
|
||||
@@ -403,6 +412,7 @@ fn handle_init(init_match: &ArgMatches) -> Result<()> {
|
||||
if let Err(e) = pageserver.init(
|
||||
// default_tenantid was generated by the `env.init()` call above
|
||||
Some(&env.default_tenantid.unwrap().to_string()),
|
||||
&pageserver_config_overrides(init_match),
|
||||
) {
|
||||
eprintln!("pageserver init failed: {}", e);
|
||||
exit(1);
|
||||
@@ -411,6 +421,14 @@ fn handle_init(init_match: &ArgMatches) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn pageserver_config_overrides<'a>(init_match: &'a ArgMatches) -> Vec<&'a str> {
|
||||
init_match
|
||||
.values_of("pageserver-config-override")
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn handle_tenant(tenant_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
let pageserver = PageServerNode::from_env(env);
|
||||
match tenant_match.subcommand() {
|
||||
@@ -572,8 +590,8 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
|
||||
let pageserver = PageServerNode::from_env(env);
|
||||
|
||||
match sub_match.subcommand() {
|
||||
("start", Some(_sub_m)) => {
|
||||
if let Err(e) = pageserver.start() {
|
||||
("start", Some(start_match)) => {
|
||||
if let Err(e) = pageserver.start(&pageserver_config_overrides(start_match)) {
|
||||
eprintln!("pageserver start failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
@@ -588,22 +606,20 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
|
||||
}
|
||||
}
|
||||
|
||||
("restart", Some(_sub_m)) => {
|
||||
("restart", Some(restart_match)) => {
|
||||
//TODO what shutdown strategy should we use here?
|
||||
if let Err(e) = pageserver.stop(false) {
|
||||
eprintln!("pageserver stop failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if let Err(e) = pageserver.start() {
|
||||
if let Err(e) = pageserver.start(&pageserver_config_overrides(restart_match)) {
|
||||
eprintln!("pageserver start failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
(sub_name, _) => {
|
||||
bail!("Unexpected pageserver subcommand '{}'", sub_name)
|
||||
}
|
||||
(sub_name, _) => bail!("Unexpected pageserver subcommand '{}'", sub_name),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -662,12 +678,12 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_start_all(_sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
let pageserver = PageServerNode::from_env(env);
|
||||
|
||||
// Postgres nodes are not started automatically
|
||||
|
||||
if let Err(e) = pageserver.start() {
|
||||
if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) {
|
||||
eprintln!("pageserver start failed: {}", e);
|
||||
exit(1);
|
||||
}