Compare commits

...

14 Commits

Author SHA1 Message Date
Arseny Sher
8814c13b8b Prevent commit_lsn <= flush_lsn violation after a42eba3cd7.
Nothing complained about that yet, but we definitely don't hold at least one
assert, so let's keep it this way until better version.
2022-05-27 14:11:56 +04:00
Arseny Sher
a42eba3cd7 s3 WAL offloading staging review.
- Uncomment accidently `self.keep_alive.abort()` commented line, due to this
  task never finished, which blocked launcher.
- Mess up with initialization one more time, to fix offloader trying to back up
  segment 0. Now we initialize all required LSNs in handle_elected,
  where we learn start LSN for the first time.
- Fix blind attempt to provide safekeeper service file with remote storage
  params.
2022-05-27 13:17:10 +04:00
Arseny Sher
0e1bd57c53 Add WAL offloading to s3 on safekeepers.
Separate task is launched for each timeline and stopped when timeline doesn't
need offloading. Decision who offloads is done through etcd leader election;
currently there is no pre condition for participating, that's a TODO.

neon_local and tests infrastructure for remote storage in safekeepers added,
along with the test itself.

ref #1009

Co-authored-by: Anton Shyrabokau <ahtoxa@Antons-MacBook-Pro.local>
2022-05-27 06:19:23 +04:00
bojanserafimov
1d71949c51 Change proxy welcome message (#1808)
Remove zenith sun and outdated instructions around .pgpass
2022-05-26 14:59:03 -04:00
Thang Pham
7d565aa4b9 Reduce the logging level when PG client disconnected to INFO (#1713)
Fixes #1683.
2022-05-26 12:21:15 -04:00
Dmitry Rodionov
72a7220dc8 Tidy up some log messages
* turn println into an info with proper message
* rename new_local_timeline to load_local_timeline because it does not
  create new timeline, it registers timeline that exists on disk in
  pageserver in-memory structures
2022-05-26 18:37:40 +03:00
Konstantin Knizhnik
b0d114ee3f Initialize last_freeze_at with disk consistent LSN to avoid creation of small L0 delta layer on startup
refer #1736
2022-05-26 15:42:18 +03:00
Dmitry Rodionov
38f2d165b7 allow TLS 1.2 in proxy to be compatible with older client libraries 2022-05-26 13:21:29 +03:00
Dmitry Rodionov
5a5737278e add simple metrics for remote storage operations
track number of operations and number of their failures
2022-05-26 01:24:52 +03:00
Kirill Bulatov
06f5e017a1 Move rustfmt check to GH Action 2022-05-26 01:03:48 +03:00
Kirill Bulatov
887b0e14d9 Run basic checks on PRs and pushes to main only 2022-05-26 01:03:48 +03:00
chaitanya sharma
c584d90bb9 initial commit, renamed znodeid to nodeid. 2022-05-25 20:11:26 +03:00
Heikki Linnakangas
7997fc2932 Fix error handling with 'basebackup' command.
If the 'basebackup' command failed in the middle of building the tar
archive, the client would not report the error, but would attempt to
to start up postgres with the partial contents of the data directory.
That fails because the control file is missing (it's added to the
archive last, precisly to make sure that you cannot start postgres
from a partial archive). But the client doesn't see the proper error
message that caused the basebackup to fail in the server, which is
confusing.

Two issues conspired to cause that:

1. The tar::Builder object that we use in the pageserver to construct
the tar stream has a Drop handler that automatically writes a valid
end-of-archive marker on drop. Because of that, the resulting tarball
looks complete, even if an error happens while we're building it. The
pageserver does send an ErrorResponse after the seemingly-valid
tarball, but:

2. The client stops reading the Copy stream, as soon as it sees the
tar end-of-archive marker. Therefore, it doesn't read the
ErrorResponse that comes after it.

We have two clients that call 'basebackup', one in `control_plane`
used by the `neon_local` binary, and another one in
`compute_tools`. Both had the same issue.

This PR fixes both issues, even though fixing either one would be
enough to fix the problem at hand. The pageserver now doesn't send the
end-of-archive marker on error, and the client now reads the copy
stream to the end, even if it sees an end-of-archive marker.

Fixes github issue #1715

In the passing, change Basebackup to use generic Write rather than
'dyn'.
2022-05-25 18:14:44 +03:00
Heikki Linnakangas
24d2313d0b Set --quota-backend-bytes when launching etcd in tests.
By default, etcd makes a huge 10 GB mmap() allocation when it starts up.
It doesn't actually use that much memory, it's just address space, but
it caused me grief when I tried to use 'rr' to debug a python test run.
Apparently, when you replay the 'rr' trace, it does allocate memory for
all that address space.

The size of the initial mmap depends on the --quota-backend-bytes setting.
Our etcd clusters are very small, so let's set --quota-backend-bytes to
keep the virtual memory size small, to make debugging with 'rr' easier.

See https://github.com/etcd-io/etcd/issues/7910 and
5e4b008106
2022-05-25 16:57:45 +03:00
47 changed files with 1516 additions and 567 deletions

View File

@@ -16,4 +16,3 @@ console_mgmt_base_url = http://console-release.local
bucket_name = zenith-storage-oregon
bucket_region = us-west-2
etcd_endpoints = etcd-release.local:2379
safekeeper_enable_s3_offload = false

View File

@@ -17,4 +17,3 @@ console_mgmt_base_url = http://console-staging.local
bucket_name = zenith-staging-storage-us-east-1
bucket_region = us-east-1
etcd_endpoints = etcd-staging.local:2379
safekeeper_enable_s3_offload = false

View File

@@ -6,7 +6,7 @@ After=network.target auditd.service
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --enable-s3-offload={{ safekeeper_enable_s3_offload }}
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="wal"}'
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT

View File

@@ -11,15 +11,6 @@ executors:
- image: zimg/rust:1.58
jobs:
check-codestyle-rust:
executor: neon-xlarge-executor
steps:
- checkout
- run:
name: rustfmt
when: always
command: cargo fmt --all -- --check
# A job to build postgres
build-postgres:
executor: neon-xlarge-executor
@@ -740,7 +731,6 @@ jobs:
workflows:
build_and_test:
jobs:
- check-codestyle-rust
- check-codestyle-python
- build-postgres:
name: build-postgres-<< matrix.build_type >>

View File

@@ -1,8 +1,10 @@
name: Build and Test
on:
pull_request:
push:
branches:
- main
pull_request:
jobs:
regression-check:
@@ -23,13 +25,17 @@ jobs:
submodules: true
fetch-depth: 2
- name: install rust toolchain ${{ matrix.rust_toolchain }}
- name: Install rust toolchain ${{ matrix.rust_toolchain }}
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: ${{ matrix.rust_toolchain }}
components: rustfmt, clippy
override: true
- name: Check formatting
run: cargo fmt --all -- --check
- name: Install Ubuntu postgres dependencies
if: matrix.os == 'ubuntu-latest'
run: |

11
Cargo.lock generated
View File

@@ -1722,9 +1722,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.9.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5"
checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9"
[[package]]
name = "oorandom"
@@ -2394,6 +2394,8 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"metrics",
"once_cell",
"rusoto_core",
"rusoto_s3",
"serde",
@@ -2401,6 +2403,7 @@ dependencies = [
"tempfile",
"tokio",
"tokio-util 0.7.0",
"toml_edit",
"tracing",
"workspace_hack",
]
@@ -2652,6 +2655,7 @@ name = "safekeeper"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"byteorder",
"bytes",
"clap 3.0.14",
@@ -2660,12 +2664,14 @@ dependencies = [
"daemonize",
"etcd_broker",
"fs2",
"futures",
"git-version",
"hex",
"humantime",
"hyper",
"lazy_static",
"metrics",
"once_cell",
"postgres",
"postgres-protocol",
"postgres_ffi",
@@ -2679,6 +2685,7 @@ dependencies = [
"tokio",
"tokio-postgres",
"tokio-util 0.7.0",
"toml_edit",
"tracing",
"url",
"utils",

View File

@@ -146,8 +146,14 @@ impl ComputeNode {
_ => format!("basebackup {} {} {}", &self.tenant, &self.timeline, lsn),
};
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
let mut ar = tar::Archive::new(copyreader);
// Read the archive directly from the `CopyOutReader`
//
// Set `ignore_zeros` so that unpack() reads all the Copy data and
// doesn't stop at the end-of-archive marker. Otherwise, if the server
// sends an Error after finishing the tarball, we will not notice it.
let mut ar = tar::Archive::new(copyreader);
ar.set_ignore_zeros(true);
ar.unpack(&self.pgdata)?;
self.metrics.basebackup_ms.store(

View File

@@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
tar = "0.4.33"
tar = "0.4.38"
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"

View File

@@ -231,8 +231,13 @@ impl PostgresNode {
.context("page server 'basebackup' command failed")?;
// Read the archive directly from the `CopyOutReader`
tar::Archive::new(copyreader)
.unpack(&self.pgdata())
//
// Set `ignore_zeros` so that unpack() reads all the Copy data and
// doesn't stop at the end-of-archive marker. Otherwise, if the server
// sends an Error after finishing the tarball, we will not notice it.
let mut ar = tar::Archive::new(copyreader);
ar.set_ignore_zeros(true);
ar.unpack(&self.pgdata())
.context("extracting base backup failed")?;
Ok(())

View File

@@ -48,6 +48,10 @@ pub fn start_etcd_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
format!("--data-dir={}", etcd_data_dir.display()),
format!("--listen-client-urls={client_urls}"),
format!("--advertise-client-urls={client_urls}"),
// Set --quota-backend-bytes to keep the etcd virtual memory
// size smaller. Our test etcd clusters are very small.
// See https://github.com/etcd-io/etcd/issues/7910
"--quota-backend-bytes=100000000".to_string(),
])
.stdout(Stdio::from(etcd_stdout_file))
.stderr(Stdio::from(etcd_stderr_file))

View File

@@ -49,3 +49,12 @@ fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
cmd
}
}
fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command {
for env_key in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] {
if let Ok(value) = std::env::var(env_key) {
cmd = cmd.env(env_key, value);
}
}
cmd
}

View File

@@ -15,7 +15,7 @@ use std::process::{Command, Stdio};
use utils::{
auth::{encode_from_key_file, Claims, Scope},
postgres_backend::AuthType,
zid::{ZNodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
};
use crate::safekeeper::SafekeeperNode;
@@ -136,7 +136,7 @@ impl EtcdBroker {
#[serde(default)]
pub struct PageServerConf {
// node id
pub id: ZNodeId,
pub id: NodeId,
// Pageserver connection settings
pub listen_pg_addr: String,
pub listen_http_addr: String,
@@ -151,7 +151,7 @@ pub struct PageServerConf {
impl Default for PageServerConf {
fn default() -> Self {
Self {
id: ZNodeId(0),
id: NodeId(0),
listen_pg_addr: String::new(),
listen_http_addr: String::new(),
auth_type: AuthType::Trust,
@@ -163,19 +163,23 @@ impl Default for PageServerConf {
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct SafekeeperConf {
pub id: ZNodeId,
pub id: NodeId,
pub pg_port: u16,
pub http_port: u16,
pub sync: bool,
pub remote_storage: Option<String>,
pub backup_threads: Option<u32>,
}
impl Default for SafekeeperConf {
fn default() -> Self {
Self {
id: ZNodeId(0),
id: NodeId(0),
pg_port: 0,
http_port: 0,
sync: true,
remote_storage: None,
backup_threads: None,
}
}
}
@@ -377,6 +381,7 @@ impl LocalEnv {
base_path != Path::new(""),
"repository base path is missing"
);
ensure!(
!base_path.exists(),
"directory '{}' already exists. Perhaps already initialized?",

View File

@@ -18,12 +18,12 @@ use thiserror::Error;
use utils::{
connstring::connection_address,
http::error::HttpErrorBody,
zid::{ZNodeId, ZTenantId, ZTimelineId},
zid::{NodeId, ZTenantId, ZTimelineId},
};
use crate::local_env::{LocalEnv, SafekeeperConf};
use crate::storage::PageServerNode;
use crate::{fill_rust_env_vars, read_pidfile};
use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile};
#[derive(Error, Debug)]
pub enum SafekeeperHttpError {
@@ -65,7 +65,7 @@ impl ResponseErrorMessageExt for Response {
//
#[derive(Debug)]
pub struct SafekeeperNode {
pub id: ZNodeId,
pub id: NodeId,
pub conf: SafekeeperConf,
@@ -100,7 +100,7 @@ impl SafekeeperNode {
.unwrap()
}
pub fn datadir_path_by_id(env: &LocalEnv, sk_id: ZNodeId) -> PathBuf {
pub fn datadir_path_by_id(env: &LocalEnv, sk_id: NodeId) -> PathBuf {
env.safekeeper_data_dir(format!("sk{}", sk_id).as_ref())
}
@@ -143,6 +143,14 @@ impl SafekeeperNode {
if let Some(prefix) = self.env.etcd_broker.broker_etcd_prefix.as_deref() {
cmd.args(&["--broker-etcd-prefix", prefix]);
}
if let Some(threads) = self.conf.backup_threads {
cmd.args(&["--backup-threads", threads.to_string().as_ref()]);
}
if let Some(ref remote_storage) = self.conf.remote_storage {
cmd.args(&["--remote-storage", remote_storage]);
}
fill_aws_secrets_vars(&mut cmd);
if !cmd.status()?.success() {
bail!(
@@ -286,7 +294,7 @@ impl SafekeeperNode {
&self,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
peer_ids: Vec<ZNodeId>,
peer_ids: Vec<NodeId>,
) -> Result<()> {
Ok(self
.http_request(

View File

@@ -25,7 +25,7 @@ use utils::{
};
use crate::local_env::LocalEnv;
use crate::{fill_rust_env_vars, read_pidfile};
use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile};
use pageserver::tenant_mgr::TenantInfo;
#[derive(Error, Debug)]
@@ -493,12 +493,3 @@ impl PageServerNode {
Ok(timeline_info_response)
}
}
fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command {
for env_key in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] {
if let Ok(value) = std::env::var(env_key) {
cmd = cmd.env(env_key, value);
}
}
cmd
}

View File

@@ -16,7 +16,7 @@ use tokio::{sync::mpsc, task::JoinHandle};
use tracing::*;
use utils::{
lsn::Lsn,
zid::{ZNodeId, ZTenantId, ZTenantTimelineId},
zid::{NodeId, ZTenantId, ZTenantTimelineId},
};
/// Default value to use for prefixing to all etcd keys with.
@@ -25,7 +25,7 @@ pub const DEFAULT_NEON_BROKER_ETCD_PREFIX: &str = "neon";
#[derive(Debug, Deserialize, Serialize)]
struct SafekeeperTimeline {
safekeeper_id: ZNodeId,
safekeeper_id: NodeId,
info: SkTimelineInfo,
}
@@ -43,10 +43,10 @@ pub struct SkTimelineInfo {
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub commit_lsn: Option<Lsn>,
/// LSN up to which safekeeper offloaded WAL to s3.
/// LSN up to which safekeeper has backed WAL.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub s3_wal_lsn: Option<Lsn>,
pub backup_lsn: Option<Lsn>,
/// LSN of last checkpoint uploaded by pageserver.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
@@ -71,7 +71,7 @@ pub enum BrokerError {
/// A way to control the data retrieval from a certain subscription.
pub struct SkTimelineSubscription {
safekeeper_timeline_updates:
mpsc::UnboundedReceiver<HashMap<ZTenantTimelineId, HashMap<ZNodeId, SkTimelineInfo>>>,
mpsc::UnboundedReceiver<HashMap<ZTenantTimelineId, HashMap<NodeId, SkTimelineInfo>>>,
kind: SkTimelineSubscriptionKind,
watcher_handle: JoinHandle<Result<(), BrokerError>>,
watcher: Watcher,
@@ -81,7 +81,7 @@ impl SkTimelineSubscription {
/// Asynchronously polls for more data from the subscription, suspending the current future if there's no data sent yet.
pub async fn fetch_data(
&mut self,
) -> Option<HashMap<ZTenantTimelineId, HashMap<ZNodeId, SkTimelineInfo>>> {
) -> Option<HashMap<ZTenantTimelineId, HashMap<NodeId, SkTimelineInfo>>> {
self.safekeeper_timeline_updates.recv().await
}
@@ -221,7 +221,7 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
break;
}
let mut timeline_updates: HashMap<ZTenantTimelineId, HashMap<ZNodeId, SkTimelineInfo>> = HashMap::new();
let mut timeline_updates: HashMap<ZTenantTimelineId, HashMap<NodeId, SkTimelineInfo>> = HashMap::new();
// Keep track that the timeline data updates from etcd arrive in the right order.
// https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas
// > etcd does not ensure linearizability for watch operations. Users are expected to verify the revision of watch responses to ensure correct ordering.
@@ -299,18 +299,18 @@ fn parse_etcd_key_value(
parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?,
parse_capture(&caps, 2).map_err(BrokerError::ParsingError)?,
),
ZNodeId(parse_capture(&caps, 3).map_err(BrokerError::ParsingError)?),
NodeId(parse_capture(&caps, 3).map_err(BrokerError::ParsingError)?),
),
SubscriptionKind::Tenant(tenant_id) => (
ZTenantTimelineId::new(
tenant_id,
parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?,
),
ZNodeId(parse_capture(&caps, 2).map_err(BrokerError::ParsingError)?),
NodeId(parse_capture(&caps, 2).map_err(BrokerError::ParsingError)?),
),
SubscriptionKind::Timeline(zttid) => (
zttid,
ZNodeId(parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?),
NodeId(parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?),
),
};

View File

@@ -5,14 +5,17 @@ edition = "2021"
[dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
tokio = { version = "1.17", features = ["sync", "macros", "fs", "io-util"] }
tokio-util = { version = "0.7", features = ["io"] }
tracing = "0.1.27"
async-trait = "0.1"
metrics = { version = "0.1", path = "../metrics" }
once_cell = "1.8.0"
rusoto_core = "0.48"
rusoto_s3 = "0.48"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
async-trait = "0.1"
tokio = { version = "1.17", features = ["sync", "macros", "fs", "io-util"] }
tokio-util = { version = "0.7", features = ["io"] }
toml_edit = { version = "0.13", features = ["easy"] }
tracing = "0.1.27"
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -16,8 +16,10 @@ use std::{
path::{Path, PathBuf},
};
use anyhow::Context;
use anyhow::{bail, Context};
use tokio::io;
use toml_edit::Item;
use tracing::info;
pub use self::{
@@ -203,6 +205,90 @@ pub fn path_with_suffix_extension(original_path: impl AsRef<Path>, suffix: &str)
.with_extension(new_extension.as_ref())
}
impl RemoteStorageConfig {
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<RemoteStorageConfig> {
let local_path = toml.get("local_path");
let bucket_name = toml.get("bucket_name");
let bucket_region = toml.get("bucket_region");
let max_concurrent_syncs = NonZeroUsize::new(
parse_optional_integer("max_concurrent_syncs", toml)?
.unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS),
)
.context("Failed to parse 'max_concurrent_syncs' as a positive integer")?;
let max_sync_errors = NonZeroU32::new(
parse_optional_integer("max_sync_errors", toml)?
.unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS),
)
.context("Failed to parse 'max_sync_errors' as a positive integer")?;
let concurrency_limit = NonZeroUsize::new(
parse_optional_integer("concurrency_limit", toml)?
.unwrap_or(DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT),
)
.context("Failed to parse 'concurrency_limit' as a positive integer")?;
let storage = match (local_path, bucket_name, bucket_region) {
(None, None, None) => bail!("no 'local_path' nor 'bucket_name' option"),
(_, Some(_), None) => {
bail!("'bucket_region' option is mandatory if 'bucket_name' is given ")
}
(_, None, Some(_)) => {
bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
}
(None, Some(bucket_name), Some(bucket_region)) => RemoteStorageKind::AwsS3(S3Config {
bucket_name: parse_toml_string("bucket_name", bucket_name)?,
bucket_region: parse_toml_string("bucket_region", bucket_region)?,
prefix_in_bucket: toml
.get("prefix_in_bucket")
.map(|prefix_in_bucket| parse_toml_string("prefix_in_bucket", prefix_in_bucket))
.transpose()?,
endpoint: toml
.get("endpoint")
.map(|endpoint| parse_toml_string("endpoint", endpoint))
.transpose()?,
concurrency_limit,
}),
(Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from(
parse_toml_string("local_path", local_path)?,
)),
(Some(_), Some(_), _) => bail!("local_path and bucket_name are mutually exclusive"),
};
Ok(RemoteStorageConfig {
max_concurrent_syncs,
max_sync_errors,
storage,
})
}
}
// Helper functions to parse a toml Item
fn parse_optional_integer<I, E>(name: &str, item: &toml_edit::Item) -> anyhow::Result<Option<I>>
where
I: TryFrom<i64, Error = E>,
E: std::error::Error + Send + Sync + 'static,
{
let toml_integer = match item.get(name) {
Some(item) => item
.as_integer()
.with_context(|| format!("configure option {name} is not an integer"))?,
None => return Ok(None),
};
I::try_from(toml_integer)
.map(Some)
.with_context(|| format!("configure option {name} is too large"))
}
fn parse_toml_string(name: &str, item: &Item) -> anyhow::Result<String> {
let s = item
.as_str()
.with_context(|| format!("configure option {name} is not a string"))?;
Ok(s.to_string())
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -23,6 +23,71 @@ use crate::{strip_path_prefix, RemoteStorage, S3Config};
use super::StorageMetadata;
pub(super) mod metrics {
use metrics::{register_int_counter_vec, IntCounterVec};
use once_cell::sync::Lazy;
static S3_REQUESTS_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"remote_storage_s3_requests_count",
"Number of s3 requests of particular type",
&["request_type"],
)
.expect("failed to define a metric")
});
static S3_REQUESTS_FAIL_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"remote_storage_s3_failures_count",
"Number of failed s3 requests of particular type",
&["request_type"],
)
.expect("failed to define a metric")
});
pub fn inc_get_object() {
S3_REQUESTS_COUNT.with_label_values(&["get_object"]).inc();
}
pub fn inc_get_object_fail() {
S3_REQUESTS_FAIL_COUNT
.with_label_values(&["get_object"])
.inc();
}
pub fn inc_put_object() {
S3_REQUESTS_COUNT.with_label_values(&["put_object"]).inc();
}
pub fn inc_put_object_fail() {
S3_REQUESTS_FAIL_COUNT
.with_label_values(&["put_object"])
.inc();
}
pub fn inc_delete_object() {
S3_REQUESTS_COUNT
.with_label_values(&["delete_object"])
.inc();
}
pub fn inc_delete_object_fail() {
S3_REQUESTS_FAIL_COUNT
.with_label_values(&["delete_object"])
.inc();
}
pub fn inc_list_objects() {
S3_REQUESTS_COUNT.with_label_values(&["list_objects"]).inc();
}
pub fn inc_list_objects_fail() {
S3_REQUESTS_FAIL_COUNT
.with_label_values(&["list_objects"])
.inc();
}
}
const S3_PREFIX_SEPARATOR: char = '/';
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
@@ -152,6 +217,9 @@ impl RemoteStorage for S3Bucket {
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 list")?;
metrics::inc_list_objects();
let fetch_response = self
.client
.list_objects_v2(ListObjectsV2Request {
@@ -160,7 +228,11 @@ impl RemoteStorage for S3Bucket {
continuation_token,
..ListObjectsV2Request::default()
})
.await?;
.await
.map_err(|e| {
metrics::inc_list_objects_fail();
e
})?;
document_keys.extend(
fetch_response
.contents
@@ -190,6 +262,8 @@ impl RemoteStorage for S3Bucket {
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 upload")?;
metrics::inc_put_object();
self.client
.put_object(PutObjectRequest {
body: Some(StreamingBody::new_with_size(
@@ -201,7 +275,11 @@ impl RemoteStorage for S3Bucket {
metadata: metadata.map(|m| m.0),
..PutObjectRequest::default()
})
.await?;
.await
.map_err(|e| {
metrics::inc_put_object_fail();
e
})?;
Ok(())
}
@@ -215,6 +293,9 @@ impl RemoteStorage for S3Bucket {
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 download")?;
metrics::inc_get_object();
let object_output = self
.client
.get_object(GetObjectRequest {
@@ -222,7 +303,11 @@ impl RemoteStorage for S3Bucket {
key: from.key().to_owned(),
..GetObjectRequest::default()
})
.await?;
.await
.map_err(|e| {
metrics::inc_get_object_fail();
e
})?;
if let Some(body) = object_output.body {
let mut from = io::BufReader::new(body.into_async_read());
@@ -251,6 +336,9 @@ impl RemoteStorage for S3Bucket {
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 range download")?;
metrics::inc_get_object();
let object_output = self
.client
.get_object(GetObjectRequest {
@@ -259,7 +347,11 @@ impl RemoteStorage for S3Bucket {
range,
..GetObjectRequest::default()
})
.await?;
.await
.map_err(|e| {
metrics::inc_get_object_fail();
e
})?;
if let Some(body) = object_output.body {
let mut from = io::BufReader::new(body.into_async_read());
@@ -275,13 +367,20 @@ impl RemoteStorage for S3Bucket {
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 delete")?;
metrics::inc_delete_object();
self.client
.delete_object(DeleteObjectRequest {
bucket: self.bucket_name.clone(),
key: path.key().to_owned(),
..DeleteObjectRequest::default()
})
.await?;
.await
.map_err(|e| {
metrics::inc_delete_object_fail();
e
})?;
Ok(())
}
}

View File

@@ -26,6 +26,9 @@ impl Lsn {
/// Maximum possible value for an LSN
pub const MAX: Lsn = Lsn(u64::MAX);
/// Invalid value for InvalidXLogRecPtr, as defined in xlogdefs.h
pub const INVALID: Lsn = Lsn(0);
/// Subtract a number, returning None on overflow.
pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
let other: u64 = other.into();
@@ -103,6 +106,12 @@ impl Lsn {
pub fn is_aligned(&self) -> bool {
*self == self.align()
}
/// Return if the LSN is valid
/// mimics postgres XLogRecPtrIsInvalid macro
pub fn is_valid(self) -> bool {
self != Lsn::INVALID
}
}
impl From<u64> for Lsn {

View File

@@ -218,7 +218,7 @@ impl ZTenantTimelineId {
impl fmt::Display for ZTenantTimelineId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}-{}", self.tenant_id, self.timeline_id)
write!(f, "{}/{}", self.tenant_id, self.timeline_id)
}
}
@@ -226,9 +226,9 @@ impl fmt::Display for ZTenantTimelineId {
// by the console.
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Debug, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ZNodeId(pub u64);
pub struct NodeId(pub u64);
impl fmt::Display for ZNodeId {
impl fmt::Display for NodeId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}

View File

@@ -22,14 +22,14 @@ use utils::{
lsn::Lsn,
postgres_backend::AuthType,
project_git_version,
zid::{ZNodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
};
use pageserver::timelines::TimelineInfo;
// Default id of a safekeeper node, if not specified on the command line.
const DEFAULT_SAFEKEEPER_ID: ZNodeId = ZNodeId(1);
const DEFAULT_PAGESERVER_ID: ZNodeId = ZNodeId(1);
const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
const DEFAULT_BRANCH_NAME: &str = "main";
project_git_version!(GIT_VERSION);
@@ -860,7 +860,7 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
Ok(())
}
fn get_safekeeper(env: &local_env::LocalEnv, id: ZNodeId) -> Result<SafekeeperNode> {
fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result<SafekeeperNode> {
if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
Ok(SafekeeperNode::from_env(env, node))
} else {
@@ -876,7 +876,7 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
// All the commands take an optional safekeeper name argument
let sk_id = if let Some(id_str) = sub_args.value_of("id") {
ZNodeId(id_str.parse().context("while parsing safekeeper id")?)
NodeId(id_str.parse().context("while parsing safekeeper id")?)
} else {
DEFAULT_SAFEKEEPER_ID
};

View File

@@ -10,8 +10,9 @@
//! This module is responsible for creation of such tarball
//! from data stored in object storage.
//!
use anyhow::{anyhow, ensure, Context, Result};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{BufMut, BytesMut};
use fail::fail_point;
use std::fmt::Write as FmtWrite;
use std::io;
use std::io::Write;
@@ -30,11 +31,16 @@ use utils::lsn::Lsn;
/// This is short-living object only for the time of tarball creation,
/// created mostly to avoid passing a lot of parameters between various functions
/// used for constructing tarball.
pub struct Basebackup<'a> {
ar: Builder<&'a mut dyn Write>,
pub struct Basebackup<'a, W>
where
W: Write,
{
ar: Builder<AbortableWrite<W>>,
timeline: &'a Arc<DatadirTimelineImpl>,
pub lsn: Lsn,
prev_record_lsn: Lsn,
finished: bool,
}
// Create basebackup with non-rel data in it. Omit relational data.
@@ -44,12 +50,15 @@ pub struct Basebackup<'a> {
// * When working without safekeepers. In this situation it is important to match the lsn
// we are taking basebackup on with the lsn that is used in pageserver's walreceiver
// to start the replication.
impl<'a> Basebackup<'a> {
impl<'a, W> Basebackup<'a, W>
where
W: Write,
{
pub fn new(
write: &'a mut dyn Write,
write: W,
timeline: &'a Arc<DatadirTimelineImpl>,
req_lsn: Option<Lsn>,
) -> Result<Basebackup<'a>> {
) -> Result<Basebackup<'a, W>> {
// Compute postgres doesn't have any previous WAL files, but the first
// record that it's going to write needs to include the LSN of the
// previous record (xl_prev). We include prev_record_lsn in the
@@ -90,14 +99,15 @@ impl<'a> Basebackup<'a> {
);
Ok(Basebackup {
ar: Builder::new(write),
ar: Builder::new(AbortableWrite::new(write)),
timeline,
lsn: backup_lsn,
prev_record_lsn: backup_prev,
finished: false,
})
}
pub fn send_tarball(&mut self) -> anyhow::Result<()> {
pub fn send_tarball(mut self) -> anyhow::Result<()> {
// Create pgdata subdirs structure
for dir in pg_constants::PGDATA_SUBDIRS.iter() {
let header = new_tar_header_dir(*dir)?;
@@ -135,9 +145,14 @@ impl<'a> Basebackup<'a> {
self.add_twophase_file(xid)?;
}
fail_point!("basebackup-before-control-file", |_| {
bail!("failpoint basebackup-before-control-file")
});
// Generate pg_control and bootstrap WAL segment.
self.add_pgcontrol_file()?;
self.ar.finish()?;
self.finished = true;
debug!("all tarred up!");
Ok(())
}
@@ -331,6 +346,19 @@ impl<'a> Basebackup<'a> {
}
}
impl<'a, W> Drop for Basebackup<'a, W>
where
W: Write,
{
/// If the basebackup was not finished, prevent the Archive::drop() from
/// writing the end-of-archive marker.
fn drop(&mut self) {
if !self.finished {
self.ar.get_mut().abort();
}
}
}
//
// Create new tarball entry header
//
@@ -366,3 +394,49 @@ fn new_tar_header_dir(path: &str) -> anyhow::Result<Header> {
header.set_cksum();
Ok(header)
}
/// A wrapper that passes through all data to the underlying Write,
/// until abort() is called.
///
/// tar::Builder has an annoying habit of finishing the archive with
/// a valid tar end-of-archive marker (two 512-byte sectors of zeros),
/// even if an error occurs and we don't finish building the archive.
/// We'd rather abort writing the tarball immediately than construct
/// a seemingly valid but incomplete archive. This wrapper allows us
/// to swallow the end-of-archive marker that Builder::drop() emits,
/// without writing it to the underlying sink.
///
struct AbortableWrite<W> {
w: W,
aborted: bool,
}
impl<W> AbortableWrite<W> {
pub fn new(w: W) -> Self {
AbortableWrite { w, aborted: false }
}
pub fn abort(&mut self) {
self.aborted = true;
}
}
impl<W> Write for AbortableWrite<W>
where
W: Write,
{
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
if self.aborted {
Ok(data.len())
} else {
self.w.write(data)
}
}
fn flush(&mut self) -> io::Result<()> {
if self.aborted {
Ok(())
} else {
self.w.flush()
}
}
}

View File

@@ -5,9 +5,9 @@
//! See also `settings.md` for better description on every parameter.
use anyhow::{anyhow, bail, ensure, Context, Result};
use remote_storage::{RemoteStorageConfig, RemoteStorageKind, S3Config};
use remote_storage::RemoteStorageConfig;
use std::env;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;
@@ -16,7 +16,7 @@ use toml_edit::{Document, Item};
use url::Url;
use utils::{
postgres_backend::AuthType,
zid::{ZNodeId, ZTenantId, ZTimelineId},
zid::{NodeId, ZTenantId, ZTimelineId},
};
use crate::layered_repository::TIMELINES_SEGMENT_NAME;
@@ -78,7 +78,7 @@ pub mod defaults {
pub struct PageServerConf {
// Identifier of that particular pageserver so e g safekeepers
// can safely distinguish different pageservers
pub id: ZNodeId,
pub id: NodeId,
/// Example (default): 127.0.0.1:64000
pub listen_pg_addr: String,
@@ -180,7 +180,7 @@ struct PageServerConfigBuilder {
auth_validation_public_key_path: BuilderValue<Option<PathBuf>>,
remote_storage_config: BuilderValue<Option<RemoteStorageConfig>>,
id: BuilderValue<ZNodeId>,
id: BuilderValue<NodeId>,
profiling: BuilderValue<ProfilingConfig>,
broker_etcd_prefix: BuilderValue<String>,
@@ -276,7 +276,7 @@ impl PageServerConfigBuilder {
self.broker_etcd_prefix = BuilderValue::Set(broker_etcd_prefix)
}
pub fn id(&mut self, node_id: ZNodeId) {
pub fn id(&mut self, node_id: NodeId) {
self.id = BuilderValue::Set(node_id)
}
@@ -394,12 +394,12 @@ impl PageServerConf {
)),
"auth_type" => builder.auth_type(parse_toml_from_str(key, item)?),
"remote_storage" => {
builder.remote_storage_config(Some(Self::parse_remote_storage_config(item)?))
builder.remote_storage_config(Some(RemoteStorageConfig::from_toml(item)?))
}
"tenant_config" => {
t_conf = Self::parse_toml_tenant_conf(item)?;
}
"id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)),
"id" => builder.id(NodeId(parse_toml_u64(key, item)?)),
"profiling" => builder.profiling(parse_toml_from_str(key, item)?),
"broker_etcd_prefix" => builder.broker_etcd_prefix(parse_toml_string(key, item)?),
"broker_endpoints" => builder.broker_endpoints(
@@ -484,64 +484,6 @@ impl PageServerConf {
Ok(t_conf)
}
/// subroutine of parse_config(), to parse the `[remote_storage]` table.
fn parse_remote_storage_config(toml: &toml_edit::Item) -> anyhow::Result<RemoteStorageConfig> {
let local_path = toml.get("local_path");
let bucket_name = toml.get("bucket_name");
let bucket_region = toml.get("bucket_region");
let max_concurrent_syncs = NonZeroUsize::new(
parse_optional_integer("max_concurrent_syncs", toml)?
.unwrap_or(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS),
)
.context("Failed to parse 'max_concurrent_syncs' as a positive integer")?;
let max_sync_errors = NonZeroU32::new(
parse_optional_integer("max_sync_errors", toml)?
.unwrap_or(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS),
)
.context("Failed to parse 'max_sync_errors' as a positive integer")?;
let concurrency_limit = NonZeroUsize::new(
parse_optional_integer("concurrency_limit", toml)?
.unwrap_or(remote_storage::DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT),
)
.context("Failed to parse 'concurrency_limit' as a positive integer")?;
let storage = match (local_path, bucket_name, bucket_region) {
(None, None, None) => bail!("no 'local_path' nor 'bucket_name' option"),
(_, Some(_), None) => {
bail!("'bucket_region' option is mandatory if 'bucket_name' is given ")
}
(_, None, Some(_)) => {
bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
}
(None, Some(bucket_name), Some(bucket_region)) => RemoteStorageKind::AwsS3(S3Config {
bucket_name: parse_toml_string("bucket_name", bucket_name)?,
bucket_region: parse_toml_string("bucket_region", bucket_region)?,
prefix_in_bucket: toml
.get("prefix_in_bucket")
.map(|prefix_in_bucket| parse_toml_string("prefix_in_bucket", prefix_in_bucket))
.transpose()?,
endpoint: toml
.get("endpoint")
.map(|endpoint| parse_toml_string("endpoint", endpoint))
.transpose()?,
concurrency_limit,
}),
(Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from(
parse_toml_string("local_path", local_path)?,
)),
(Some(_), Some(_), _) => bail!("local_path and bucket_name are mutually exclusive"),
};
Ok(RemoteStorageConfig {
max_concurrent_syncs,
max_sync_errors,
storage,
})
}
#[cfg(test)]
pub fn test_repo_dir(test_name: &str) -> PathBuf {
PathBuf::from(format!("../tmp_check/test_{test_name}"))
@@ -550,7 +492,7 @@ impl PageServerConf {
#[cfg(test)]
pub fn dummy_conf(repo_dir: PathBuf) -> Self {
PageServerConf {
id: ZNodeId(0),
id: NodeId(0),
wait_lsn_timeout: Duration::from_secs(60),
wal_redo_timeout: Duration::from_secs(60),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
@@ -592,23 +534,6 @@ fn parse_toml_u64(name: &str, item: &Item) -> Result<u64> {
Ok(i as u64)
}
fn parse_optional_integer<I, E>(name: &str, item: &toml_edit::Item) -> anyhow::Result<Option<I>>
where
I: TryFrom<i64, Error = E>,
E: std::error::Error + Send + Sync + 'static,
{
let toml_integer = match item.get(name) {
Some(item) => item
.as_integer()
.with_context(|| format!("configure option {name} is not an integer"))?,
None => return Ok(None),
};
I::try_from(toml_integer)
.map(Some)
.with_context(|| format!("configure option {name} is too large"))
}
fn parse_toml_duration(name: &str, item: &Item) -> Result<Duration> {
let s = item
.as_str()
@@ -651,8 +576,12 @@ fn parse_toml_array(name: &str, item: &Item) -> anyhow::Result<Vec<String>> {
#[cfg(test)]
mod tests {
use std::fs;
use std::{
fs,
num::{NonZeroU32, NonZeroUsize},
};
use remote_storage::{RemoteStorageKind, S3Config};
use tempfile::{tempdir, TempDir};
use super::*;
@@ -693,7 +622,7 @@ id = 10
assert_eq!(
parsed_config,
PageServerConf {
id: ZNodeId(10),
id: NodeId(10),
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
@@ -737,7 +666,7 @@ id = 10
assert_eq!(
parsed_config,
PageServerConf {
id: ZNodeId(10),
id: NodeId(10),
listen_pg_addr: "127.0.0.1:64000".to_string(),
listen_http_addr: "127.0.0.1:9898".to_string(),
wait_lsn_timeout: Duration::from_secs(111),

View File

@@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::{
lsn::Lsn,
zid::{ZNodeId, ZTenantId, ZTimelineId},
zid::{NodeId, ZTenantId, ZTimelineId},
};
#[serde_as]
@@ -42,7 +42,7 @@ pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub ZTenantId
#[derive(Serialize)]
pub struct StatusResponse {
pub id: ZNodeId,
pub id: NodeId,
}
impl TenantCreateRequest {

View File

@@ -1230,7 +1230,7 @@ impl LayeredTimeline {
}),
disk_consistent_lsn: AtomicLsn::new(metadata.disk_consistent_lsn().0),
last_freeze_at: AtomicLsn::new(0),
last_freeze_at: AtomicLsn::new(metadata.disk_consistent_lsn().0),
ancestor_timeline: ancestor,
ancestor_lsn: metadata.ancestor_lsn(),

View File

@@ -305,7 +305,29 @@ fn page_service_conn_main(
let mut conn_handler = PageServerHandler::new(conf, auth);
let pgbackend = PostgresBackend::new(socket, auth_type, None, true)?;
pgbackend.run(&mut conn_handler)
match pgbackend.run(&mut conn_handler) {
Ok(()) => {
// we've been requested to shut down
Ok(())
}
Err(err) => {
let root_cause_io_err_kind = err
.root_cause()
.downcast_ref::<io::Error>()
.map(|e| e.kind());
// `ConnectionReset` error happens when the Postgres client closes the connection.
// As this disconnection happens quite often and is expected,
// we decided to downgrade the logging level to `INFO`.
// See: https://github.com/neondatabase/neon/issues/1683.
if root_cause_io_err_kind == Some(io::ErrorKind::ConnectionReset) {
info!("Postgres client disconnected");
Ok(())
} else {
Err(err)
}
}
}
}
#[derive(Debug)]
@@ -593,7 +615,8 @@ impl PageServerHandler {
/* Send a tarball of the latest layer on the timeline */
{
let mut writer = CopyDataSink { pgb };
let mut basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?;
let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?;
span.record("lsn", &basebackup.lsn.to_string().as_str());
basebackup.send_tarball()?;
}

View File

@@ -327,8 +327,8 @@ pub fn get_local_timeline_with_load(
return Ok(Arc::clone(page_tline));
}
let page_tline = new_local_timeline(&tenant.repo, timeline_id)
.with_context(|| format!("Failed to create new local timeline for tenant {tenant_id}"))?;
let page_tline = load_local_timeline(&tenant.repo, timeline_id)
.with_context(|| format!("Failed to load local timeline for tenant {tenant_id}"))?;
tenant
.local_timelines
.insert(timeline_id, Arc::clone(&page_tline));
@@ -365,7 +365,7 @@ pub fn detach_timeline(
Ok(())
}
fn new_local_timeline(
fn load_local_timeline(
repo: &RepositoryImpl,
timeline_id: ZTimelineId,
) -> anyhow::Result<Arc<DatadirTimeline<LayeredRepository>>> {
@@ -458,8 +458,8 @@ fn apply_timeline_remote_sync_status_updates(
bail!("Local timeline {timeline_id} already registered")
}
Entry::Vacant(v) => {
v.insert(new_local_timeline(repo, timeline_id).with_context(|| {
format!("Failed to register new local timeline for tenant {tenant_id}")
v.insert(load_local_timeline(repo, timeline_id).with_context(|| {
format!("Failed to register add local timeline for tenant {tenant_id}")
})?);
}
},

View File

@@ -302,8 +302,8 @@ fn bootstrap_timeline<R: Repository>(
import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &mut page_tline, lsn)?;
page_tline.tline.checkpoint(CheckpointConfig::Forced)?;
println!(
"created initial timeline {} timeline.lsn {}",
info!(
"created root timeline {} timeline.lsn {}",
tli,
page_tline.tline.get_last_record_lsn()
);

View File

@@ -5,12 +5,9 @@ use utils::pq_proto::{BeMessage as Be, BeParameterStatusMessage};
fn hello_message(redirect_uri: &str, session_id: &str) -> String {
format!(
concat![
"☀️ Welcome to Neon!\n",
"To proceed with database creation, open the following link:\n\n",
"Welcome to Neon!\n",
"Authenticate by visiting:\n",
" {redirect_uri}{session_id}\n\n",
"It needs to be done once and we will send you '.pgpass' file,\n",
"which will allow you to access or create ",
"databases without opening your web browser."
],
redirect_uri = redirect_uri,
session_id = session_id,

View File

@@ -61,7 +61,8 @@ pub fn configure_tls(key_path: &str, cert_path: &str) -> anyhow::Result<TlsConfi
let config = rustls::ServerConfig::builder()
.with_safe_default_cipher_suites()
.with_safe_default_kx_groups()
.with_protocol_versions(&[&rustls::version::TLS13])?
// allow TLS 1.2 to be compatible with older client libraries
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
.with_no_client_auth()
.with_single_cert(cert_chain, key)?;

View File

@@ -30,6 +30,10 @@ const_format = "0.2.21"
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-util = { version = "0.7", features = ["io"] }
git-version = "0.3.5"
async-trait = "0.1"
once_cell = "1.10.0"
futures = "0.3.13"
toml_edit = { version = "0.13", features = ["easy"] }
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }

View File

@@ -6,25 +6,30 @@ use clap::{App, Arg};
use const_format::formatcp;
use daemonize::Daemonize;
use fs2::FileExt;
use remote_storage::RemoteStorageConfig;
use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use std::thread;
use tokio::sync::mpsc;
use toml_edit::Document;
use tracing::*;
use url::{ParseError, Url};
use safekeeper::control_file::{self};
use safekeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
use safekeeper::defaults::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
};
use safekeeper::http;
use safekeeper::remove_wal;
use safekeeper::timeline::GlobalTimelines;
use safekeeper::wal_backup;
use safekeeper::wal_service;
use safekeeper::SafeKeeperConf;
use safekeeper::{broker, callmemaybe};
use safekeeper::{http, s3_offload};
use utils::{
http::endpoint, logging, project_git_version, shutdown::exit_now, signals, tcp_listener,
zid::ZNodeId,
zid::NodeId,
};
const LOCK_FILE_NAME: &str = "safekeeper.lock";
@@ -71,12 +76,6 @@ fn main() -> anyhow::Result<()> {
.long("pageserver")
.takes_value(true),
)
.arg(
Arg::new("ttl")
.long("ttl")
.takes_value(true)
.help("interval for keeping WAL at safekeeper node, after which them will be uploaded to S3 and removed locally"),
)
.arg(
Arg::new("recall")
.long("recall")
@@ -118,12 +117,20 @@ fn main() -> anyhow::Result<()> {
.help("a prefix to always use when polling/pusing data in etcd from this safekeeper"),
)
.arg(
Arg::new("enable-s3-offload")
.long("enable-s3-offload")
Arg::new("wal-backup-threads").long("backup-threads").takes_value(true).help(formatcp!("number of threads for wal backup (default {DEFAULT_WAL_BACKUP_RUNTIME_THREADS}")),
).arg(
Arg::new("remote-storage")
.long("remote-storage")
.takes_value(true)
.help("Remote storage configuration for WAL backup (offloading to s3) as TOML inline table, e.g. {\"max_concurrent_syncs\" = 17, \"max_sync_errors\": 13, \"bucket_name\": \"<BUCKETNAME>\", \"bucket_region\":\"<REGION>\", \"concurrency_limit\": 119}.\nSafekeeper offloads WAL to [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>, mirroring structure on the file system.")
)
.arg(
Arg::new("enable-wal-backup")
.long("enable-wal-backup")
.takes_value(true)
.default_value("true")
.default_missing_value("true")
.help("Enable/disable s3 offloading. When disabled, safekeeper removes WAL ignoring s3 WAL horizon."),
.help("Enable/disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring WAL backup horizon."),
)
.get_matches();
@@ -157,17 +164,13 @@ fn main() -> anyhow::Result<()> {
conf.listen_http_addr = addr.to_owned();
}
if let Some(ttl) = arg_matches.value_of("ttl") {
conf.ttl = Some(humantime::parse_duration(ttl)?);
}
if let Some(recall) = arg_matches.value_of("recall") {
conf.recall_period = humantime::parse_duration(recall)?;
}
let mut given_id = None;
if let Some(given_id_str) = arg_matches.value_of("id") {
given_id = Some(ZNodeId(
given_id = Some(NodeId(
given_id_str
.parse()
.context("failed to parse safekeeper id")?,
@@ -182,9 +185,21 @@ fn main() -> anyhow::Result<()> {
conf.broker_etcd_prefix = prefix.to_string();
}
if let Some(backup_threads) = arg_matches.value_of("wal-backup-threads") {
conf.backup_runtime_threads = backup_threads
.parse()
.with_context(|| format!("Failed to parse backup threads {}", backup_threads))?;
}
if let Some(storage_conf) = arg_matches.value_of("remote-storage") {
// funny toml doesn't consider plain inline table as valid document, so wrap in a key to parse
let storage_conf_toml = format!("remote_storage = {}", storage_conf);
let parsed_toml = storage_conf_toml.parse::<Document>()?; // parse
let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again
conf.remote_storage = Some(RemoteStorageConfig::from_toml(storage_conf_parsed_toml)?);
}
// Seems like there is no better way to accept bool values explicitly in clap.
conf.s3_offload_enabled = arg_matches
.value_of("enable-s3-offload")
conf.wal_backup_enabled = arg_matches
.value_of("enable-wal-backup")
.unwrap()
.parse()
.context("failed to parse bool enable-s3-offload bool")?;
@@ -192,7 +207,7 @@ fn main() -> anyhow::Result<()> {
start_safekeeper(conf, given_id, arg_matches.is_present("init"))
}
fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: bool) -> Result<()> {
fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bool) -> Result<()> {
let log_file = logging::init("safekeeper.log", conf.daemonize)?;
info!("version: {GIT_VERSION}");
@@ -252,7 +267,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];
let (callmemaybe_tx, callmemaybe_rx) = mpsc::unbounded_channel();
GlobalTimelines::set_callmemaybe_tx(callmemaybe_tx);
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
GlobalTimelines::init(callmemaybe_tx, wal_backup_launcher_tx);
let conf_ = conf.clone();
threads.push(
@@ -270,17 +286,6 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
})?,
);
if conf.ttl.is_some() {
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("S3 offload thread".into())
.spawn(|| {
s3_offload::thread_main(conf_);
})?,
);
}
let conf_cloned = conf.clone();
let safekeeper_thread = thread::Builder::new()
.name("Safekeeper thread".into())
@@ -330,6 +335,15 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
})?,
);
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("wal backup launcher thread".into())
.spawn(move || {
wal_backup::wal_backup_launcher_thread_main(conf_, wal_backup_launcher_rx);
})?,
);
// TODO: put more thoughts into handling of failed threads
// We probably should restart them.
@@ -345,14 +359,14 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
}
/// Determine safekeeper id and set it in config.
fn set_id(conf: &mut SafeKeeperConf, given_id: Option<ZNodeId>) -> Result<()> {
fn set_id(conf: &mut SafeKeeperConf, given_id: Option<NodeId>) -> Result<()> {
let id_file_path = conf.workdir.join(ID_FILE_NAME);
let my_id: ZNodeId;
let my_id: NodeId;
// If ID exists, read it in; otherwise set one passed
match fs::read(&id_file_path) {
Ok(id_serialized) => {
my_id = ZNodeId(
my_id = NodeId(
std::str::from_utf8(&id_serialized)
.context("failed to parse safekeeper id")?
.parse()

View File

@@ -1,5 +1,6 @@
//! Communication with etcd, providing safekeeper peers and pageserver coordination.
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
@@ -7,12 +8,14 @@ use etcd_broker::Client;
use etcd_broker::PutOptions;
use etcd_broker::SkTimelineSubscriptionKind;
use std::time::Duration;
use tokio::spawn;
use tokio::task::JoinHandle;
use tokio::{runtime, time::sleep};
use tracing::*;
use url::Url;
use crate::{timeline::GlobalTimelines, SafeKeeperConf};
use utils::zid::{ZNodeId, ZTenantTimelineId};
use utils::zid::{NodeId, ZTenantTimelineId};
const RETRY_INTERVAL_MSEC: u64 = 1000;
const PUSH_INTERVAL_MSEC: u64 = 1000;
@@ -36,7 +39,7 @@ pub fn thread_main(conf: SafeKeeperConf) {
fn timeline_safekeeper_path(
broker_etcd_prefix: String,
zttid: ZTenantTimelineId,
sk_id: ZNodeId,
sk_id: NodeId,
) -> String {
format!(
"{}/{sk_id}",
@@ -44,6 +47,118 @@ fn timeline_safekeeper_path(
)
}
pub struct Election {
pub election_name: String,
pub candidate_name: String,
pub broker_endpoints: Vec<Url>,
}
impl Election {
pub fn new(election_name: String, candidate_name: String, broker_endpoints: Vec<Url>) -> Self {
Self {
election_name,
candidate_name,
broker_endpoints,
}
}
}
pub struct ElectionLeader {
client: Client,
keep_alive: JoinHandle<Result<()>>,
}
impl ElectionLeader {
pub async fn check_am_i(
&mut self,
election_name: String,
candidate_name: String,
) -> Result<bool> {
let resp = self.client.leader(election_name).await?;
let kv = resp.kv().ok_or(anyhow!("failed to get leader response"))?;
let leader = kv.value_str()?;
Ok(leader == candidate_name)
}
pub async fn give_up(self) {
self.keep_alive.abort();
// TODO: it'll be wise to resign here but it'll happen after lease expiration anyway
// should we await for keep alive termination?
let _ = self.keep_alive.await;
}
}
pub async fn get_leader(req: &Election) -> Result<ElectionLeader> {
let mut client = Client::connect(req.broker_endpoints.clone(), None)
.await
.context("Could not connect to etcd")?;
let lease = client
.lease_grant(LEASE_TTL_SEC, None)
.await
.context("Could not acquire a lease");
let lease_id = lease.map(|l| l.id()).unwrap();
let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id));
if let Err(e) = client
.campaign(
req.election_name.clone(),
req.candidate_name.clone(),
lease_id,
)
.await
{
keep_alive.abort();
let _ = keep_alive.await;
return Err(e.into());
}
Ok(ElectionLeader { client, keep_alive })
}
async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> {
let (mut keeper, mut ka_stream) = client
.lease_keep_alive(lease_id)
.await
.context("failed to create keepalive stream")?;
loop {
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
keeper
.keep_alive()
.await
.context("failed to send LeaseKeepAliveRequest")?;
ka_stream
.message()
.await
.context("failed to receive LeaseKeepAliveResponse")?;
sleep(push_interval).await;
}
}
pub fn get_campaign_name(
election_name: String,
broker_prefix: String,
timeline_id: &ZTenantTimelineId,
) -> String {
return format!(
"{}/{}",
SkTimelineSubscriptionKind::timeline(broker_prefix, *timeline_id).watch_key(),
election_name
);
}
pub fn get_candiate_name(system_id: NodeId) -> String {
format!("id_{}", system_id)
}
/// Push once in a while data about all active timelines to the broker.
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
let mut client = Client::connect(&conf.broker_endpoints, None).await?;
@@ -59,7 +174,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
for zttid in GlobalTimelines::get_active_timelines() {
if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
if let Some(tli) = GlobalTimelines::get_loaded(zttid) {
let sk_info = tli.get_public_info(&conf)?;
let put_opts = PutOptions::new().with_lease(lease.id());
client
@@ -106,12 +221,13 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
for (safekeeper_id, info) in sk_info {
tli.record_safekeeper_info(&info, safekeeper_id)?
tli.record_safekeeper_info(&info, safekeeper_id).await?
}
}
}
}
None => {
// XXX it means we lost connection with etcd, error is consumed inside sub object
debug!("timeline updates sender closed, aborting the pull loop");
return Ok(());
}
@@ -142,11 +258,12 @@ async fn main_loop(conf: SafeKeeperConf) {
},
res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => {
// was it panic or normal error?
let err = match res {
Ok(res_internal) => res_internal.unwrap_err(),
Err(err_outer) => err_outer.into(),
match res {
Ok(res_internal) => if let Err(err_inner) = res_internal {
warn!("pull task failed: {:?}", err_inner);
}
Err(err_outer) => { warn!("pull task panicked: {:?}", err_outer) }
};
warn!("pull task failed: {:?}", err);
pull_handle = None;
},
_ = ticker.tick() => {

View File

@@ -165,7 +165,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
commit_lsn: oldstate.commit_lsn,
s3_wal_lsn: Lsn(0),
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
@@ -188,7 +188,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
commit_lsn: oldstate.commit_lsn,
s3_wal_lsn: Lsn(0),
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
@@ -211,7 +211,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
commit_lsn: oldstate.commit_lsn,
s3_wal_lsn: Lsn(0),
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
@@ -234,7 +234,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
commit_lsn: oldstate.commit_lsn,
s3_wal_lsn: Lsn(0),
backup_lsn: Lsn::INVALID,
peer_horizon_lsn: oldstate.peer_horizon_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),

View File

@@ -1,9 +1,9 @@
use serde::{Deserialize, Serialize};
use utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
use utils::zid::{NodeId, ZTenantId, ZTimelineId};
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
pub tenant_id: ZTenantId,
pub timeline_id: ZTimelineId,
pub peer_ids: Vec<ZNodeId>,
pub peer_ids: Vec<NodeId>,
}

View File

@@ -20,14 +20,14 @@ use utils::{
RequestExt, RouterBuilder,
},
lsn::Lsn,
zid::{ZNodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
};
use super::models::TimelineCreateRequest;
#[derive(Debug, Serialize)]
struct SafekeeperStatus {
id: ZNodeId,
id: NodeId,
}
/// Healthcheck handler.
@@ -70,19 +70,19 @@ struct TimelineStatus {
timeline_id: ZTimelineId,
acceptor_state: AcceptorStateStatus,
#[serde(serialize_with = "display_serialize")]
flush_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
timeline_start_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
local_start_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
commit_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
s3_wal_lsn: Lsn,
backup_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
peer_horizon_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
remote_consistent_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
flush_lsn: Lsn,
}
/// Report info about timeline.
@@ -107,13 +107,13 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
tenant_id: zttid.tenant_id,
timeline_id: zttid.timeline_id,
acceptor_state: acc_state,
flush_lsn,
timeline_start_lsn: state.timeline_start_lsn,
local_start_lsn: state.local_start_lsn,
commit_lsn: inmem.commit_lsn,
s3_wal_lsn: inmem.s3_wal_lsn,
backup_lsn: inmem.backup_lsn,
peer_horizon_lsn: inmem.peer_horizon_lsn,
remote_consistent_lsn: inmem.remote_consistent_lsn,
flush_lsn,
};
json_response(StatusCode::OK, status)
}
@@ -148,7 +148,9 @@ async fn timeline_delete_force_handler(
ensure_no_body(&mut request).await?;
json_response(
StatusCode::OK,
GlobalTimelines::delete_force(get_conf(&request), &zttid).map_err(ApiError::from_err)?,
GlobalTimelines::delete_force(get_conf(&request), &zttid)
.await
.map_err(ApiError::from_err)?,
)
}
@@ -162,6 +164,7 @@ async fn tenant_delete_force_handler(
json_response(
StatusCode::OK,
GlobalTimelines::delete_force_all_for_tenant(get_conf(&request), &tenant_id)
.await
.map_err(ApiError::from_err)?
.iter()
.map(|(zttid, resp)| (format!("{}", zttid.timeline_id), *resp))
@@ -178,7 +181,8 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
let safekeeper_info: SkTimelineInfo = json_request(&mut request).await?;
let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
tli.record_safekeeper_info(&safekeeper_info, ZNodeId(1))?;
tli.record_safekeeper_info(&safekeeper_info, NodeId(1))
.await?;
json_response(StatusCode::OK, ())
}

View File

@@ -1,9 +1,11 @@
use defaults::DEFAULT_WAL_BACKUP_RUNTIME_THREADS;
//
use remote_storage::RemoteStorageConfig;
use std::path::PathBuf;
use std::time::Duration;
use url::Url;
use utils::zid::{ZNodeId, ZTenantId, ZTenantTimelineId};
use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId};
pub mod broker;
pub mod callmemaybe;
@@ -14,10 +16,10 @@ pub mod http;
pub mod json_ctrl;
pub mod receive_wal;
pub mod remove_wal;
pub mod s3_offload;
pub mod safekeeper;
pub mod send_wal;
pub mod timeline;
pub mod wal_backup;
pub mod wal_service;
pub mod wal_storage;
@@ -31,6 +33,7 @@ pub mod defaults {
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
}
#[derive(Debug, Clone)]
@@ -47,12 +50,13 @@ pub struct SafeKeeperConf {
pub no_sync: bool,
pub listen_pg_addr: String,
pub listen_http_addr: String,
pub ttl: Option<Duration>,
pub recall_period: Duration,
pub my_id: ZNodeId,
pub remote_storage: Option<RemoteStorageConfig>,
pub backup_runtime_threads: usize,
pub wal_backup_enabled: bool,
pub my_id: NodeId,
pub broker_endpoints: Vec<Url>,
pub broker_etcd_prefix: String,
pub s3_offload_enabled: bool,
}
impl SafeKeeperConf {
@@ -77,12 +81,13 @@ impl Default for SafeKeeperConf {
no_sync: false,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
ttl: None,
remote_storage: None,
recall_period: defaults::DEFAULT_RECALL_PERIOD,
my_id: ZNodeId(0),
my_id: NodeId(0),
broker_endpoints: Vec::new(),
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
s3_offload_enabled: true,
backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
wal_backup_enabled: true,
}
}
}

View File

@@ -85,16 +85,10 @@ impl<'pg> ReceiveWalConn<'pg> {
_ => bail!("unexpected message {:?} instead of greeting", next_msg),
}
// Register the connection and defer unregister.
spg.timeline
.get()
.on_compute_connect(self.pageserver_connstr.as_ref())?;
let _guard = ComputeConnectionGuard {
timeline: Arc::clone(spg.timeline.get()),
};
let mut next_msg = Some(next_msg);
let mut first_time_through = true;
let mut _guard: Option<ComputeConnectionGuard> = None;
loop {
if matches!(next_msg, Some(ProposerAcceptorMessage::AppendRequest(_))) {
// poll AppendRequest's without blocking and write WAL to disk without flushing,
@@ -122,6 +116,18 @@ impl<'pg> ReceiveWalConn<'pg> {
self.write_msg(&reply)?;
}
}
if first_time_through {
// Register the connection and defer unregister. Do that only
// after processing first message, as it sets wal_seg_size,
// wanted by many.
spg.timeline
.get()
.on_compute_connect(self.pageserver_connstr.as_ref())?;
_guard = Some(ComputeConnectionGuard {
timeline: Arc::clone(spg.timeline.get()),
});
first_time_through = false;
}
// blocking wait for the next message
if next_msg.is_none() {

View File

@@ -12,7 +12,7 @@ pub fn thread_main(conf: SafeKeeperConf) {
let active_tlis = GlobalTimelines::get_active_timelines();
for zttid in &active_tlis {
if let Ok(tli) = GlobalTimelines::get(&conf, *zttid, false) {
if let Err(e) = tli.remove_old_wal(conf.s3_offload_enabled) {
if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) {
warn!(
"failed to remove WAL for tenant {} timeline {}: {}",
tli.zttid.tenant_id, tli.zttid.timeline_id, e

View File

@@ -1,107 +0,0 @@
//
// Offload old WAL segments to S3 and remove them locally
// Needs `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables to be set
// if no IAM bucket access is used.
//
use anyhow::{bail, Context};
use postgres_ffi::xlog_utils::*;
use remote_storage::{
GenericRemoteStorage, RemoteStorage, RemoteStorageConfig, S3Bucket, S3Config, S3ObjectKey,
};
use std::collections::HashSet;
use std::env;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::Path;
use std::time::SystemTime;
use tokio::fs::{self, File};
use tokio::io::BufReader;
use tokio::runtime;
use tokio::time::sleep;
use tracing::*;
use walkdir::WalkDir;
use crate::SafeKeeperConf;
pub fn thread_main(conf: SafeKeeperConf) {
// Create a new thread pool
//
// FIXME: keep it single-threaded for now, make it easier to debug with gdb,
// and we're not concerned with performance yet.
//let runtime = runtime::Runtime::new().unwrap();
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
info!("Starting S3 offload task");
runtime.block_on(async {
main_loop(&conf).await.unwrap();
});
}
async fn offload_files(
remote_storage: &S3Bucket,
listing: &HashSet<S3ObjectKey>,
dir_path: &Path,
conf: &SafeKeeperConf,
) -> anyhow::Result<u64> {
let horizon = SystemTime::now() - conf.ttl.unwrap();
let mut n: u64 = 0;
for entry in WalkDir::new(dir_path) {
let entry = entry?;
let path = entry.path();
if path.is_file()
&& IsXLogFileName(entry.file_name().to_str().unwrap())
&& entry.metadata().unwrap().created().unwrap() <= horizon
{
let remote_path = remote_storage.remote_object_id(path)?;
if !listing.contains(&remote_path) {
let file = File::open(&path).await?;
let file_length = file.metadata().await?.len() as usize;
remote_storage
.upload(BufReader::new(file), file_length, &remote_path, None)
.await?;
fs::remove_file(&path).await?;
n += 1;
}
}
}
Ok(n)
}
async fn main_loop(conf: &SafeKeeperConf) -> anyhow::Result<()> {
let remote_storage = match GenericRemoteStorage::new(
conf.workdir.clone(),
&RemoteStorageConfig {
max_concurrent_syncs: NonZeroUsize::new(10).unwrap(),
max_sync_errors: NonZeroU32::new(1).unwrap(),
storage: remote_storage::RemoteStorageKind::AwsS3(S3Config {
bucket_name: "zenith-testbucket".to_string(),
bucket_region: env::var("S3_REGION").context("S3_REGION env var is not set")?,
prefix_in_bucket: Some("walarchive/".to_string()),
endpoint: Some(env::var("S3_ENDPOINT").context("S3_ENDPOINT env var is not set")?),
concurrency_limit: NonZeroUsize::new(20).unwrap(),
}),
},
)? {
GenericRemoteStorage::Local(_) => {
bail!("Unexpected: got local storage for the remote config")
}
GenericRemoteStorage::S3(remote_storage) => remote_storage,
};
loop {
let listing = remote_storage
.list()
.await?
.into_iter()
.collect::<HashSet<_>>();
let n = offload_files(&remote_storage, &listing, &conf.workdir, conf).await?;
info!("Offload {n} files to S3");
sleep(conf.ttl.unwrap()).await;
}
}

View File

@@ -19,6 +19,7 @@ use lazy_static::lazy_static;
use crate::control_file;
use crate::send_wal::HotStandbyFeedback;
use crate::wal_storage;
use metrics::{register_gauge_vec, Gauge, GaugeVec};
use postgres_ffi::xlog_utils::MAX_SEND_SIZE;
@@ -26,7 +27,7 @@ use utils::{
bin_ser::LeSer,
lsn::Lsn,
pq_proto::{SystemId, ZenithFeedback},
zid::{ZNodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
};
pub const SK_MAGIC: u32 = 0xcafeceefu32;
@@ -141,7 +142,7 @@ pub struct ServerInfo {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
/// LSN up to which safekeeper offloaded WAL to s3.
s3_wal_lsn: Lsn,
backup_lsn: Lsn,
/// Term of the last entry.
term: Term,
/// LSN of the last record.
@@ -153,7 +154,7 @@ pub struct PeerInfo {
impl PeerInfo {
fn new() -> Self {
Self {
s3_wal_lsn: Lsn(0),
backup_lsn: Lsn::INVALID,
term: INVALID_TERM,
flush_lsn: Lsn(0),
commit_lsn: Lsn(0),
@@ -164,7 +165,7 @@ impl PeerInfo {
// vector-based node id -> peer state map with very limited functionality we
// need/
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Peers(pub Vec<(ZNodeId, PeerInfo)>);
pub struct Peers(pub Vec<(NodeId, PeerInfo)>);
/// Persistent information stored on safekeeper node
/// On disk data is prefixed by magic and format version and followed by checksum.
@@ -193,9 +194,9 @@ pub struct SafeKeeperState {
/// Part of WAL acknowledged by quorum and available locally. Always points
/// to record boundary.
pub commit_lsn: Lsn,
/// First LSN not yet offloaded to s3. Useful to persist to avoid finding
/// out offloading progress on boot.
pub s3_wal_lsn: Lsn,
/// LSN that points to the end of the last backed up segment. Useful to
/// persist to avoid finding out offloading progress on boot.
pub backup_lsn: Lsn,
/// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
/// of last record streamed to everyone). Persisting it helps skipping
/// recovery in walproposer, generally we compute it from peers. In
@@ -217,14 +218,14 @@ pub struct SafeKeeperState {
// are not flushed yet.
pub struct SafekeeperMemState {
pub commit_lsn: Lsn,
pub s3_wal_lsn: Lsn, // TODO: keep only persistent version
pub backup_lsn: Lsn,
pub peer_horizon_lsn: Lsn,
pub remote_consistent_lsn: Lsn,
pub proposer_uuid: PgUuid,
}
impl SafeKeeperState {
pub fn new(zttid: &ZTenantTimelineId, peers: Vec<ZNodeId>) -> SafeKeeperState {
pub fn new(zttid: &ZTenantTimelineId, peers: Vec<NodeId>) -> SafeKeeperState {
SafeKeeperState {
tenant_id: zttid.tenant_id,
timeline_id: zttid.timeline_id,
@@ -241,7 +242,7 @@ impl SafeKeeperState {
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
commit_lsn: Lsn(0),
s3_wal_lsn: Lsn(0),
backup_lsn: Lsn::INVALID,
peer_horizon_lsn: Lsn(0),
remote_consistent_lsn: Lsn(0),
peers: Peers(peers.iter().map(|p| (*p, PeerInfo::new())).collect()),
@@ -277,7 +278,7 @@ pub struct ProposerGreeting {
#[derive(Debug, Serialize)]
pub struct AcceptorGreeting {
term: u64,
node_id: ZNodeId,
node_id: NodeId,
}
/// Vote request sent from proposer to safekeepers
@@ -531,7 +532,7 @@ pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
pub wal_store: WAL,
node_id: ZNodeId, // safekeeper's node id
node_id: NodeId, // safekeeper's node id
}
impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
@@ -544,7 +545,7 @@ where
ztli: ZTimelineId,
state: CTRL,
mut wal_store: WAL,
node_id: ZNodeId,
node_id: NodeId,
) -> Result<SafeKeeper<CTRL, WAL>> {
if state.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.timeline_id {
bail!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.timeline_id);
@@ -559,7 +560,7 @@ where
epoch_start_lsn: Lsn(0),
inmem: SafekeeperMemState {
commit_lsn: state.commit_lsn,
s3_wal_lsn: state.s3_wal_lsn,
backup_lsn: state.backup_lsn,
peer_horizon_lsn: state.peer_horizon_lsn,
remote_consistent_lsn: state.remote_consistent_lsn,
proposer_uuid: state.proposer_uuid,
@@ -575,13 +576,16 @@ where
self.state
.acceptor_state
.term_history
.up_to(self.wal_store.flush_lsn())
.up_to(self.flush_lsn())
}
pub fn get_epoch(&self) -> Term {
self.state
.acceptor_state
.get_epoch(self.wal_store.flush_lsn())
self.state.acceptor_state.get_epoch(self.flush_lsn())
}
/// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
fn flush_lsn(&self) -> Lsn {
max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
}
/// Process message from proposer and possibly form reply. Concurrent
@@ -649,7 +653,6 @@ where
self.state.persist(&state)?;
}
// pass wal_seg_size to read WAL and find flush_lsn
self.wal_store.init_storage(&self.state)?;
info!(
@@ -671,7 +674,7 @@ where
let mut resp = VoteResponse {
term: self.state.acceptor_state.term,
vote_given: false as u64,
flush_lsn: self.wal_store.flush_lsn(),
flush_lsn: self.flush_lsn(),
truncate_lsn: self.state.peer_horizon_lsn,
term_history: self.get_term_history(),
timeline_start_lsn: self.state.timeline_start_lsn,
@@ -703,7 +706,7 @@ where
fn append_response(&self) -> AppendResponse {
let ar = AppendResponse {
term: self.state.acceptor_state.term,
flush_lsn: self.wal_store.flush_lsn(),
flush_lsn: self.flush_lsn(),
commit_lsn: self.state.commit_lsn,
// will be filled by the upper code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(),
@@ -731,24 +734,36 @@ where
{
let mut state = self.state.clone();
// Remeber point where WAL begins globally, if not yet.
// Here we learn initial LSN for the first time, set fields
// interested in that.
if state.timeline_start_lsn == Lsn(0) {
// Remember point where WAL begins globally.
state.timeline_start_lsn = msg.timeline_start_lsn;
info!(
"setting timeline_start_lsn to {:?}",
state.timeline_start_lsn
);
}
// Remember point where WAL begins locally, if not yet. (I doubt the
// second condition is ever possible)
if state.local_start_lsn == Lsn(0) || state.local_start_lsn >= msg.start_streaming_at {
state.local_start_lsn = msg.start_streaming_at;
info!("setting local_start_lsn to {:?}", state.local_start_lsn);
}
// Initializing commit_lsn before acking first flushed record is
// important to let find_end_of_wal skip the whole in the beginning
// of the first segment.
//
// NB: on new clusters, this happens at the same time as
// timeline_start_lsn initialization, it is taken outside to provide
// upgrade.
self.global_commit_lsn = max(self.global_commit_lsn, state.timeline_start_lsn);
self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn);
self.metrics.commit_lsn.set(self.inmem.commit_lsn.0 as f64);
// Initalizing backup_lsn is useful to avoid making backup think it should upload 0 segment.
self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
state.acceptor_state.term_history = msg.term_history.clone();
self.state.persist(&state)?;
self.persist_control_file(state)?;
}
info!("start receiving WAL since {:?}", msg.start_streaming_at);
@@ -758,7 +773,7 @@ where
/// Advance commit_lsn taking into account what we have locally
pub fn update_commit_lsn(&mut self) -> Result<()> {
let commit_lsn = min(self.global_commit_lsn, self.wal_store.flush_lsn());
let commit_lsn = min(self.global_commit_lsn, self.flush_lsn());
assert!(commit_lsn >= self.inmem.commit_lsn);
self.inmem.commit_lsn = commit_lsn;
@@ -772,25 +787,16 @@ where
// that we receive new epoch_start_lsn, and we still need to sync
// control file in this case.
if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
self.persist_control_file()?;
}
// We got our first commit_lsn, which means we should sync
// everything to disk, to initialize the state.
if self.state.commit_lsn == Lsn(0) && commit_lsn > Lsn(0) {
self.wal_store.flush_wal()?;
self.persist_control_file()?;
self.persist_control_file(self.state.clone())?;
}
Ok(())
}
/// Persist in-memory state to the disk.
fn persist_control_file(&mut self) -> Result<()> {
let mut state = self.state.clone();
/// Persist in-memory state to the disk, taking other data from state.
fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
state.commit_lsn = self.inmem.commit_lsn;
state.s3_wal_lsn = self.inmem.s3_wal_lsn;
state.backup_lsn = self.inmem.backup_lsn;
state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
state.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
state.proposer_uuid = self.inmem.proposer_uuid;
@@ -823,13 +829,6 @@ where
// do the job
if !msg.wal_data.is_empty() {
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?;
// If this was the first record we ever received, initialize
// commit_lsn to help find_end_of_wal skip the hole in the
// beginning.
if self.global_commit_lsn == Lsn(0) {
self.global_commit_lsn = msg.h.begin_lsn;
}
}
// flush wal to the disk, if required
@@ -852,7 +851,7 @@ where
if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
< self.inmem.peer_horizon_lsn
{
self.persist_control_file()?;
self.persist_control_file(self.state.clone())?;
}
trace!(
@@ -898,11 +897,11 @@ where
self.update_commit_lsn()?;
}
}
if let Some(s3_wal_lsn) = sk_info.s3_wal_lsn {
let new_s3_wal_lsn = max(s3_wal_lsn, self.inmem.s3_wal_lsn);
if let Some(backup_lsn) = sk_info.backup_lsn {
let new_backup_lsn = max(backup_lsn, self.inmem.backup_lsn);
sync_control_file |=
self.state.s3_wal_lsn + (self.state.server.wal_seg_size as u64) < new_s3_wal_lsn;
self.inmem.s3_wal_lsn = new_s3_wal_lsn;
self.state.backup_lsn + (self.state.server.wal_seg_size as u64) < new_backup_lsn;
self.inmem.backup_lsn = new_backup_lsn;
}
if let Some(remote_consistent_lsn) = sk_info.remote_consistent_lsn {
let new_remote_consistent_lsn =
@@ -920,7 +919,7 @@ where
self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;
}
if sync_control_file {
self.persist_control_file()?;
self.persist_control_file(self.state.clone())?;
}
Ok(())
}
@@ -930,29 +929,23 @@ where
/// offloading.
/// While it is safe to use inmem values for determining horizon,
/// we use persistent to make possible normal states less surprising.
pub fn get_horizon_segno(&self, s3_offload_enabled: bool) -> XLogSegNo {
let s3_offload_horizon = if s3_offload_enabled {
self.state.s3_wal_lsn
} else {
Lsn(u64::MAX)
};
let horizon_lsn = min(
min(
self.state.remote_consistent_lsn,
self.state.peer_horizon_lsn,
),
s3_offload_horizon,
pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo {
let mut horizon_lsn = min(
self.state.remote_consistent_lsn,
self.state.peer_horizon_lsn,
);
if wal_backup_enabled {
horizon_lsn = min(horizon_lsn, self.state.backup_lsn);
}
horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
}
}
#[cfg(test)]
mod tests {
use std::ops::Deref;
use super::*;
use crate::wal_storage::Storage;
use std::ops::Deref;
// fake storage for tests
struct InMemoryState {
@@ -1013,7 +1006,8 @@ mod tests {
};
let wal_store = DummyWalStore { lsn: Lsn(0) };
let ztli = ZTimelineId::from([0u8; 16]);
let mut sk = SafeKeeper::new(ztli, storage, wal_store, ZNodeId(0)).unwrap();
let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap();
// check voting for 1 is ok
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
@@ -1028,7 +1022,8 @@ mod tests {
let storage = InMemoryState {
persisted_state: state,
};
sk = SafeKeeper::new(ztli, storage, sk.wal_store, ZNodeId(0)).unwrap();
sk = SafeKeeper::new(ztli, storage, sk.wal_store, NodeId(0)).unwrap();
// and ensure voting second time for 1 is not ok
vote_resp = sk.process_msg(&vote_request);
@@ -1045,7 +1040,8 @@ mod tests {
};
let wal_store = DummyWalStore { lsn: Lsn(0) };
let ztli = ZTimelineId::from([0u8; 16]);
let mut sk = SafeKeeper::new(ztli, storage, wal_store, ZNodeId(0)).unwrap();
let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap();
let mut ar_hdr = AppendRequestHeader {
term: 1,

View File

@@ -315,7 +315,7 @@ impl ReplicationConn {
} else {
// TODO: also check once in a while whether we are walsender
// to right pageserver.
if spg.timeline.get().check_deactivate(replica_id)? {
if spg.timeline.get().stop_walsender(replica_id)? {
// Shut down, timeline is suspended.
// TODO create proper error type for this
bail!("end streaming to {:?}", spg.appname);

View File

@@ -8,6 +8,7 @@ use lazy_static::lazy_static;
use postgres_ffi::xlog_utils::XLogSegNo;
use serde::Serialize;
use tokio::sync::watch;
use std::cmp::{max, min};
use std::collections::HashMap;
@@ -15,23 +16,23 @@ use std::fs::{self};
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{Sender, UnboundedSender};
use tracing::*;
use utils::{
lsn::Lsn,
pq_proto::ZenithFeedback,
zid::{ZNodeId, ZTenantId, ZTenantTimelineId},
zid::{NodeId, ZTenantId, ZTenantTimelineId},
};
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
use crate::control_file;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState,
};
use crate::send_wal::HotStandbyFeedback;
use crate::wal_storage;
use crate::wal_storage::Storage as wal_storage_iface;
use crate::SafeKeeperConf;
@@ -81,10 +82,14 @@ struct SharedState {
notified_commit_lsn: Lsn,
/// State of replicas
replicas: Vec<Option<ReplicaState>>,
/// Inactive clusters shouldn't occupy any resources, so timeline is
/// activated whenever there is a compute connection or pageserver is not
/// caughtup (it must have latest WAL for new compute start) and suspended
/// otherwise.
/// True when WAL backup launcher oversees the timeline, making sure WAL is
/// offloaded, allows to bother launcher less.
wal_backup_active: bool,
/// True whenever there is at least some pending activity on timeline: live
/// compute connection, pageserver is not caughtup (it must have latest WAL
/// for new compute start) or WAL backuping is not finished. Practically it
/// means safekeepers broadcast info to peers about the timeline, old WAL is
/// trimmed.
///
/// TODO: it might be better to remove tli completely from GlobalTimelines
/// when tli is inactive instead of having this flag.
@@ -99,10 +104,11 @@ impl SharedState {
fn create(
conf: &SafeKeeperConf,
zttid: &ZTenantTimelineId,
peer_ids: Vec<ZNodeId>,
peer_ids: Vec<NodeId>,
) -> Result<Self> {
let state = SafeKeeperState::new(zttid, peer_ids);
let control_store = control_file::FileStorage::create_new(zttid, conf, state)?;
let wal_store = wal_storage::PhysicalStorage::new(zttid, conf);
let sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?;
@@ -110,6 +116,7 @@ impl SharedState {
notified_commit_lsn: Lsn(0),
sk,
replicas: Vec::new(),
wal_backup_active: false,
active: false,
num_computes: 0,
pageserver_connstr: None,
@@ -129,15 +136,62 @@ impl SharedState {
notified_commit_lsn: Lsn(0),
sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?,
replicas: Vec::new(),
wal_backup_active: false,
active: false,
num_computes: 0,
pageserver_connstr: None,
last_removed_segno: 0,
})
}
fn is_active(&self) -> bool {
self.is_wal_backup_required()
// FIXME: add tracking of relevant pageservers and check them here individually,
// otherwise migration won't work (we suspend too early).
|| self.sk.inmem.remote_consistent_lsn <= self.sk.inmem.commit_lsn
}
/// Activate the timeline: start/change walsender (via callmemaybe).
fn activate(
/// Mark timeline active/inactive and return whether s3 offloading requires
/// start/stop action.
fn update_status(&mut self) -> bool {
self.active = self.is_active();
self.is_wal_backup_action_pending()
}
/// Should we run s3 offloading in current state?
fn is_wal_backup_required(&self) -> bool {
let seg_size = self.get_wal_seg_size();
self.num_computes > 0 ||
// Currently only the whole segment is offloaded, so compare segment numbers.
(self.sk.inmem.commit_lsn.segment_number(seg_size) >
self.sk.inmem.backup_lsn.segment_number(seg_size))
}
/// Is current state of s3 offloading is not what it ought to be?
fn is_wal_backup_action_pending(&self) -> bool {
let res = self.wal_backup_active != self.is_wal_backup_required();
if res {
let action_pending = if self.is_wal_backup_required() {
"start"
} else {
"stop"
};
trace!(
"timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}",
self.sk.state.timeline_id, action_pending, self.num_computes, self.sk.inmem.commit_lsn, self.sk.inmem.backup_lsn
);
}
res
}
/// Returns whether s3 offloading is required and sets current status as
/// matching.
fn wal_backup_attend(&mut self) -> bool {
self.wal_backup_active = self.is_wal_backup_required();
self.wal_backup_active
}
/// start/change walsender (via callmemaybe).
fn callmemaybe_sub(
&mut self,
zttid: &ZTenantTimelineId,
pageserver_connstr: Option<&String>,
@@ -179,42 +233,42 @@ impl SharedState {
);
}
self.pageserver_connstr = pageserver_connstr.map(|c| c.to_owned());
self.active = true;
Ok(())
}
/// Deactivate the timeline: stop callmemaybe.
fn deactivate(
fn callmemaybe_unsub(
&mut self,
zttid: &ZTenantTimelineId,
callmemaybe_tx: &UnboundedSender<CallmeEvent>,
) -> Result<()> {
if self.active {
if let Some(ref pageserver_connstr) = self.pageserver_connstr {
let subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.to_owned(),
);
callmemaybe_tx
.send(CallmeEvent::Unsubscribe(subscription_key))
.unwrap_or_else(|e| {
error!(
"failed to send Unsubscribe request to callmemaybe thread {}",
e
);
});
info!(
"timeline {} is unsubscribed from callmemaybe to {}",
zttid.timeline_id,
self.pageserver_connstr.as_ref().unwrap()
);
}
self.active = false;
if let Some(ref pageserver_connstr) = self.pageserver_connstr {
let subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.to_owned(),
);
callmemaybe_tx
.send(CallmeEvent::Unsubscribe(subscription_key))
.unwrap_or_else(|e| {
error!(
"failed to send Unsubscribe request to callmemaybe thread {}",
e
);
});
info!(
"timeline {} is unsubscribed from callmemaybe to {}",
zttid.timeline_id,
self.pageserver_connstr.as_ref().unwrap()
);
}
Ok(())
}
fn get_wal_seg_size(&self) -> usize {
self.sk.state.server.wal_seg_size as usize
}
/// Get combined state of all alive replicas
pub fn get_replicas_state(&self) -> ReplicaState {
let mut acc = ReplicaState::new();
@@ -278,6 +332,13 @@ impl SharedState {
pub struct Timeline {
pub zttid: ZTenantTimelineId,
pub callmemaybe_tx: UnboundedSender<CallmeEvent>,
/// Sending here asks for wal backup launcher attention (start/stop
/// offloading). Sending zttid instead of concrete command allows to do
/// sending without timeline lock.
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
commit_lsn_watch_tx: watch::Sender<Lsn>,
/// For breeding receivers.
commit_lsn_watch_rx: watch::Receiver<Lsn>,
mutex: Mutex<SharedState>,
/// conditional variable used to notify wal senders
cond: Condvar,
@@ -287,11 +348,17 @@ impl Timeline {
fn new(
zttid: ZTenantTimelineId,
callmemaybe_tx: UnboundedSender<CallmeEvent>,
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
shared_state: SharedState,
) -> Timeline {
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
watch::channel(shared_state.sk.inmem.commit_lsn);
Timeline {
zttid,
callmemaybe_tx,
wal_backup_launcher_tx,
commit_lsn_watch_tx,
commit_lsn_watch_rx,
mutex: Mutex::new(shared_state),
cond: Condvar::new(),
}
@@ -301,13 +368,21 @@ impl Timeline {
/// not running yet.
/// Can fail only if channel to a static thread got closed, which is not normal at all.
pub fn on_compute_connect(&self, pageserver_connstr: Option<&String>) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes += 1;
// FIXME: currently we always adopt latest pageserver connstr, but we
// should have kind of generations assigned by compute to distinguish
// the latest one or even pass it through consensus to reliably deliver
// to all safekeepers.
shared_state.activate(&self.zttid, pageserver_connstr, &self.callmemaybe_tx)?;
let is_wal_backup_action_pending: bool;
{
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes += 1;
is_wal_backup_action_pending = shared_state.update_status();
// FIXME: currently we always adopt latest pageserver connstr, but we
// should have kind of generations assigned by compute to distinguish
// the latest one or even pass it through consensus to reliably deliver
// to all safekeepers.
shared_state.callmemaybe_sub(&self.zttid, pageserver_connstr, &self.callmemaybe_tx)?;
}
// Wake up wal backup launcher, if offloading not started yet.
if is_wal_backup_action_pending {
self.wal_backup_launcher_tx.blocking_send(self.zttid)?;
}
Ok(())
}
@@ -315,38 +390,43 @@ impl Timeline {
/// pageserver doesn't need catchup.
/// Can fail only if channel to a static thread got closed, which is not normal at all.
pub fn on_compute_disconnect(&self) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes -= 1;
// If there is no pageserver, can suspend right away; otherwise let
// walsender do that.
if shared_state.num_computes == 0 && shared_state.pageserver_connstr.is_none() {
shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?;
let is_wal_backup_action_pending: bool;
{
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes -= 1;
is_wal_backup_action_pending = shared_state.update_status();
}
// Wake up wal backup launcher, if it is time to stop the offloading.
if is_wal_backup_action_pending {
self.wal_backup_launcher_tx.blocking_send(self.zttid)?;
}
Ok(())
}
/// Deactivate tenant if there is no computes and pageserver is caughtup,
/// assuming the pageserver status is in replica_id.
/// Returns true if deactivated.
pub fn check_deactivate(&self, replica_id: usize) -> Result<bool> {
/// Whether we still need this walsender running?
/// TODO: check this pageserver is actually interested in this timeline.
pub fn stop_walsender(&self, replica_id: usize) -> Result<bool> {
let mut shared_state = self.mutex.lock().unwrap();
if !shared_state.active {
// already suspended
return Ok(true);
}
if shared_state.num_computes == 0 {
let replica_state = shared_state.replicas[replica_id].unwrap();
let deactivate = shared_state.notified_commit_lsn == Lsn(0) || // no data at all yet
(replica_state.last_received_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
replica_state.last_received_lsn >= shared_state.sk.inmem.commit_lsn);
if deactivate {
shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?;
let stop = shared_state.notified_commit_lsn == Lsn(0) || // no data at all yet
(replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
if stop {
shared_state.callmemaybe_unsub(&self.zttid, &self.callmemaybe_tx)?;
return Ok(true);
}
}
Ok(false)
}
/// Returns whether s3 offloading is required and sets current status as
/// matching it.
pub fn wal_backup_attend(&self) -> bool {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.wal_backup_attend()
}
/// Deactivates the timeline, assuming it is being deleted.
/// Returns whether the timeline was already active.
///
@@ -354,10 +434,14 @@ impl Timeline {
/// will stop by themselves eventually (possibly with errors, but no panics). There should be no
/// compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but
/// we're deleting the timeline anyway.
pub fn deactivate_for_delete(&self) -> Result<bool> {
let mut shared_state = self.mutex.lock().unwrap();
let was_active = shared_state.active;
shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?;
pub async fn deactivate_for_delete(&self) -> Result<bool> {
let was_active: bool;
{
let mut shared_state = self.mutex.lock().unwrap();
was_active = shared_state.active;
shared_state.callmemaybe_unsub(&self.zttid, &self.callmemaybe_tx)?;
}
self.wal_backup_launcher_tx.send(self.zttid).await?;
Ok(was_active)
}
@@ -391,6 +475,7 @@ impl Timeline {
}
// Notify caught-up WAL senders about new WAL data received
// TODO: replace-unify it with commit_lsn_watch.
fn notify_wal_senders(&self, shared_state: &mut MutexGuard<SharedState>) {
if shared_state.notified_commit_lsn < shared_state.sk.inmem.commit_lsn {
shared_state.notified_commit_lsn = shared_state.sk.inmem.commit_lsn;
@@ -398,12 +483,17 @@ impl Timeline {
}
}
pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
self.commit_lsn_watch_rx.clone()
}
/// Pass arrived message to the safekeeper.
pub fn process_msg(
&self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
let mut rmsg: Option<AcceptorProposerMessage>;
let commit_lsn: Lsn;
{
let mut shared_state = self.mutex.lock().unwrap();
rmsg = shared_state.sk.process_msg(msg)?;
@@ -419,15 +509,31 @@ impl Timeline {
// Ping wal sender that new data might be available.
self.notify_wal_senders(&mut shared_state);
commit_lsn = shared_state.sk.inmem.commit_lsn;
}
self.commit_lsn_watch_tx.send(commit_lsn)?;
Ok(rmsg)
}
pub fn get_wal_seg_size(&self) -> usize {
self.mutex.lock().unwrap().get_wal_seg_size()
}
pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
let shared_state = self.mutex.lock().unwrap();
(shared_state.sk.inmem.clone(), shared_state.sk.state.clone())
}
pub fn get_wal_backup_lsn(&self) -> Lsn {
self.mutex.lock().unwrap().sk.inmem.backup_lsn
}
pub fn set_wal_backup_lsn(&self, backup_lsn: Lsn) {
self.mutex.lock().unwrap().sk.inmem.backup_lsn = backup_lsn;
// we should check whether to shut down offloader, but this will be done
// soon by peer communication anyway.
}
/// Prepare public safekeeper info for reporting.
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> anyhow::Result<SkTimelineInfo> {
let shared_state = self.mutex.lock().unwrap();
@@ -436,7 +542,6 @@ impl Timeline {
flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()),
// note: this value is not flushed to control file yet and can be lost
commit_lsn: Some(shared_state.sk.inmem.commit_lsn),
s3_wal_lsn: Some(shared_state.sk.inmem.s3_wal_lsn),
// TODO: rework feedbacks to avoid max here
remote_consistent_lsn: Some(max(
shared_state.get_replicas_state().remote_consistent_lsn,
@@ -444,14 +549,35 @@ impl Timeline {
)),
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
safekeeper_connection_string: Some(conf.listen_pg_addr.clone()),
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
})
}
/// Update timeline state with peer safekeeper data.
pub fn record_safekeeper_info(&self, sk_info: &SkTimelineInfo, _sk_id: ZNodeId) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.sk.record_safekeeper_info(sk_info)?;
self.notify_wal_senders(&mut shared_state);
pub async fn record_safekeeper_info(
&self,
sk_info: &SkTimelineInfo,
_sk_id: NodeId,
) -> Result<()> {
let is_wal_backup_action_pending: bool;
let commit_lsn: Lsn;
{
let mut shared_state = self.mutex.lock().unwrap();
// WAL seg size not initialized yet (no message from compute ever
// received), can't do much without it.
if shared_state.get_wal_seg_size() == 0 {
return Ok(());
}
shared_state.sk.record_safekeeper_info(sk_info)?;
self.notify_wal_senders(&mut shared_state);
is_wal_backup_action_pending = shared_state.update_status();
commit_lsn = shared_state.sk.inmem.commit_lsn;
}
self.commit_lsn_watch_tx.send(commit_lsn)?;
// Wake up wal backup launcher, if it is time to stop the offloading.
if is_wal_backup_action_pending {
self.wal_backup_launcher_tx.send(self.zttid).await?;
}
Ok(())
}
@@ -476,16 +602,16 @@ impl Timeline {
shared_state.sk.wal_store.flush_lsn()
}
pub fn remove_old_wal(&self, s3_offload_enabled: bool) -> Result<()> {
pub fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
let horizon_segno: XLogSegNo;
let remover: Box<dyn Fn(u64) -> Result<(), anyhow::Error>>;
{
let shared_state = self.mutex.lock().unwrap();
// WAL seg size not initialized yet, no WAL exists.
if shared_state.sk.state.server.wal_seg_size == 0 {
if shared_state.get_wal_seg_size() == 0 {
return Ok(());
}
horizon_segno = shared_state.sk.get_horizon_segno(s3_offload_enabled);
horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
remover = shared_state.sk.wal_store.remove_up_to();
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
return Ok(());
@@ -522,12 +648,14 @@ impl TimelineTools for Option<Arc<Timeline>> {
struct GlobalTimelinesState {
timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>,
callmemaybe_tx: Option<UnboundedSender<CallmeEvent>>,
wal_backup_launcher_tx: Option<Sender<ZTenantTimelineId>>,
}
lazy_static! {
static ref TIMELINES_STATE: Mutex<GlobalTimelinesState> = Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
callmemaybe_tx: None
callmemaybe_tx: None,
wal_backup_launcher_tx: None,
});
}
@@ -541,17 +669,22 @@ pub struct TimelineDeleteForceResult {
pub struct GlobalTimelines;
impl GlobalTimelines {
pub fn set_callmemaybe_tx(callmemaybe_tx: UnboundedSender<CallmeEvent>) {
pub fn init(
callmemaybe_tx: UnboundedSender<CallmeEvent>,
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
) {
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(state.callmemaybe_tx.is_none());
state.callmemaybe_tx = Some(callmemaybe_tx);
assert!(state.wal_backup_launcher_tx.is_none());
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
}
fn create_internal(
mut state: MutexGuard<GlobalTimelinesState>,
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
peer_ids: Vec<ZNodeId>,
peer_ids: Vec<NodeId>,
) -> Result<Arc<Timeline>> {
match state.timelines.get(&zttid) {
Some(_) => bail!("timeline {} already exists", zttid),
@@ -559,12 +692,14 @@ impl GlobalTimelines {
// TODO: check directory existence
let dir = conf.timeline_dir(&zttid);
fs::create_dir_all(dir)?;
let shared_state = SharedState::create(conf, &zttid, peer_ids)
.context("failed to create shared state")?;
let new_tli = Arc::new(Timeline::new(
zttid,
state.callmemaybe_tx.as_ref().unwrap().clone(),
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
shared_state,
));
state.timelines.insert(zttid, Arc::clone(&new_tli));
@@ -576,7 +711,7 @@ impl GlobalTimelines {
pub fn create(
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
peer_ids: Vec<ZNodeId>,
peer_ids: Vec<NodeId>,
) -> Result<Arc<Timeline>> {
let state = TIMELINES_STATE.lock().unwrap();
GlobalTimelines::create_internal(state, conf, zttid, peer_ids)
@@ -594,8 +729,7 @@ impl GlobalTimelines {
match state.timelines.get(&zttid) {
Some(result) => Ok(Arc::clone(result)),
None => {
let shared_state =
SharedState::restore(conf, &zttid).context("failed to restore shared state");
let shared_state = SharedState::restore(conf, &zttid);
let shared_state = match shared_state {
Ok(shared_state) => shared_state,
@@ -617,6 +751,7 @@ impl GlobalTimelines {
let new_tli = Arc::new(Timeline::new(
zttid,
state.callmemaybe_tx.as_ref().unwrap().clone(),
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
shared_state,
));
state.timelines.insert(zttid, Arc::clone(&new_tli));
@@ -625,6 +760,12 @@ impl GlobalTimelines {
}
}
/// Get loaded timeline, if it exists.
pub fn get_loaded(zttid: ZTenantTimelineId) -> Option<Arc<Timeline>> {
let state = TIMELINES_STATE.lock().unwrap();
state.timelines.get(&zttid).map(Arc::clone)
}
/// Get ZTenantTimelineIDs of all active timelines.
pub fn get_active_timelines() -> Vec<ZTenantTimelineId> {
let state = TIMELINES_STATE.lock().unwrap();
@@ -665,22 +806,23 @@ impl GlobalTimelines {
/// b) an HTTP GET request about the timeline is made and it's able to restore the current state, or
/// c) an HTTP POST request for timeline creation is made after the timeline is already deleted.
/// TODO: ensure all of the above never happens.
pub fn delete_force(
pub async fn delete_force(
conf: &SafeKeeperConf,
zttid: &ZTenantTimelineId,
) -> Result<TimelineDeleteForceResult> {
info!("deleting timeline {}", zttid);
let was_active = match TIMELINES_STATE.lock().unwrap().timelines.remove(zttid) {
None => false,
Some(tli) => tli.deactivate_for_delete()?,
};
let timeline = TIMELINES_STATE.lock().unwrap().timelines.remove(zttid);
let mut was_active = false;
if let Some(tli) = timeline {
was_active = tli.deactivate_for_delete().await?;
}
GlobalTimelines::delete_force_internal(conf, zttid, was_active)
}
/// Deactivates and deletes all timelines for the tenant, see `delete()`.
/// Returns map of all timelines which the tenant had, `true` if a timeline was active.
/// There may be a race if new timelines are created simultaneously.
pub fn delete_force_all_for_tenant(
pub async fn delete_force_all_for_tenant(
conf: &SafeKeeperConf,
tenant_id: &ZTenantId,
) -> Result<HashMap<ZTenantTimelineId, TimelineDeleteForceResult>> {
@@ -691,14 +833,15 @@ impl GlobalTimelines {
let timelines = &mut TIMELINES_STATE.lock().unwrap().timelines;
for (&zttid, tli) in timelines.iter() {
if zttid.tenant_id == *tenant_id {
to_delete.insert(zttid, tli.deactivate_for_delete()?);
to_delete.insert(zttid, tli.clone());
}
}
// TODO: test that the correct subset of timelines is removed. It's complicated because they are implicitly created currently.
timelines.retain(|zttid, _| !to_delete.contains_key(zttid));
}
let mut deleted = HashMap::new();
for (zttid, was_active) in to_delete {
for (zttid, timeline) in to_delete {
let was_active = timeline.deactivate_for_delete().await?;
deleted.insert(
zttid,
GlobalTimelines::delete_force_internal(conf, &zttid, was_active)?,

View File

@@ -0,0 +1,417 @@
use anyhow::{Context, Result};
use tokio::task::JoinHandle;
use std::cmp::min;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI};
use remote_storage::{GenericRemoteStorage, RemoteStorage};
use tokio::fs::File;
use tokio::runtime::Builder;
use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::watch;
use tokio::time::sleep;
use tracing::*;
use utils::{lsn::Lsn, zid::ZTenantTimelineId};
use crate::broker::{Election, ElectionLeader};
use crate::timeline::{GlobalTimelines, Timeline};
use crate::{broker, SafeKeeperConf};
use once_cell::sync::OnceCell;
const BACKUP_ELECTION_NAME: &str = "WAL_BACKUP";
const BROKER_CONNECTION_RETRY_DELAY_MS: u64 = 1000;
const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
pub fn wal_backup_launcher_thread_main(
conf: SafeKeeperConf,
wal_backup_launcher_rx: Receiver<ZTenantTimelineId>,
) {
let rt = Builder::new_multi_thread()
.worker_threads(conf.backup_runtime_threads)
.enable_all()
.build()
.expect("failed to create wal backup runtime");
rt.block_on(async {
wal_backup_launcher_main_loop(conf, wal_backup_launcher_rx).await;
});
}
/// Check whether wal backup is required for timeline and mark that launcher is
/// aware of current status (if timeline exists).
fn is_wal_backup_required(zttid: ZTenantTimelineId) -> bool {
if let Some(tli) = GlobalTimelines::get_loaded(zttid) {
tli.wal_backup_attend()
} else {
false
}
}
struct WalBackupTaskHandle {
shutdown_tx: Sender<()>,
handle: JoinHandle<()>,
}
/// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup
/// tasks. Having this in separate task simplifies locking, allows to reap
/// panics and separate elections from offloading itself.
async fn wal_backup_launcher_main_loop(
conf: SafeKeeperConf,
mut wal_backup_launcher_rx: Receiver<ZTenantTimelineId>,
) {
info!(
"WAL backup launcher: started, remote config {:?}",
conf.remote_storage
);
let conf_ = conf.clone();
REMOTE_STORAGE.get_or_init(|| {
conf_.remote_storage.as_ref().map(|c| {
GenericRemoteStorage::new(conf_.workdir, c).expect("failed to create remote storage")
})
});
let mut tasks: HashMap<ZTenantTimelineId, WalBackupTaskHandle> = HashMap::new();
loop {
// channel is never expected to get closed
let zttid = wal_backup_launcher_rx.recv().await.unwrap();
let is_wal_backup_required = is_wal_backup_required(zttid);
if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
continue; /* just drain the channel and do nothing */
}
// do we need to do anything at all?
if is_wal_backup_required != tasks.contains_key(&zttid) {
if is_wal_backup_required {
// need to start the task
info!("starting WAL backup task for {}", zttid);
// TODO: decide who should offload in launcher itself by simply checking current state
let election_name = broker::get_campaign_name(
BACKUP_ELECTION_NAME.to_string(),
conf.broker_etcd_prefix.clone(),
&zttid,
);
let my_candidate_name = broker::get_candiate_name(conf.my_id);
let election = broker::Election::new(
election_name,
my_candidate_name,
conf.broker_endpoints.clone(),
);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&zttid);
let handle = tokio::spawn(
backup_task_main(zttid, timeline_dir, shutdown_rx, election)
.instrument(info_span!("WAL backup task", zttid = %zttid)),
);
tasks.insert(
zttid,
WalBackupTaskHandle {
shutdown_tx,
handle,
},
);
} else {
// need to stop the task
info!("stopping WAL backup task for {}", zttid);
let wb_handle = tasks.remove(&zttid).unwrap();
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
// Hm, why I can't await on reference to handle?
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task for {} panicked: {}", zttid, e);
}
}
}
}
}
struct WalBackupTask {
timeline: Arc<Timeline>,
timeline_dir: PathBuf,
wal_seg_size: usize,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
leader: Option<ElectionLeader>,
election: Election,
}
/// Offload single timeline.
async fn backup_task_main(
zttid: ZTenantTimelineId,
timeline_dir: PathBuf,
mut shutdown_rx: Receiver<()>,
election: Election,
) {
info!("started");
let timeline: Arc<Timeline> = if let Some(tli) = GlobalTimelines::get_loaded(zttid) {
tli
} else {
/* Timeline could get deleted while task was starting, just exit then. */
info!("no timeline, exiting");
return;
};
let mut wb = WalBackupTask {
wal_seg_size: timeline.get_wal_seg_size(),
commit_lsn_watch_rx: timeline.get_commit_lsn_watch_rx(),
timeline,
timeline_dir,
leader: None,
election,
};
// task is spinned up only when wal_seg_size already initialized
assert!(wb.wal_seg_size > 0);
let mut canceled = false;
select! {
_ = wb.run() => {}
_ = shutdown_rx.recv() => {
canceled = true;
}
}
if let Some(l) = wb.leader {
l.give_up().await;
}
info!("task {}", if canceled { "canceled" } else { "terminated" });
}
impl WalBackupTask {
async fn run(&mut self) {
let mut backup_lsn = Lsn(0);
// election loop
loop {
let mut retry_attempt = 0u32;
if let Some(l) = self.leader.take() {
l.give_up().await;
}
match broker::get_leader(&self.election).await {
Ok(l) => {
self.leader = Some(l);
}
Err(e) => {
error!("error during leader election {:?}", e);
sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await;
continue;
}
}
// offload loop
loop {
if retry_attempt == 0 {
// wait for new WAL to arrive
if let Err(e) = self.commit_lsn_watch_rx.changed().await {
// should never happen, as we hold Arc to timeline.
error!("commit_lsn watch shut down: {:?}", e);
return;
}
} else {
// or just sleep if we errored previously
let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
if let Some(backoff_delay) =
UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
{
retry_delay = min(retry_delay, backoff_delay);
}
sleep(Duration::from_millis(retry_delay)).await;
}
let commit_lsn = *self.commit_lsn_watch_rx.borrow();
// Note that backup_lsn can be higher than commit_lsn if we
// don't have much local WAL and others already uploaded
// segments we don't even have.
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
}
// Perhaps peers advanced the position, check shmem value.
backup_lsn = self.timeline.get_wal_backup_lsn();
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue;
}
if let Some(l) = self.leader.as_mut() {
// Optimization idea for later:
// Avoid checking election leader every time by returning current lease grant expiration time
// Re-check leadership only after expiration time,
// such approach woud reduce overhead on write-intensive workloads
match l
.check_am_i(
self.election.election_name.clone(),
self.election.candidate_name.clone(),
)
.await
{
Ok(leader) => {
if !leader {
info!("leader has changed");
break;
}
}
Err(e) => {
warn!("error validating leader, {:?}", e);
break;
}
}
}
match backup_lsn_range(
backup_lsn,
commit_lsn,
self.wal_seg_size,
&self.timeline_dir,
)
.await
{
Ok(backup_lsn_result) => {
backup_lsn = backup_lsn_result;
self.timeline.set_wal_backup_lsn(backup_lsn_result);
retry_attempt = 0;
}
Err(e) => {
error!(
"failed while offloading range {}-{}: {:?}",
backup_lsn, commit_lsn, e
);
retry_attempt = min(retry_attempt + 1, u32::MAX);
}
}
}
}
}
}
pub async fn backup_lsn_range(
start_lsn: Lsn,
end_lsn: Lsn,
wal_seg_size: usize,
timeline_dir: &Path,
) -> Result<Lsn> {
let mut res = start_lsn;
let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
for s in &segments {
backup_single_segment(s, timeline_dir)
.await
.with_context(|| format!("offloading segno {}", s.seg_no))?;
res = s.end_lsn;
}
info!(
"offloaded segnos {:?} up to {}, previous backup_lsn {}",
segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
end_lsn,
start_lsn,
);
Ok(res)
}
async fn backup_single_segment(seg: &Segment, timeline_dir: &Path) -> Result<()> {
let segment_file_name = seg.file_path(timeline_dir)?;
backup_object(&segment_file_name, seg.size()).await?;
debug!("Backup of {} done", segment_file_name.display());
Ok(())
}
#[derive(Debug, Copy, Clone)]
pub struct Segment {
seg_no: XLogSegNo,
start_lsn: Lsn,
end_lsn: Lsn,
}
impl Segment {
pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
Self {
seg_no,
start_lsn,
end_lsn,
}
}
pub fn object_name(self) -> String {
XLogFileName(PG_TLI, self.seg_no, self.size())
}
pub fn file_path(self, timeline_dir: &Path) -> Result<PathBuf> {
Ok(timeline_dir.join(self.object_name()))
}
pub fn size(self) -> usize {
(u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
}
}
fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
let first_seg = start.segment_number(seg_size);
let last_seg = end.segment_number(seg_size);
let res: Vec<Segment> = (first_seg..last_seg)
.map(|s| {
let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
})
.collect();
res
}
static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
let storage = REMOTE_STORAGE.get().expect("failed to get remote storage");
let file = File::open(&source_file).await?;
// Storage is initialized by launcher at ths point.
match storage.as_ref().unwrap() {
GenericRemoteStorage::Local(local_storage) => {
let destination = local_storage.remote_object_id(source_file)?;
debug!(
"local upload about to start from {} to {}",
source_file.display(),
destination.display()
);
local_storage.upload(file, size, &destination, None).await
}
GenericRemoteStorage::S3(s3_storage) => {
let s3key = s3_storage.remote_object_id(source_file)?;
debug!(
"S3 upload about to start from {} to {:?}",
source_file.display(),
s3key
);
s3_storage.upload(file, size, &s3key, None).await
}
}?;
Ok(())
}

View File

@@ -0,0 +1,20 @@
import pytest
from contextlib import closing
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
#
# Test error handling, if the 'basebackup' command fails in the middle
# of building the tar archive.
#
def test_basebackup_error(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
env.zenith_cli.create_branch("test_basebackup_error", "empty")
# Introduce failpoint
env.pageserver.safe_psql(f"failpoints basebackup-before-control-file=return")
with pytest.raises(Exception, match="basebackup-before-control-file"):
pg = env.postgres.create_start('test_basebackup_error')

View File

@@ -12,7 +12,7 @@ from contextlib import closing
from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.zenith_fixtures import PgBin, Etcd, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
from fixtures.zenith_fixtures import PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
from fixtures.utils import get_dir_size, lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.log_helper import log
from typing import List, Optional, Any
@@ -401,7 +401,7 @@ def test_wal_removal(zenith_env_builder: ZenithEnvBuilder):
http_cli = env.safekeepers[0].http_client()
# Pretend WAL is offloaded to s3.
http_cli.record_safekeeper_info(tenant_id, timeline_id, {'s3_wal_lsn': 'FFFFFFFF/FEFFFFFF'})
http_cli.record_safekeeper_info(tenant_id, timeline_id, {'backup_lsn': 'FFFFFFFF/FEFFFFFF'})
# wait till first segment is removed on all safekeepers
started_at = time.time()
@@ -414,6 +414,56 @@ def test_wal_removal(zenith_env_builder: ZenithEnvBuilder):
time.sleep(0.5)
@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs'])
def test_wal_backup(zenith_env_builder: ZenithEnvBuilder, storage_type: str):
zenith_env_builder.num_safekeepers = 3
if storage_type == 'local_fs':
zenith_env_builder.enable_local_fs_remote_storage()
elif storage_type == 'mock_s3':
zenith_env_builder.enable_s3_mock_remote_storage('test_safekeepers_wal_backup')
else:
raise RuntimeError(f'Unknown storage type: {storage_type}')
zenith_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_safekeepers_wal_backup')
pg = env.postgres.create_start('test_safekeepers_wal_backup')
# learn zenith timeline from compute
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
pg_conn = pg.connect()
cur = pg_conn.cursor()
cur.execute('create table t(key int, value text)')
# Shut down subsequently each of safekeepers and fill a segment while sk is
# down; ensure segment gets offloaded by others.
offloaded_seg_end = ['0/2000000', '0/3000000', '0/4000000']
for victim, seg_end in zip(env.safekeepers, offloaded_seg_end):
victim.stop()
# roughly fills one segment
cur.execute("insert into t select generate_series(1,250000), 'payload'")
live_sk = [sk for sk in env.safekeepers if sk != victim][0]
http_cli = live_sk.http_client()
started_at = time.time()
while True:
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"live sk status is {tli_status}")
if lsn_from_hex(tli_status.backup_lsn) >= lsn_from_hex(seg_end):
break
elapsed = time.time() - started_at
if elapsed > 20:
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s segment ending at {seg_end} get offloaded")
time.sleep(0.5)
victim.start()
class ProposerPostgres(PgProtocol):
"""Object for running postgres without ZenithEnv"""
def __init__(self,

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from dataclasses import field
from enum import Flag, auto
import textwrap
from cached_property import cached_property
import asyncpg
@@ -421,10 +422,51 @@ class MockS3Server:
def secret_key(self) -> str:
return 'test'
def access_env_vars(self) -> Dict[Any, Any]:
return {
'AWS_ACCESS_KEY_ID': self.access_key(),
'AWS_SECRET_ACCESS_KEY': self.secret_key(),
}
def kill(self):
self.subprocess.kill()
@dataclass
class LocalFsStorage:
local_path: Path
@dataclass
class S3Storage:
bucket_name: str
bucket_region: str
endpoint: Optional[str]
RemoteStorage = Union[LocalFsStorage, S3Storage]
# serialize as toml inline table
def remote_storage_to_toml_inline_table(remote_storage):
if isinstance(remote_storage, LocalFsStorage):
res = f"local_path='{remote_storage.local_path}'"
elif isinstance(remote_storage, S3Storage):
res = f"bucket_name='{remote_storage.bucket_name}', bucket_region='{remote_storage.bucket_region}'"
if remote_storage.endpoint is not None:
res += f", endpoint='{remote_storage.endpoint}'"
else:
raise Exception(f'Unknown storage configuration {remote_storage}')
else:
raise Exception("invalid remote storage type")
return f"{{{res}}}"
class RemoteStorageUsers(Flag):
PAGESERVER = auto()
SAFEKEEPER = auto()
class ZenithEnvBuilder:
"""
Builder object to create a Zenith runtime environment
@@ -440,6 +482,7 @@ class ZenithEnvBuilder:
broker: Etcd,
mock_s3_server: MockS3Server,
remote_storage: Optional[RemoteStorage] = None,
remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER,
pageserver_config_override: Optional[str] = None,
num_safekeepers: int = 1,
pageserver_auth_enabled: bool = False,
@@ -449,6 +492,7 @@ class ZenithEnvBuilder:
self.rust_log_override = rust_log_override
self.port_distributor = port_distributor
self.remote_storage = remote_storage
self.remote_storage_users = remote_storage_users
self.broker = broker
self.mock_s3_server = mock_s3_server
self.pageserver_config_override = pageserver_config_override
@@ -497,9 +541,9 @@ class ZenithEnvBuilder:
aws_access_key_id=self.mock_s3_server.access_key(),
aws_secret_access_key=self.mock_s3_server.secret_key(),
).create_bucket(Bucket=bucket_name)
self.remote_storage = S3Storage(bucket=bucket_name,
self.remote_storage = S3Storage(bucket_name=bucket_name,
endpoint=mock_endpoint,
region=mock_region)
bucket_region=mock_region)
def __enter__(self):
return self
@@ -557,6 +601,7 @@ class ZenithEnv:
self.safekeepers: List[Safekeeper] = []
self.broker = config.broker
self.remote_storage = config.remote_storage
self.remote_storage_users = config.remote_storage_users
# generate initial tenant ID here instead of letting 'zenith init' generate it,
# so that we don't need to dig it out of the config file afterwards.
@@ -605,8 +650,12 @@ class ZenithEnv:
id = {id}
pg_port = {port.pg}
http_port = {port.http}
sync = false # Disable fsyncs to make the tests go faster
""")
sync = false # Disable fsyncs to make the tests go faster""")
if bool(self.remote_storage_users
& RemoteStorageUsers.SAFEKEEPER) and self.remote_storage is not None:
toml += textwrap.dedent(f"""
remote_storage = "{remote_storage_to_toml_inline_table(self.remote_storage)}"
""")
safekeeper = Safekeeper(env=self, id=id, port=port)
self.safekeepers.append(safekeeper)
@@ -638,7 +687,7 @@ def _shared_simple_env(request: Any,
mock_s3_server: MockS3Server,
default_broker: Etcd) -> Iterator[ZenithEnv]:
"""
Internal fixture backing the `zenith_simple_env` fixture. If TEST_SHARED_FIXTURES
# Internal fixture backing the `zenith_simple_env` fixture. If TEST_SHARED_FIXTURES
is set, this is shared by all tests using `zenith_simple_env`.
"""
@@ -822,20 +871,6 @@ class PageserverPort:
http: int
@dataclass
class LocalFsStorage:
root: Path
@dataclass
class S3Storage:
bucket: str
region: str
endpoint: Optional[str]
RemoteStorage = Union[LocalFsStorage, S3Storage]
CREATE_TIMELINE_ID_EXTRACTOR = re.compile(r"^Created timeline '(?P<timeline_id>[^']+)'",
re.MULTILINE)
CREATE_TIMELINE_ID_EXTRACTOR = re.compile(r"^Created timeline '(?P<timeline_id>[^']+)'",
@@ -998,6 +1033,7 @@ class ZenithCli:
append_pageserver_param_overrides(
params_to_update=cmd,
remote_storage=self.env.remote_storage,
remote_storage_users=self.env.remote_storage_users,
pageserver_config_override=self.env.pageserver.config_override)
res = self.raw_cli(cmd)
@@ -1022,14 +1058,10 @@ class ZenithCli:
append_pageserver_param_overrides(
params_to_update=start_args,
remote_storage=self.env.remote_storage,
remote_storage_users=self.env.remote_storage_users,
pageserver_config_override=self.env.pageserver.config_override)
s3_env_vars = None
if self.env.s3_mock_server:
s3_env_vars = {
'AWS_ACCESS_KEY_ID': self.env.s3_mock_server.access_key(),
'AWS_SECRET_ACCESS_KEY': self.env.s3_mock_server.secret_key(),
}
s3_env_vars = self.env.s3_mock_server.access_env_vars() if self.env.s3_mock_server else None
return self.raw_cli(start_args, extra_env_vars=s3_env_vars)
def pageserver_stop(self, immediate=False) -> 'subprocess.CompletedProcess[str]':
@@ -1041,7 +1073,8 @@ class ZenithCli:
return self.raw_cli(cmd)
def safekeeper_start(self, id: int) -> 'subprocess.CompletedProcess[str]':
return self.raw_cli(['safekeeper', 'start', str(id)])
s3_env_vars = self.env.s3_mock_server.access_env_vars() if self.env.s3_mock_server else None
return self.raw_cli(['safekeeper', 'start', str(id)], extra_env_vars=s3_env_vars)
def safekeeper_stop(self,
id: Optional[int] = None,
@@ -1237,22 +1270,13 @@ class ZenithPageserver(PgProtocol):
def append_pageserver_param_overrides(
params_to_update: List[str],
remote_storage: Optional[RemoteStorage],
remote_storage_users: RemoteStorageUsers,
pageserver_config_override: Optional[str] = None,
):
if remote_storage is not None:
if isinstance(remote_storage, LocalFsStorage):
pageserver_storage_override = f"local_path='{remote_storage.root}'"
elif isinstance(remote_storage, S3Storage):
pageserver_storage_override = f"bucket_name='{remote_storage.bucket}',\
bucket_region='{remote_storage.region}'"
if remote_storage.endpoint is not None:
pageserver_storage_override += f",endpoint='{remote_storage.endpoint}'"
else:
raise Exception(f'Unknown storage configuration {remote_storage}')
if bool(remote_storage_users & RemoteStorageUsers.PAGESERVER) and remote_storage is not None:
remote_storage_toml_table = remote_storage_to_toml_inline_table(remote_storage)
params_to_update.append(
f'--pageserver-config-override=remote_storage={{{pageserver_storage_override}}}')
f'--pageserver-config-override=remote_storage={remote_storage_toml_table}')
env_overrides = os.getenv('ZENITH_PAGESERVER_OVERRIDES')
if env_overrides is not None:
@@ -1786,8 +1810,9 @@ class Safekeeper:
class SafekeeperTimelineStatus:
acceptor_epoch: int
flush_lsn: str
remote_consistent_lsn: str
timeline_start_lsn: str
backup_lsn: str
remote_consistent_lsn: str
@dataclass
@@ -1812,8 +1837,9 @@ class SafekeeperHttpClient(requests.Session):
resj = res.json()
return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'],
flush_lsn=resj['flush_lsn'],
remote_consistent_lsn=resj['remote_consistent_lsn'],
timeline_start_lsn=resj['timeline_start_lsn'])
timeline_start_lsn=resj['timeline_start_lsn'],
backup_lsn=resj['backup_lsn'],
remote_consistent_lsn=resj['remote_consistent_lsn'])
def record_safekeeper_info(self, tenant_id: str, timeline_id: str, body):
res = self.post(
@@ -1893,7 +1919,11 @@ class Etcd:
f"--data-dir={self.datadir}",
f"--listen-client-urls={client_url}",
f"--advertise-client-urls={client_url}",
f"--listen-peer-urls=http://127.0.0.1:{self.peer_port}"
f"--listen-peer-urls=http://127.0.0.1:{self.peer_port}",
# Set --quota-backend-bytes to keep the etcd virtual memory
# size smaller. Our test etcd clusters are very small.
# See https://github.com/etcd-io/etcd/issues/7910
f"--quota-backend-bytes=100000000"
]
self.handle = subprocess.Popen(args, stdout=log_file, stderr=log_file)