Merge with main

Author: Konstantin Knizhnik
Date:   2022-05-13 18:57:32 +03:00
53 changed files with 724 additions and 256 deletions


@@ -6,6 +6,7 @@ zenith-us-stage-ps-2 console_region_id=27
zenith-us-stage-sk-1 console_region_id=27
zenith-us-stage-sk-2 console_region_id=27
zenith-us-stage-sk-4 console_region_id=27
zenith-us-stage-sk-5 console_region_id=27
[storage:children]
pageservers

Cargo.lock generated

@@ -1594,6 +1594,7 @@ dependencies = [
"clap 3.0.14",
"comfy-table",
"control_plane",
"git-version",
"pageserver",
"postgres",
"postgres_ffi",
@@ -1785,6 +1786,7 @@ dependencies = [
"daemonize",
"fail",
"futures",
"git-version",
"hex",
"hex-literal",
"humantime",
@@ -2177,6 +2179,7 @@ dependencies = [
"bytes",
"clap 3.0.14",
"futures",
"git-version",
"hashbrown",
"hex",
"hmac 0.12.1",
@@ -2629,6 +2632,7 @@ dependencies = [
"daemonize",
"etcd_broker",
"fs2",
"git-version",
"hex",
"humantime",
"hyper",


@@ -136,13 +136,20 @@ pub fn handle_roles(spec: &ClusterSpec, client: &mut Client) -> Result<()> {
xact.execute(query.as_str(), &[])?;
}
} else {
info!("role name {}", &name);
info!("role name: '{}'", &name);
let mut query: String = format!("CREATE ROLE {} ", name.quote());
info!("role create query {}", &query);
info!("role create query: '{}'", &query);
info_print!(" -> create");
query.push_str(&role.to_pg_options());
xact.execute(query.as_str(), &[])?;
let grant_query = format!(
"grant pg_read_all_data, pg_write_all_data to {}",
name.quote()
);
xact.execute(grant_query.as_str(), &[])?;
info!("role grant query: '{}'", &grant_query);
}
info_print!("\n");
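A minimal sketch of the create-and-grant flow above, using the postgres crate directly. The role name is assumed to be already quoted and the options string already built; the real code uses a project-specific `quote()` helper and `Role::to_pg_options()`:

```rust
use postgres::Client;

fn create_role(client: &mut Client, quoted_name: &str, options: &str) -> Result<(), postgres::Error> {
    let mut xact = client.transaction()?;

    // CREATE ROLE with whatever options the spec asks for.
    let query = format!("CREATE ROLE {} {}", quoted_name, options);
    xact.execute(query.as_str(), &[])?;

    // Grant the blanket read/write roles, as the new code does.
    let grant_query = format!(
        "grant pg_read_all_data, pg_write_all_data to {}",
        quoted_name
    );
    xact.execute(grant_query.as_str(), &[])?;

    xact.commit()
}
```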


@@ -51,7 +51,7 @@ pub struct SkTimelineInfo {
#[serde(default)]
pub peer_horizon_lsn: Option<Lsn>,
#[serde(default)]
pub wal_stream_connection_string: Option<String>,
pub safekeeper_connection_string: Option<String>,
}
#[derive(Debug, thiserror::Error)]
@@ -217,16 +217,22 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
break;
}
let mut timeline_updates: HashMap<ZTenantTimelineId, HashMap<ZNodeId, SkTimelineInfo>> =
HashMap::new();
let mut timeline_updates: HashMap<ZTenantTimelineId, HashMap<ZNodeId, SkTimelineInfo>> = HashMap::new();
// Keep track that the timeline data updates from etcd arrive in the right order.
// https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas
// > etcd does not ensure linearizability for watch operations. Users are expected to verify the revision of watch responses to ensure correct ordering.
let mut timeline_etcd_versions: HashMap<ZTenantTimelineId, i64> = HashMap::new();
let events = resp.events();
debug!("Processing {} events", events.len());
for event in events {
if EventType::Put == event.event_type() {
if let Some(kv) = event.kv() {
match parse_etcd_key_value(subscription_kind, &regex, kv) {
if let Some(new_etcd_kv) = event.kv() {
let new_kv_version = new_etcd_kv.version();
match parse_etcd_key_value(subscription_kind, &regex, new_etcd_kv) {
Ok(Some((zttid, timeline))) => {
match timeline_updates
.entry(zttid)
@@ -234,12 +240,15 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
.entry(timeline.safekeeper_id)
{
hash_map::Entry::Occupied(mut o) => {
if o.get().flush_lsn < timeline.info.flush_lsn {
let old_etcd_kv_version = timeline_etcd_versions.get(&zttid).copied().unwrap_or(i64::MIN);
if old_etcd_kv_version < new_kv_version {
o.insert(timeline.info);
timeline_etcd_versions.insert(zttid,new_kv_version);
}
}
hash_map::Entry::Vacant(v) => {
v.insert(timeline.info);
timeline_etcd_versions.insert(zttid,new_kv_version);
}
}
}
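etcd does not guarantee ordering of watch events, so the code above keeps the last seen etcd key version per timeline and ignores stale updates. A small, self-contained sketch of that gating logic (the type names here are illustrative, not the broker's real API):

```rust
use std::collections::HashMap;

type TimelineId = u64;

/// Returns true if the caller should apply this update, false if it is stale.
fn apply_if_newer(
    last_seen: &mut HashMap<TimelineId, i64>,
    timeline: TimelineId,
    kv_version: i64,
) -> bool {
    let old = last_seen.get(&timeline).copied().unwrap_or(i64::MIN);
    if kv_version > old {
        last_seen.insert(timeline, kv_version);
        true
    } else {
        false // out-of-order watch event, skip it
    }
}

fn main() {
    let mut seen = HashMap::new();
    assert!(apply_if_newer(&mut seen, 1, 5));
    assert!(!apply_if_newer(&mut seen, 1, 4)); // older version arrives late and is dropped
}
```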


@@ -8,6 +8,7 @@
#![allow(deref_nullptr)]
use serde::{Deserialize, Serialize};
use utils::lsn::Lsn;
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
@@ -37,3 +38,21 @@ pub const fn transaction_id_precedes(id1: TransactionId, id2: TransactionId) ->
let diff = id1.wrapping_sub(id2) as i32;
diff < 0
}
// Check if page is not yet initialized (port of Postgres PageIsInit() macro)
pub fn page_is_new(pg: &[u8]) -> bool {
pg[14] == 0 && pg[15] == 0 // pg_upper == 0
}
// ExtractLSN from page header
pub fn page_get_lsn(pg: &[u8]) -> Lsn {
Lsn(
((u32::from_le_bytes(pg[0..4].try_into().unwrap()) as u64) << 32)
| u32::from_le_bytes(pg[4..8].try_into().unwrap()) as u64,
)
}
pub fn page_set_lsn(pg: &mut [u8], lsn: Lsn) {
pg[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
pg[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
}
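A small usage sketch of the helpers above: stamp an LSN onto a zeroed ("new") page buffer and read it back. The block size and byte offsets are taken from the functions shown in the diff; the `Lsn` newtype is replaced by a plain u64 here to keep the example self-contained:

```rust
use std::convert::TryInto;

const BLCKSZ: usize = 8192;

fn main() {
    let mut page = vec![0u8; BLCKSZ];

    // pd_upper (bytes 14..16) is zero, so page_is_new() would report the page as uninitialized.
    assert!(page[14] == 0 && page[15] == 0);

    // Write an LSN the same way page_set_lsn() does ...
    let lsn: u64 = 0x1_2345_6789;
    page[0..4].copy_from_slice(&((lsn >> 32) as u32).to_le_bytes());
    page[4..8].copy_from_slice(&(lsn as u32).to_le_bytes());

    // ... and read it back the way page_get_lsn() does.
    let hi = u32::from_le_bytes(page[0..4].try_into().unwrap()) as u64;
    let lo = u32::from_le_bytes(page[4..8].try_into().unwrap()) as u64;
    assert_eq!((hi << 32) | lo, lsn);
}
```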


@@ -1,3 +0,0 @@
fn main() {
println!("cargo:rerun-if-env-changed=GIT_VERSION");
}


@@ -1,7 +1,7 @@
use std::str::FromStr;
use super::error::ApiError;
use hyper::{Body, Request};
use hyper::{body::HttpBody, Body, Request};
use routerify::ext::RequestExt;
pub fn get_request_param<'a>(
@@ -31,3 +31,10 @@ pub fn parse_request_param<T: FromStr>(
))),
}
}
pub async fn ensure_no_body(request: &mut Request<Body>) -> Result<(), ApiError> {
match request.body_mut().data().await {
Some(_) => Err(ApiError::BadRequest("Unexpected request body".into())),
None => Ok(()),
}
}


@@ -54,31 +54,44 @@ pub mod nonblock;
// Default signal handling
pub mod signals;
// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
//
// we have several cases:
// * building locally from git repo
// * building in CI from git repo
// * building in docker (either in CI or locally)
//
// One thing to note is that .git is not available in docker (and it is bad to include it there).
// So everything becides docker build is covered by git_version crate.
// For docker use environment variable to pass git version, which is then retrieved by buildscript (build.rs).
// It takes variable from build process env and puts it to the rustc env. And then we can retrieve it here by using env! macro.
// Git version received from environment variable used as a fallback in git_version invokation.
// And to avoid running buildscript every recompilation, we use rerun-if-env-changed option.
// So the build script will be run only when GIT_VERSION envvar has changed.
//
// Why not to use buildscript to get git commit sha directly without procmacro from different crate?
// Caching and workspaces complicates that. In case `utils` is not
// recompiled due to caching then version may become outdated.
// git_version crate handles that case by introducing a dependency on .git internals via include_bytes! macro,
// so if we changed the index state git_version will pick that up and rerun the macro.
//
// Note that with git_version prefix is `git:` and in case of git version from env its `git-env:`.
use git_version::git_version;
pub const GIT_VERSION: &str = git_version!(
prefix = "git:",
fallback = concat!("git-env:", env!("GIT_VERSION")),
args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha
);
/// This is a shortcut to embed the git sha into binaries and avoid copying the same build script to all packages.
///
/// We have several cases:
/// * building locally from a git repo
/// * building in CI from a git repo
/// * building in docker (either in CI or locally)
///
/// One thing to note is that .git is not available in docker (and it is bad to include it there).
/// So everything besides the docker build is covered by the git_version crate, and docker uses a `GIT_VERSION` argument to get the required value.
/// The build script takes the variable from the build environment and puts it into the rustc env, and then we can retrieve it here with the env! macro.
/// The git version received from the environment variable is used as a fallback in the git_version invocation.
/// To avoid rerunning the build script on every recompilation, we use the rerun-if-env-changed option,
/// so the build script runs only when the GIT_VERSION env var has changed.
///
/// Why not use a build script to get the git commit sha directly, without a proc macro from a different crate?
/// Caching and workspaces complicate that: if `utils` is not
/// recompiled due to caching, the version may become outdated.
/// The git_version crate handles that case by introducing a dependency on .git internals via the include_bytes! macro,
/// so if the index state changes, git_version picks that up and reruns the macro.
///
/// Note that with git_version the prefix is `git:`, and for the version taken from the env it is `git-env:`.
///
/// #############################################################################################
/// TODO this macro is not the way the library is intended to be used, see https://github.com/neondatabase/neon/issues/1565 for details.
/// We use `cachepot` to reduce our current CI build times: https://github.com/neondatabase/cloud/pull/1033#issuecomment-1100935036
/// Yet, it seems to ignore the GIT_VERSION env variable passed to the Docker build, even with a build.rs that contains
/// `println!("cargo:rerun-if-env-changed=GIT_VERSION");` for cachepot cache invalidation.
/// The problem needs further investigation; a regular `const` declaration could then replace the macro.
#[macro_export]
macro_rules! project_git_version {
($const_identifier:ident) => {
const $const_identifier: &str = git_version::git_version!(
prefix = "git:",
fallback = concat!(
"git-env:",
env!("GIT_VERSION", "Missing GIT_VERSION envvar")
),
args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha
);
};
}
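For reference, this is how the binaries in this diff consume the macro: expand it once per crate and read the resulting constant. A minimal sketch, assuming the build runs inside a git checkout or with the GIT_VERSION env var set:

```rust
use utils::project_git_version;

// Expands to `const GIT_VERSION: &str = git_version::git_version!(...)`.
project_git_version!(GIT_VERSION);

fn main() {
    println!("version: {GIT_VERSION}");
}
```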


@@ -9,6 +9,7 @@ anyhow = "1.0"
serde_json = "1"
comfy-table = "5.0.1"
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
git-version = "0.3.5"
# FIXME: 'pageserver' is needed for BranchInfo. Refactor
pageserver = { path = "../pageserver" }


@@ -20,8 +20,8 @@ use utils::{
auth::{Claims, Scope},
lsn::Lsn,
postgres_backend::AuthType,
project_git_version,
zid::{ZNodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
GIT_VERSION,
};
use pageserver::timelines::TimelineInfo;
@@ -30,6 +30,7 @@ use pageserver::timelines::TimelineInfo;
const DEFAULT_SAFEKEEPER_ID: ZNodeId = ZNodeId(1);
const DEFAULT_PAGESERVER_ID: ZNodeId = ZNodeId(1);
const DEFAULT_BRANCH_NAME: &str = "main";
project_git_version!(GIT_VERSION);
fn default_conf() -> String {
format!(
@@ -540,6 +541,29 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
"tenant {} successfully created on the pageserver",
new_tenant_id
);
// Create an initial timeline for the new tenant
let new_timeline_id = parse_timeline_id(create_match)?;
let timeline = pageserver
.timeline_create(new_tenant_id, new_timeline_id, None, None)?
.context(format!(
"Failed to create initial timeline for tenant {new_tenant_id}"
))?;
let new_timeline_id = timeline.timeline_id;
let last_record_lsn = timeline
.local
.context(format!("Failed to get last record LSN: no local timeline info for timeline {new_timeline_id}"))?
.last_record_lsn;
env.register_branch_mapping(
DEFAULT_BRANCH_NAME.to_string(),
new_tenant_id,
new_timeline_id,
)?;
println!(
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {new_tenant_id}",
);
}
Some(("config", create_match)) => {
let tenant_id = get_tenant_id(create_match, env)?;


@@ -52,6 +52,7 @@ nix = "0.23"
once_cell = "1.8.0"
crossbeam-utils = "0.8.5"
fail = "0.5.0"
git-version = "0.3.5"
# 'experimental' is needed for the `zstd::bulk::Decompressor::upper_bound` function.
zstd = { version = "0.11.1", features = ["experimental"] }


@@ -7,7 +7,9 @@ use pageserver::layered_repository::dump_layerfile_from_path;
use pageserver::page_cache;
use pageserver::virtual_file;
use std::path::PathBuf;
use utils::GIT_VERSION;
use utils::project_git_version;
project_git_version!(GIT_VERSION);
fn main() -> Result<()> {
let arg_matches = App::new("Zenith dump_layerfile utility")


@@ -20,17 +20,18 @@ use utils::{
http::endpoint,
logging,
postgres_backend::AuthType,
project_git_version,
shutdown::exit_now,
signals::{self, Signal},
tcp_listener,
zid::{ZTenantId, ZTimelineId},
GIT_VERSION,
};
project_git_version!(GIT_VERSION);
fn version() -> String {
format!(
"{} profiling:{} failpoints:{}",
GIT_VERSION,
"{GIT_VERSION} profiling:{} failpoints:{}",
cfg!(feature = "profiling"),
fail::has_failpoints()
)
@@ -217,7 +218,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
// Initialize logger
let log_file = logging::init(LOG_FILE_NAME, daemonize)?;
info!("version: {}", GIT_VERSION);
info!("version: {GIT_VERSION}");
// TODO: Check that it looks like a valid repository before going further


@@ -6,7 +6,9 @@ use clap::{App, Arg};
use pageserver::layered_repository::metadata::TimelineMetadata;
use std::path::PathBuf;
use std::str::FromStr;
use utils::{lsn::Lsn, GIT_VERSION};
use utils::{lsn::Lsn, project_git_version};
project_git_version!(GIT_VERSION);
fn main() -> Result<()> {
let arg_matches = App::new("Zenith update metadata utility")


@@ -24,7 +24,7 @@ use crate::tenant_config::{TenantConf, TenantConfOpt};
pub const ZSTD_MAX_SAMPLES: usize = 1024;
pub const ZSTD_MIN_SAMPLES: usize = 8; // magic requirement of zstd
pub const ZSTD_MAX_SAMPLE_BYTES: usize = 10 * 1024 * 1024; // max memory size for holding samples
pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 8 * 1024;
pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 8 * 1024 - 4; // make dictionary + BLOB length fit in first page
pub const ZSTD_COMPRESSION_LEVEL: i32 = 0; // default compression level
pub mod defaults {

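The arithmetic behind the new constant, as a tiny check: with an 8 KiB page and a 4-byte BLOB length prefix (both assumed here, inferred from the comment above), the trained dictionary plus its length header fits into the first page exactly.

```rust
const PAGE_SZ: usize = 8 * 1024;
const BLOB_LEN_PREFIX: usize = 4; // assumed size of the length header
const ZSTD_MAX_DICTIONARY_SIZE: usize = 8 * 1024 - 4;

fn main() {
    assert_eq!(ZSTD_MAX_DICTIONARY_SIZE + BLOB_LEN_PREFIX, PAGE_SZ);
}
```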

@@ -74,6 +74,7 @@ pub mod metadata;
mod par_fsync;
mod storage_layer;
use crate::pgdatadir_mapping::LsnForTimestamp;
use delta_layer::{DeltaLayer, DeltaLayerWriter};
use ephemeral_file::is_ephemeral_file;
use filename::{DeltaFileName, ImageFileName};
@@ -81,6 +82,7 @@ use image_layer::{ImageLayer, ImageLayerWriter};
use inmemory_layer::InMemoryLayer;
use layer_map::LayerMap;
use layer_map::SearchResult;
use postgres_ffi::xlog_utils::to_pg_timestamp;
use storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
// re-export this function so that page_cache.rs can use it.
@@ -89,7 +91,7 @@ pub use crate::layered_repository::ephemeral_file::writeback as writeback_epheme
// Metrics collected on operations on the storage repository.
lazy_static! {
static ref STORAGE_TIME: HistogramVec = register_histogram_vec!(
"pageserver_storage_time",
"pageserver_storage_operations_seconds",
"Time spent on storage operations",
&["operation", "tenant_id", "timeline_id"]
)
@@ -99,8 +101,8 @@ lazy_static! {
// Metrics collected on operations on the storage repository.
lazy_static! {
static ref RECONSTRUCT_TIME: HistogramVec = register_histogram_vec!(
"pageserver_getpage_reconstruct_time",
"Time spent on storage operations",
"pageserver_getpage_reconstruct_seconds",
"Time spent in reconstruct_value",
&["tenant_id", "timeline_id"]
)
.expect("failed to define a metric");
@@ -108,13 +110,13 @@ lazy_static! {
lazy_static! {
static ref MATERIALIZED_PAGE_CACHE_HIT: IntCounterVec = register_int_counter_vec!(
"materialize_page_cache_hits",
"pageserver_materialized_cache_hits_total",
"Number of cache hits from materialized page cache",
&["tenant_id", "timeline_id"]
)
.expect("failed to define a metric");
static ref WAIT_LSN_TIME: HistogramVec = register_histogram_vec!(
"wait_lsn_time",
"pageserver_wait_lsn_seconds",
"Time spent waiting for WAL to arrive",
&["tenant_id", "timeline_id"]
)
@@ -134,12 +136,12 @@ lazy_static! {
// or in testing they estimate how much we would upload if we did.
lazy_static! {
static ref NUM_PERSISTENT_FILES_CREATED: IntCounter = register_int_counter!(
"pageserver_num_persistent_files_created",
"pageserver_created_persistent_files_total",
"Number of files created that are meant to be uploaded to cloud storage",
)
.expect("failed to define a metric");
static ref PERSISTENT_BYTES_WRITTEN: IntCounter = register_int_counter!(
"pageserver_persistent_bytes_written",
"pageserver_written_persistent_bytes_total",
"Total bytes written that are meant to be uploaded to cloud storage",
)
.expect("failed to define a metric");
@@ -1512,7 +1514,7 @@ impl LayeredTimeline {
.ensure_loaded()
.with_context(|| {
format!(
"Ancestor timeline is not is not loaded. Timeline id: {} Ancestor id {:?}",
"Ancestor timeline is not loaded. Timeline id: {} Ancestor id {:?}",
self.timeline_id,
self.get_ancestor_timeline_id(),
)
@@ -2118,11 +2120,54 @@ impl LayeredTimeline {
let cutoff = gc_info.cutoff;
let pitr = gc_info.pitr;
// Calculate pitr cutoff point.
// If we cannot determine a cutoff LSN, be conservative and don't GC anything.
let mut pitr_cutoff_lsn: Lsn = *self.get_latest_gc_cutoff_lsn();
if let Ok(timeline) =
tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)
{
// First, calculate pitr_cutoff_timestamp and then convert it to LSN.
// If we don't have enough data to convert to LSN,
// play safe and don't remove any layers.
if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
match timeline.find_lsn_for_timestamp(pitr_timestamp)? {
LsnForTimestamp::Present(lsn) => pitr_cutoff_lsn = lsn,
LsnForTimestamp::Future(lsn) => {
debug!("future({})", lsn);
pitr_cutoff_lsn = cutoff;
}
LsnForTimestamp::Past(lsn) => {
debug!("past({})", lsn);
}
}
debug!("pitr_cutoff_lsn = {:?}", pitr_cutoff_lsn)
}
} else if cfg!(test) {
// We don't have local timeline in mocked cargo tests.
// So, just ignore pitr_interval setting in this case.
pitr_cutoff_lsn = cutoff;
}
let new_gc_cutoff = Lsn::min(cutoff, pitr_cutoff_lsn);
// Nothing to GC. Return early.
if *self.get_latest_gc_cutoff_lsn() >= new_gc_cutoff {
info!(
"Nothing to GC for timeline {}. cutoff_lsn {}",
self.timeline_id, new_gc_cutoff
);
result.elapsed = now.elapsed()?;
return Ok(result);
}
let _enter = info_span!("garbage collection", timeline = %self.timeline_id, tenant = %self.tenant_id, cutoff = %cutoff).entered();
// We need to ensure that no one branches at a point before latest_gc_cutoff_lsn.
// See branch_timeline() for details.
*self.latest_gc_cutoff_lsn.write().unwrap() = cutoff;
*self.latest_gc_cutoff_lsn.write().unwrap() = new_gc_cutoff;
info!("GC starting");
@@ -2162,30 +2207,18 @@ impl LayeredTimeline {
result.layers_needed_by_cutoff += 1;
continue 'outer;
}
// 2. It is newer than PiTR interval?
// We use modification time of layer file to estimate update time.
// This estimation is not quite precise but maintaining LSN->timestamp map seems to be overkill.
// It is not expected that users will need high precision here. And this estimation
// is conservative: modification time of file is always newer than actual time of version
// creation. So it is safe for users.
// TODO A possible "bloat" issue still persists here.
// If modification time changes because of layer upload/download, we will keep these files
// longer than necessary.
// https://github.com/neondatabase/neon/issues/1554
//
if let Ok(metadata) = fs::metadata(&l.filename()) {
let last_modified = metadata.modified()?;
if now.duration_since(last_modified)? < pitr {
debug!(
"keeping {} because it's modification time {:?} is newer than PITR {:?}",
l.filename().display(),
last_modified,
pitr
);
result.layers_needed_by_pitr += 1;
continue 'outer;
}
// 2. It is newer than PiTR cutoff point?
if l.get_lsn_range().end > pitr_cutoff_lsn {
debug!(
"keeping {} because it's newer than pitr_cutoff_lsn {}",
l.filename().display(),
pitr_cutoff_lsn
);
result.layers_needed_by_pitr += 1;
continue 'outer;
}
// 3. Is it needed by a child branch?
// NOTE With that we would keep data that
// might be referenced by child branches forever.
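A condensed sketch of the new cutoff selection above: the PITR cutoff is resolved from a timestamp-to-LSN lookup when possible, otherwise the code falls back conservatively, and the effective GC cutoff is the minimum of the space-based and time-based cutoffs. Names here are illustrative, not the pageserver's real API:

```rust
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Lsn(u64);

enum LsnForTimestamp {
    Present(Lsn),
    Future(Lsn),
    Past(Lsn),
}

fn effective_gc_cutoff(
    space_cutoff: Lsn,
    latest_gc_cutoff: Lsn,
    pitr_lookup: Option<LsnForTimestamp>,
) -> Lsn {
    let pitr_cutoff = match pitr_lookup {
        // We know the LSN of the PITR point: GC may advance up to it.
        Some(LsnForTimestamp::Present(lsn)) => lsn,
        // The PITR point is in the future: the time-based limit does not
        // constrain us beyond the space-based cutoff.
        Some(LsnForTimestamp::Future(_)) => space_cutoff,
        // Not enough history (or no lookup available): be conservative and
        // keep everything newer than the previous cutoff.
        Some(LsnForTimestamp::Past(_)) | None => latest_gc_cutoff,
    };
    Lsn::min(space_cutoff, pitr_cutoff)
}

fn main() {
    let new_cutoff = effective_gc_cutoff(Lsn(1000), Lsn(800), Some(LsnForTimestamp::Present(Lsn(900))));
    assert_eq!(new_cutoff, Lsn(900));
}
```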


@@ -318,6 +318,9 @@ impl Layer for DeltaLayer {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
if entry_lsn < lsn_range.start {
return false;
}
blob_refs.push((entry_lsn, blob_ref));
!blob_ref.will_init()
@@ -532,7 +535,7 @@ impl DeltaLayer {
/// Open the underlying file and read the metadata into memory, if it's
/// not loaded already.
///
fn load(&self) -> Result<RwLockReadGuard<DeltaLayerInner>> {
fn load(&self) -> Result<RwLockReadGuard<DeltaLayerInner>>{
loop {
// Quick exit if already loaded
let inner = self.inner.read().unwrap();


@@ -45,7 +45,7 @@ pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
lazy_static! {
static ref LIVE_CONNECTIONS_COUNT: IntGaugeVec = register_int_gauge_vec!(
"pageserver_live_connections_count",
"pageserver_live_connections",
"Number of live network connections",
&["pageserver_connection_kind"]
)


@@ -19,7 +19,6 @@ use std::net::TcpListener;
use std::str;
use std::str::FromStr;
use std::sync::{Arc, RwLockReadGuard};
use std::time::Duration;
use tracing::*;
use utils::{
auth::{self, Claims, JwtAuth, Scope},
@@ -326,7 +325,7 @@ const TIME_BUCKETS: &[f64] = &[
lazy_static! {
static ref SMGR_QUERY_TIME: HistogramVec = register_histogram_vec!(
"pageserver_smgr_query_time",
"pageserver_smgr_query_seconds",
"Time spent on smgr query handling",
&["smgr_query_type", "tenant_id", "timeline_id"],
TIME_BUCKETS.into()
@@ -796,7 +795,9 @@ impl postgres_backend::Handler for PageServerHandler {
.unwrap_or_else(|| Ok(repo.get_gc_horizon()))?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let result = repo.gc_iteration(Some(timelineid), gc_horizon, Duration::ZERO, true)?;
// Use tenant's pitr setting
let pitr = repo.get_pitr_interval();
let result = repo.gc_iteration(Some(timelineid), gc_horizon, pitr, true)?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"layers_total"),
RowDescriptor::int8_col(b"layers_needed_by_cutoff"),


@@ -208,12 +208,12 @@ lazy_static! {
)
.expect("failed to register pageserver remote storage remaining sync items int gauge");
static ref FATAL_TASK_FAILURES: IntCounter = register_int_counter!(
"pageserver_remote_storage_fatal_task_failures",
"pageserver_remote_storage_fatal_task_failures_total",
"Number of critically failed tasks"
)
.expect("failed to register pageserver remote storage remaining sync items int gauge");
static ref IMAGE_SYNC_TIME: HistogramVec = register_histogram_vec!(
"pageserver_remote_storage_image_sync_time",
"pageserver_remote_storage_image_sync_seconds",
"Time took to synchronize (download or upload) a whole pageserver image. \
Grouped by `operation_kind` (upload|download) and `status` (success|failure)",
&["operation_kind", "status"],


@@ -34,7 +34,7 @@ const STORAGE_IO_TIME_BUCKETS: &[f64] = &[
lazy_static! {
static ref STORAGE_IO_TIME: HistogramVec = register_histogram_vec!(
"pageserver_io_time",
"pageserver_io_operations_seconds",
"Time spent in IO operations",
&["operation", "tenant_id", "timeline_id"],
STORAGE_IO_TIME_BUCKETS.into()
@@ -43,8 +43,8 @@ lazy_static! {
}
lazy_static! {
static ref STORAGE_IO_SIZE: IntGaugeVec = register_int_gauge_vec!(
"pageserver_io_size",
"Amount of bytes",
"pageserver_io_operations_bytes_total",
"Total amount of bytes read/written in IO operations",
&["operation", "tenant_id", "timeline_id"]
)
.expect("failed to define a metric");


@@ -24,6 +24,7 @@
use anyhow::Context;
use postgres_ffi::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{page_is_new, page_set_lsn};
use anyhow::Result;
use bytes::{Buf, Bytes, BytesMut};
@@ -304,8 +305,14 @@ impl<'a, R: Repository> WalIngest<'a, R> {
image.resize(image.len() + blk.hole_length as usize, 0u8);
image.unsplit(tail);
}
image[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
image[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
//
// Match the logic of XLogReadBufferForRedoExtended:
// The page may be uninitialized. If so, we can't set the LSN because
// that would corrupt the page.
//
if !page_is_new(&image) {
page_set_lsn(&mut image, lsn)
}
assert_eq!(image.len(), pg_constants::BLCKSZ as usize);
self.put_rel_page_image(modification, rel, blk.blkno, image.freeze())?;
} else {


@@ -106,16 +106,16 @@ impl crate::walredo::WalRedoManager for DummyRedoManager {
// each tenant.
lazy_static! {
static ref WAL_REDO_TIME: Histogram =
register_histogram!("pageserver_wal_redo_time", "Time spent on WAL redo")
register_histogram!("pageserver_wal_redo_seconds", "Time spent on WAL redo")
.expect("failed to define a metric");
static ref WAL_REDO_WAIT_TIME: Histogram = register_histogram!(
"pageserver_wal_redo_wait_time",
"pageserver_wal_redo_wait_seconds",
"Time spent waiting for access to the WAL redo process"
)
.expect("failed to define a metric");
static ref WAL_REDO_RECORD_COUNTER: IntCounter = register_int_counter!(
"pageserver_wal_records_replayed",
"Number of WAL records replayed"
"pageserver_replayed_wal_records_total",
"Number of WAL records replayed in WAL redo process"
)
.unwrap();
}
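The metric renames in this commit follow Prometheus naming conventions: base units in the name (`_seconds`, `_bytes`) and a `_total` suffix for counters. A minimal sketch of registering metrics under such names with the prometheus crate's macros (the project wraps these in its own `metrics` crate, so treat the imports as an assumption):

```rust
use prometheus::{register_histogram, register_int_counter};

fn main() {
    // Durations: a histogram named with a `_seconds` suffix.
    let wal_redo_seconds =
        register_histogram!("pageserver_wal_redo_seconds", "Time spent on WAL redo")
            .expect("failed to define a metric");
    // Counts: a counter named with a `_total` suffix.
    let replayed_records_total = register_int_counter!(
        "pageserver_replayed_wal_records_total",
        "Number of WAL records replayed in WAL redo process"
    )
    .expect("failed to define a metric");

    let timer = wal_redo_seconds.start_timer();
    replayed_records_total.inc();
    timer.observe_duration();
}
```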


@@ -33,6 +33,7 @@ tokio = { version = "1.17", features = ["macros"] }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-rustls = "0.23.0"
url = "2.2.2"
git-version = "0.3.5"
utils = { path = "../libs/utils" }
metrics = { path = "../libs/metrics" }


@@ -117,7 +117,7 @@ async fn get_auth_info(
let mut url = reqwest::Url::parse(&format!("{auth_endpoint}/proxy_get_role_secret"))?;
url.query_pairs_mut()
.append_pair("cluster", cluster)
.append_pair("project", cluster)
.append_pair("role", user);
// TODO: use a proper logger
@@ -141,7 +141,7 @@ async fn wake_compute(
cluster: &str,
) -> Result<(String, u16), ConsoleAuthError> {
let mut url = reqwest::Url::parse(&format!("{auth_endpoint}/proxy_wake_compute"))?;
url.query_pairs_mut().append_pair("cluster", cluster);
url.query_pairs_mut().append_pair("project", cluster);
// TODO: use a proper logger
println!("cplane request: {}", url);
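A small sketch of the query-string change above: the console endpoints are now addressed by "project" rather than "cluster". This uses the url crate (which reqwest's `Url` re-exports); the endpoint and project name are placeholder values:

```rust
use url::Url;

fn wake_compute_url(auth_endpoint: &str, cluster: &str) -> Result<Url, url::ParseError> {
    let mut url = Url::parse(&format!("{auth_endpoint}/proxy_wake_compute"))?;
    url.query_pairs_mut().append_pair("project", cluster);
    Ok(url)
}

fn main() {
    let url = wake_compute_url("https://console.example.com", "quiet-sky-123456").unwrap();
    assert_eq!(
        url.as_str(),
        "https://console.example.com/proxy_wake_compute?project=quiet-sky-123456"
    );
}
```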


@@ -25,7 +25,9 @@ use config::ProxyConfig;
use futures::FutureExt;
use std::{future::Future, net::SocketAddr};
use tokio::{net::TcpListener, task::JoinError};
use utils::GIT_VERSION;
use utils::project_git_version;
project_git_version!(GIT_VERSION);
/// Flattens `Result<Result<T>>` into `Result<T>`.
async fn flatten_err(
@@ -124,7 +126,7 @@ async fn main() -> anyhow::Result<()> {
auth_link_uri: arg_matches.value_of("uri").unwrap().parse()?,
}));
println!("Version: {}", GIT_VERSION);
println!("Version: {GIT_VERSION}");
// Check that we can bind to address before further initialization
println!("Starting http on {}", http_address);


@@ -29,6 +29,7 @@ hex = "0.4.3"
const_format = "0.2.21"
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-util = { version = "0.7", features = ["io"] }
git-version = "0.3.5"
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }


@@ -17,16 +17,19 @@ use url::{ParseError, Url};
use safekeeper::control_file::{self};
use safekeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
use safekeeper::remove_wal;
use safekeeper::timeline::GlobalTimelines;
use safekeeper::wal_service;
use safekeeper::SafeKeeperConf;
use safekeeper::{broker, callmemaybe};
use safekeeper::{http, s3_offload};
use utils::{
http::endpoint, logging, shutdown::exit_now, signals, tcp_listener, zid::ZNodeId, GIT_VERSION,
http::endpoint, logging, project_git_version, shutdown::exit_now, signals, tcp_listener,
zid::ZNodeId,
};
const LOCK_FILE_NAME: &str = "safekeeper.lock";
const ID_FILE_NAME: &str = "safekeeper.id";
project_git_version!(GIT_VERSION);
fn main() -> Result<()> {
metrics::set_common_metrics_prefix("safekeeper");
@@ -193,7 +196,7 @@ fn main() -> Result<()> {
fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: bool) -> Result<()> {
let log_file = logging::init("safekeeper.log", conf.daemonize)?;
info!("version: {}", GIT_VERSION);
info!("version: {GIT_VERSION}");
// Prevent running multiple safekeepers on the same directory
let lock_file_path = conf.workdir.join(LOCK_FILE_NAME);
@@ -249,6 +252,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];
let (callmemaybe_tx, callmemaybe_rx) = mpsc::unbounded_channel();
GlobalTimelines::set_callmemaybe_tx(callmemaybe_tx);
let conf_ = conf.clone();
threads.push(
@@ -277,13 +282,12 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
);
}
let (tx, rx) = mpsc::unbounded_channel();
let conf_cloned = conf.clone();
let safekeeper_thread = thread::Builder::new()
.name("Safekeeper thread".into())
.spawn(|| {
// thread code
let thread_result = wal_service::thread_main(conf_cloned, pg_listener, tx);
let thread_result = wal_service::thread_main(conf_cloned, pg_listener);
if let Err(e) = thread_result {
info!("safekeeper thread terminated: {}", e);
}
@@ -297,7 +301,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
.name("callmemaybe thread".into())
.spawn(|| {
// thread code
let thread_result = callmemaybe::thread_main(conf_cloned, rx);
let thread_result = callmemaybe::thread_main(conf_cloned, callmemaybe_rx);
if let Err(e) = thread_result {
error!("callmemaybe thread terminated: {}", e);
}
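The callmemaybe sender is no longer threaded through every handler signature; instead it is stashed in a process-global set exactly once at startup (`GlobalTimelines::set_callmemaybe_tx` in this diff). A minimal sketch of that pattern using std::sync::mpsc and lazy_static (the real code uses a tokio unbounded channel and richer event types):

```rust
use lazy_static::lazy_static;
use std::sync::mpsc::{channel, Sender};
use std::sync::Mutex;

lazy_static! {
    static ref CALLMEMAYBE_TX: Mutex<Option<Sender<String>>> = Mutex::new(None);
}

fn set_callmemaybe_tx(tx: Sender<String>) {
    let mut guard = CALLMEMAYBE_TX.lock().unwrap();
    assert!(guard.is_none(), "sender must be set only once");
    *guard = Some(tx);
}

fn notify(event: String) {
    // Deeply nested code can now reach the channel without carrying it around.
    let guard = CALLMEMAYBE_TX.lock().unwrap();
    guard.as_ref().expect("sender not initialized").send(event).unwrap();
}

fn main() {
    let (tx, rx) = channel();
    set_callmemaybe_tx(tx);
    notify("Pause".to_string());
    println!("got event: {}", rx.recv().unwrap());
}
```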


@@ -60,7 +60,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
// lock is held.
for zttid in GlobalTimelines::get_active_timelines() {
if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
let sk_info = tli.get_public_info()?;
let sk_info = tli.get_public_info(&conf)?;
let put_opts = PutOptions::new().with_lease(lease.id());
client
.put(


@@ -21,9 +21,6 @@ use utils::{
zid::{ZTenantId, ZTenantTimelineId, ZTimelineId},
};
use crate::callmemaybe::CallmeEvent;
use tokio::sync::mpsc::UnboundedSender;
/// Safekeeper handler of postgres commands
pub struct SafekeeperPostgresHandler {
pub conf: SafeKeeperConf,
@@ -33,8 +30,6 @@ pub struct SafekeeperPostgresHandler {
pub ztimelineid: Option<ZTimelineId>,
pub timeline: Option<Arc<Timeline>>,
pageserver_connstr: Option<String>,
//sender to communicate with callmemaybe thread
pub tx: UnboundedSender<CallmeEvent>,
}
/// Parsed Postgres command.
@@ -140,7 +135,7 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
}
impl SafekeeperPostgresHandler {
pub fn new(conf: SafeKeeperConf, tx: UnboundedSender<CallmeEvent>) -> Self {
pub fn new(conf: SafeKeeperConf) -> Self {
SafekeeperPostgresHandler {
conf,
appname: None,
@@ -148,7 +143,6 @@ impl SafekeeperPostgresHandler {
ztimelineid: None,
timeline: None,
pageserver_connstr: None,
tx,
}
}


@@ -3,19 +3,20 @@ use hyper::{Body, Request, Response, StatusCode};
use serde::Serialize;
use serde::Serializer;
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;
use crate::safekeeper::Term;
use crate::safekeeper::TermHistory;
use crate::timeline::GlobalTimelines;
use crate::timeline::{GlobalTimelines, TimelineDeleteForceResult};
use crate::SafeKeeperConf;
use utils::{
http::{
endpoint,
error::ApiError,
json::{json_request, json_response},
request::parse_request_param,
request::{ensure_no_body, parse_request_param},
RequestExt, RouterBuilder,
},
lsn::Lsn,
@@ -130,6 +131,44 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
json_response(StatusCode::CREATED, ())
}
/// Deactivates the timeline and removes its data directory.
///
/// It does not try to stop any processing of the timeline; there is no such code at the time of writing.
/// However, it tries to check whether the timeline was active and reports that to the caller just in case.
/// Note that this information is inaccurate:
/// 1. There is a race condition between checking the timeline for activity and actual directory deletion.
/// 2. At the time of writing Safekeeper rarely marks a timeline inactive. E.g. disconnecting the compute node does nothing.
async fn timeline_delete_force_handler(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let zttid = ZTenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
ensure_no_body(&mut request).await?;
json_response(
StatusCode::OK,
GlobalTimelines::delete_force(get_conf(&request), &zttid).map_err(ApiError::from_err)?,
)
}
/// Deactivates all timelines for the tenant and removes its data directory.
/// See `timeline_delete_force_handler`.
async fn tenant_delete_force_handler(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id = parse_request_param(&request, "tenant_id")?;
ensure_no_body(&mut request).await?;
json_response(
StatusCode::OK,
GlobalTimelines::delete_force_all_for_tenant(get_conf(&request), &tenant_id)
.map_err(ApiError::from_err)?
.iter()
.map(|(zttid, resp)| (format!("{}", zttid.timeline_id), *resp))
.collect::<HashMap<String, TimelineDeleteForceResult>>(),
)
}
/// Used only in tests to hand craft required data.
async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let zttid = ZTenantTimelineId::new(
@@ -155,6 +194,11 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
timeline_status_handler,
)
.post("/v1/timeline", timeline_create_handler)
.delete(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_delete_force_handler,
)
.delete("/v1/tenant/:tenant_id", tenant_delete_force_handler)
// for tests
.post(
"/v1/record_safekeeper_info/:tenant_id/:timeline_id",

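A hypothetical client-side call for the new delete route. The path shape comes from the router above and the default HTTP port 7676 from this diff's safekeeper defaults; the tenant/timeline IDs are placeholders, reqwest (blocking feature) is used only for illustration, and a safekeeper must actually be running for the request to succeed:

```rust
fn main() -> Result<(), reqwest::Error> {
    let addr = "127.0.0.1:7676";
    let tenant_id = "ad03a24afe2440bd9b1b9552623eb3b1"; // placeholder hex id
    let timeline_id = "09e0f45ba26e4a4ba11c1ee6cdbc3c47"; // placeholder hex id

    let client = reqwest::blocking::Client::new();
    // Force-delete one timeline; the JSON response reports dir_existed/was_active.
    let resp = client
        .delete(format!(
            "http://{addr}/v1/tenant/{tenant_id}/timeline/{timeline_id}"
        ))
        .send()?;
    println!("status: {}", resp.status());
    Ok(())
}
```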

@@ -3,7 +3,7 @@ use std::path::PathBuf;
use std::time::Duration;
use url::Url;
use utils::zid::{ZNodeId, ZTenantTimelineId};
use utils::zid::{ZNodeId, ZTenantId, ZTenantTimelineId};
pub mod broker;
pub mod callmemaybe;
@@ -31,7 +31,7 @@ pub mod defaults {
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
}
#[derive(Debug, Clone)]
@@ -57,9 +57,12 @@ pub struct SafeKeeperConf {
}
impl SafeKeeperConf {
pub fn tenant_dir(&self, tenant_id: &ZTenantId) -> PathBuf {
self.workdir.join(tenant_id.to_string())
}
pub fn timeline_dir(&self, zttid: &ZTenantTimelineId) -> PathBuf {
self.workdir
.join(zttid.tenant_id.to_string())
self.tenant_dir(&zttid.tenant_id)
.join(zttid.timeline_id.to_string())
}
}


@@ -5,7 +5,6 @@
use anyhow::{anyhow, bail, Result};
use bytes::BytesMut;
use tokio::sync::mpsc::UnboundedSender;
use tracing::*;
use crate::timeline::Timeline;
@@ -28,8 +27,6 @@ use utils::{
sock_split::ReadStream,
};
use crate::callmemaybe::CallmeEvent;
pub struct ReceiveWalConn<'pg> {
/// Postgres connection
pg_backend: &'pg mut PostgresBackend,
@@ -91,10 +88,9 @@ impl<'pg> ReceiveWalConn<'pg> {
// Register the connection and defer unregister.
spg.timeline
.get()
.on_compute_connect(self.pageserver_connstr.as_ref(), &spg.tx)?;
.on_compute_connect(self.pageserver_connstr.as_ref())?;
let _guard = ComputeConnectionGuard {
timeline: Arc::clone(spg.timeline.get()),
callmemaybe_tx: spg.tx.clone(),
};
let mut next_msg = Some(next_msg);
@@ -194,13 +190,10 @@ impl ProposerPollStream {
struct ComputeConnectionGuard {
timeline: Arc<Timeline>,
callmemaybe_tx: UnboundedSender<CallmeEvent>,
}
impl Drop for ComputeConnectionGuard {
fn drop(&mut self) {
self.timeline
.on_compute_disconnect(&self.callmemaybe_tx)
.unwrap();
self.timeline.on_compute_disconnect().unwrap();
}
}


@@ -264,13 +264,13 @@ impl ReplicationConn {
} else {
let pageserver_connstr = pageserver_connstr.expect("there should be a pageserver connection string since this is not a wal_proposer_recovery");
let zttid = spg.timeline.get().zttid;
let tx_clone = spg.tx.clone();
let tx_clone = spg.timeline.get().callmemaybe_tx.clone();
let subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.clone(),
);
spg.tx
tx_clone
.send(CallmeEvent::Pause(subscription_key))
.unwrap_or_else(|e| {
error!("failed to send Pause request to callmemaybe thread {}", e);
@@ -315,7 +315,7 @@ impl ReplicationConn {
} else {
// TODO: also check once in a while whether we are walsender
// to right pageserver.
if spg.timeline.get().check_deactivate(replica_id, &spg.tx)? {
if spg.timeline.get().check_deactivate(replica_id)? {
// Shut down, timeline is suspended.
// TODO create proper error type for this
bail!("end streaming to {:?}", spg.appname);


@@ -7,6 +7,8 @@ use etcd_broker::SkTimelineInfo;
use lazy_static::lazy_static;
use postgres_ffi::xlog_utils::XLogSegNo;
use serde::Serialize;
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs::{self};
@@ -19,7 +21,7 @@ use tracing::*;
use utils::{
lsn::Lsn,
pq_proto::ZenithFeedback,
zid::{ZNodeId, ZTenantTimelineId},
zid::{ZNodeId, ZTenantId, ZTenantTimelineId},
};
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
@@ -89,7 +91,6 @@ struct SharedState {
active: bool,
num_computes: u32,
pageserver_connstr: Option<String>,
listen_pg_addr: String,
last_removed_segno: XLogSegNo,
}
@@ -112,7 +113,6 @@ impl SharedState {
active: false,
num_computes: 0,
pageserver_connstr: None,
listen_pg_addr: conf.listen_pg_addr.clone(),
last_removed_segno: 0,
})
}
@@ -132,7 +132,6 @@ impl SharedState {
active: false,
num_computes: 0,
pageserver_connstr: None,
listen_pg_addr: conf.listen_pg_addr.clone(),
last_removed_segno: 0,
})
}
@@ -278,15 +277,21 @@ impl SharedState {
/// Database instance (tenant)
pub struct Timeline {
pub zttid: ZTenantTimelineId,
pub callmemaybe_tx: UnboundedSender<CallmeEvent>,
mutex: Mutex<SharedState>,
/// conditional variable used to notify wal senders
cond: Condvar,
}
impl Timeline {
fn new(zttid: ZTenantTimelineId, shared_state: SharedState) -> Timeline {
fn new(
zttid: ZTenantTimelineId,
callmemaybe_tx: UnboundedSender<CallmeEvent>,
shared_state: SharedState,
) -> Timeline {
Timeline {
zttid,
callmemaybe_tx,
mutex: Mutex::new(shared_state),
cond: Condvar::new(),
}
@@ -295,34 +300,27 @@ impl Timeline {
/// Register compute connection, starting timeline-related activity if it is
/// not running yet.
/// Can fail only if channel to a static thread got closed, which is not normal at all.
pub fn on_compute_connect(
&self,
pageserver_connstr: Option<&String>,
callmemaybe_tx: &UnboundedSender<CallmeEvent>,
) -> Result<()> {
pub fn on_compute_connect(&self, pageserver_connstr: Option<&String>) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes += 1;
// FIXME: currently we always adopt latest pageserver connstr, but we
// should have kind of generations assigned by compute to distinguish
// the latest one or even pass it through consensus to reliably deliver
// to all safekeepers.
shared_state.activate(&self.zttid, pageserver_connstr, callmemaybe_tx)?;
shared_state.activate(&self.zttid, pageserver_connstr, &self.callmemaybe_tx)?;
Ok(())
}
/// De-register compute connection, shutting down timeline activity if
/// pageserver doesn't need catchup.
/// Can fail only if channel to a static thread got closed, which is not normal at all.
pub fn on_compute_disconnect(
&self,
callmemaybe_tx: &UnboundedSender<CallmeEvent>,
) -> Result<()> {
pub fn on_compute_disconnect(&self) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes -= 1;
// If there is no pageserver, can suspend right away; otherwise let
// walsender do that.
if shared_state.num_computes == 0 && shared_state.pageserver_connstr.is_none() {
shared_state.deactivate(&self.zttid, callmemaybe_tx)?;
shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?;
}
Ok(())
}
@@ -330,11 +328,7 @@ impl Timeline {
/// Deactivate the tenant if there are no computes and the pageserver has caught up,
/// assuming the pageserver status is in replica_id.
/// Returns true if deactivated.
pub fn check_deactivate(
&self,
replica_id: usize,
callmemaybe_tx: &UnboundedSender<CallmeEvent>,
) -> Result<bool> {
pub fn check_deactivate(&self, replica_id: usize) -> Result<bool> {
let mut shared_state = self.mutex.lock().unwrap();
if !shared_state.active {
// already suspended
@@ -346,13 +340,27 @@ impl Timeline {
(replica_state.last_received_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
replica_state.last_received_lsn >= shared_state.sk.inmem.commit_lsn);
if deactivate {
shared_state.deactivate(&self.zttid, callmemaybe_tx)?;
shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?;
return Ok(true);
}
}
Ok(false)
}
/// Deactivates the timeline, assuming it is being deleted.
/// Returns whether the timeline was already active.
///
/// The callmemaybe thread is stopped by the deactivation message. We assume all other threads
/// will stop by themselves eventually (possibly with errors, but no panics). There should be no
/// compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but
/// we're deleting the timeline anyway.
pub fn deactivate_for_delete(&self) -> Result<bool> {
let mut shared_state = self.mutex.lock().unwrap();
let was_active = shared_state.active;
shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?;
Ok(was_active)
}
fn is_active(&self) -> bool {
let shared_state = self.mutex.lock().unwrap();
shared_state.active
@@ -421,7 +429,7 @@ impl Timeline {
}
/// Prepare public safekeeper info for reporting.
pub fn get_public_info(&self) -> anyhow::Result<SkTimelineInfo> {
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> anyhow::Result<SkTimelineInfo> {
let shared_state = self.mutex.lock().unwrap();
Ok(SkTimelineInfo {
last_log_term: Some(shared_state.sk.get_epoch()),
@@ -435,18 +443,7 @@ impl Timeline {
shared_state.sk.inmem.remote_consistent_lsn,
)),
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
wal_stream_connection_string: shared_state
.pageserver_connstr
.as_deref()
.map(|pageserver_connstr| {
wal_stream_connection_string(
self.zttid,
&shared_state.listen_pg_addr,
pageserver_connstr,
)
})
.transpose()
.context("Failed to get the pageserver callmemaybe connstr")?,
safekeeper_connection_string: Some(conf.listen_pg_addr.clone()),
})
}
@@ -504,29 +501,6 @@ impl Timeline {
}
}
// pageserver connstr is needed to be able to distinguish between different pageservers
// it is required to correctly manage callmemaybe subscriptions when more than one pageserver is involved
// TODO it is better to use some sort of a unique id instead of connection string, see https://github.com/zenithdb/zenith/issues/1105
fn wal_stream_connection_string(
ZTenantTimelineId {
tenant_id,
timeline_id,
}: ZTenantTimelineId,
listen_pg_addr_str: &str,
pageserver_connstr: &str,
) -> anyhow::Result<String> {
let me_connstr = format!("postgresql://no_user@{}/no_db", listen_pg_addr_str);
let me_conf = me_connstr
.parse::<postgres::config::Config>()
.with_context(|| {
format!("Failed to parse pageserver connection string '{me_connstr}' as a postgres one")
})?;
let (host, port) = utils::connstring::connection_host_port(&me_conf);
Ok(format!(
"host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id} pageserver_connstr={pageserver_connstr}'",
))
}
// Utilities needed by various Connection-like objects
pub trait TimelineTools {
fn set(&mut self, conf: &SafeKeeperConf, zttid: ZTenantTimelineId, create: bool) -> Result<()>;
@@ -545,22 +519,41 @@ impl TimelineTools for Option<Arc<Timeline>> {
}
}
struct GlobalTimelinesState {
timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>,
callmemaybe_tx: Option<UnboundedSender<CallmeEvent>>,
}
lazy_static! {
pub static ref TIMELINES: Mutex<HashMap<ZTenantTimelineId, Arc<Timeline>>> =
Mutex::new(HashMap::new());
static ref TIMELINES_STATE: Mutex<GlobalTimelinesState> = Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
callmemaybe_tx: None
});
}
#[derive(Clone, Copy, Serialize)]
pub struct TimelineDeleteForceResult {
pub dir_existed: bool,
pub was_active: bool,
}
/// A zero-sized struct used to manage access to the global timelines map.
pub struct GlobalTimelines;
impl GlobalTimelines {
pub fn set_callmemaybe_tx(callmemaybe_tx: UnboundedSender<CallmeEvent>) {
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(state.callmemaybe_tx.is_none());
state.callmemaybe_tx = Some(callmemaybe_tx);
}
fn create_internal(
mut timelines: MutexGuard<HashMap<ZTenantTimelineId, Arc<Timeline>>>,
mut state: MutexGuard<GlobalTimelinesState>,
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
peer_ids: Vec<ZNodeId>,
) -> Result<Arc<Timeline>> {
match timelines.get(&zttid) {
match state.timelines.get(&zttid) {
Some(_) => bail!("timeline {} already exists", zttid),
None => {
// TODO: check directory existence
@@ -569,8 +562,12 @@ impl GlobalTimelines {
let shared_state = SharedState::create(conf, &zttid, peer_ids)
.context("failed to create shared state")?;
let new_tli = Arc::new(Timeline::new(zttid, shared_state));
timelines.insert(zttid, Arc::clone(&new_tli));
let new_tli = Arc::new(Timeline::new(
zttid,
state.callmemaybe_tx.as_ref().unwrap().clone(),
shared_state,
));
state.timelines.insert(zttid, Arc::clone(&new_tli));
Ok(new_tli)
}
}
@@ -581,20 +578,20 @@ impl GlobalTimelines {
zttid: ZTenantTimelineId,
peer_ids: Vec<ZNodeId>,
) -> Result<Arc<Timeline>> {
let timelines = TIMELINES.lock().unwrap();
GlobalTimelines::create_internal(timelines, conf, zttid, peer_ids)
let state = TIMELINES_STATE.lock().unwrap();
GlobalTimelines::create_internal(state, conf, zttid, peer_ids)
}
/// Get a timeline with control file loaded from the global TIMELINES map.
/// Get a timeline with control file loaded from the global TIMELINES_STATE.timelines map.
/// If control file doesn't exist and create=false, bails out.
pub fn get(
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
create: bool,
) -> Result<Arc<Timeline>> {
let mut timelines = TIMELINES.lock().unwrap();
let mut state = TIMELINES_STATE.lock().unwrap();
match timelines.get(&zttid) {
match state.timelines.get(&zttid) {
Some(result) => Ok(Arc::clone(result)),
None => {
let shared_state =
@@ -610,20 +607,19 @@ impl GlobalTimelines {
.contains("No such file or directory")
&& create
{
return GlobalTimelines::create_internal(
timelines,
conf,
zttid,
vec![],
);
return GlobalTimelines::create_internal(state, conf, zttid, vec![]);
} else {
return Err(error);
}
}
};
let new_tli = Arc::new(Timeline::new(zttid, shared_state));
timelines.insert(zttid, Arc::clone(&new_tli));
let new_tli = Arc::new(Timeline::new(
zttid,
state.callmemaybe_tx.as_ref().unwrap().clone(),
shared_state,
));
state.timelines.insert(zttid, Arc::clone(&new_tli));
Ok(new_tli)
}
}
@@ -631,11 +627,86 @@ impl GlobalTimelines {
/// Get ZTenantTimelineIDs of all active timelines.
pub fn get_active_timelines() -> Vec<ZTenantTimelineId> {
let timelines = TIMELINES.lock().unwrap();
timelines
let state = TIMELINES_STATE.lock().unwrap();
state
.timelines
.iter()
.filter(|&(_, tli)| tli.is_active())
.map(|(zttid, _)| *zttid)
.collect()
}
fn delete_force_internal(
conf: &SafeKeeperConf,
zttid: &ZTenantTimelineId,
was_active: bool,
) -> Result<TimelineDeleteForceResult> {
match std::fs::remove_dir_all(conf.timeline_dir(zttid)) {
Ok(_) => Ok(TimelineDeleteForceResult {
dir_existed: true,
was_active,
}),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(TimelineDeleteForceResult {
dir_existed: false,
was_active,
}),
Err(e) => Err(e.into()),
}
}
/// Deactivates and deletes the timeline, see `Timeline::deactivate_for_delete()`, then deletes
/// the corresponding data directory.
/// We assume all timeline threads do not care about `GlobalTimelines` not containing the timeline
/// anymore, and they will eventually terminate without panics.
///
/// There are multiple ways the timeline may be accidentally "re-created" (so we end up with two
/// `Timeline` objects in memory):
/// a) a compute node connects after this method is called, or
/// b) an HTTP GET request about the timeline is made and it's able to restore the current state, or
/// c) an HTTP POST request for timeline creation is made after the timeline is already deleted.
/// TODO: ensure all of the above never happens.
pub fn delete_force(
conf: &SafeKeeperConf,
zttid: &ZTenantTimelineId,
) -> Result<TimelineDeleteForceResult> {
info!("deleting timeline {}", zttid);
let was_active = match TIMELINES_STATE.lock().unwrap().timelines.remove(zttid) {
None => false,
Some(tli) => tli.deactivate_for_delete()?,
};
GlobalTimelines::delete_force_internal(conf, zttid, was_active)
}
/// Deactivates and deletes all timelines for the tenant, see `delete()`.
/// Returns a map of all timelines the tenant had, with `true` if the timeline was active.
pub fn delete_force_all_for_tenant(
conf: &SafeKeeperConf,
tenant_id: &ZTenantId,
) -> Result<HashMap<ZTenantTimelineId, TimelineDeleteForceResult>> {
info!("deleting all timelines for tenant {}", tenant_id);
let mut state = TIMELINES_STATE.lock().unwrap();
let mut deleted = HashMap::new();
for (zttid, tli) in &state.timelines {
if zttid.tenant_id == *tenant_id {
deleted.insert(
*zttid,
GlobalTimelines::delete_force_internal(
conf,
zttid,
tli.deactivate_for_delete()?,
)?,
);
}
}
// TODO: test that the exact subset of timelines is removed.
state
.timelines
.retain(|zttid, _| !deleted.contains_key(zttid));
match std::fs::remove_dir_all(conf.tenant_dir(tenant_id)) {
Ok(_) => (),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => (),
e => e?,
};
Ok(deleted)
}
}
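The force-delete path above removes the data directory and reports whether it existed, treating "already gone" as success so the operation is idempotent. A standalone sketch of that removal (std only; the struct mirrors TimelineDeleteForceResult from the diff):

```rust
use std::io::ErrorKind;
use std::path::Path;

#[derive(Debug)]
struct DeleteResult {
    dir_existed: bool,
}

fn remove_dir_idempotent(dir: &Path) -> std::io::Result<DeleteResult> {
    match std::fs::remove_dir_all(dir) {
        Ok(()) => Ok(DeleteResult { dir_existed: true }),
        // A missing directory is fine: deletion may race or be retried.
        Err(e) if e.kind() == ErrorKind::NotFound => Ok(DeleteResult { dir_existed: false }),
        Err(e) => Err(e),
    }
}

fn main() -> std::io::Result<()> {
    let res = remove_dir_idempotent(Path::new("/tmp/does-not-exist-12345"))?;
    println!("dir_existed = {}", res.dir_existed);
    Ok(())
}
```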


@@ -8,29 +8,22 @@ use std::net::{TcpListener, TcpStream};
use std::thread;
use tracing::*;
use crate::callmemaybe::CallmeEvent;
use crate::handler::SafekeeperPostgresHandler;
use crate::SafeKeeperConf;
use tokio::sync::mpsc::UnboundedSender;
use utils::postgres_backend::{AuthType, PostgresBackend};
/// Accept incoming TCP connections and spawn them into a background thread.
pub fn thread_main(
conf: SafeKeeperConf,
listener: TcpListener,
tx: UnboundedSender<CallmeEvent>,
) -> Result<()> {
pub fn thread_main(conf: SafeKeeperConf, listener: TcpListener) -> Result<()> {
loop {
match listener.accept() {
Ok((socket, peer_addr)) => {
debug!("accepted connection from {}", peer_addr);
let conf = conf.clone();
let tx_clone = tx.clone();
let _ = thread::Builder::new()
.name("WAL service thread".into())
.spawn(move || {
if let Err(err) = handle_socket(socket, conf, tx_clone) {
if let Err(err) = handle_socket(socket, conf) {
error!("connection handler exited: {}", err);
}
})
@@ -51,16 +44,12 @@ fn get_tid() -> u64 {
/// This is run by `thread_main` above, inside a background thread.
///
fn handle_socket(
socket: TcpStream,
conf: SafeKeeperConf,
tx: UnboundedSender<CallmeEvent>,
) -> Result<()> {
fn handle_socket(socket: TcpStream, conf: SafeKeeperConf) -> Result<()> {
let _enter = info_span!("", tid = ?get_tid()).entered();
socket.set_nodelay(true)?;
let mut conn_handler = SafekeeperPostgresHandler::new(conf, tx);
let mut conn_handler = SafekeeperPostgresHandler::new(conf);
let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None, false)?;
// libpq replication protocol between safekeeper and replicas/pagers
pgbackend.run(&mut conn_handler)?;
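The WAL service keeps its overall shape after dropping the callmemaybe sender from its signatures: an accept loop that spawns one named handler thread per connection. A stripped-down, std-only sketch of that pattern (the real handler speaks the postgres protocol instead of writing a greeting):

```rust
use std::io::Write;
use std::net::{TcpListener, TcpStream};
use std::thread;

fn handle_socket(mut socket: TcpStream) -> std::io::Result<()> {
    socket.set_nodelay(true)?;
    socket.write_all(b"hello\n") // stand-in for the real protocol handling
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    loop {
        match listener.accept() {
            Ok((socket, peer_addr)) => {
                println!("accepted connection from {peer_addr}");
                thread::Builder::new()
                    .name("WAL service thread".into())
                    .spawn(move || {
                        if let Err(err) = handle_socket(socket) {
                            eprintln!("connection handler exited: {err}");
                        }
                    })?;
            }
            Err(e) => eprintln!("accept failed: {e}"),
        }
    }
}
```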


@@ -21,7 +21,7 @@ def test_ancestor_branch(zenith_env_builder: ZenithEnvBuilder):
# Override defaults, 1M gc_horizon and 4M checkpoint_distance.
# Extend compaction_period and gc_period to disable background compaction and gc.
tenant = env.zenith_cli.create_tenant(
tenant, _ = env.zenith_cli.create_tenant(
conf={
'gc_period': '10 m',
'gc_horizon': '1048576',
@@ -35,7 +35,6 @@ def test_ancestor_branch(zenith_env_builder: ZenithEnvBuilder):
with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
pscur.execute("failpoints flush-frozen=sleep(10000)")
env.zenith_cli.create_timeline(f'main', tenant_id=tenant)
pg_branch0 = env.postgres.create_start('main', tenant_id=tenant)
branch0_cur = pg_branch0.connect().cursor()
branch0_cur.execute("SHOW zenith.zenith_timeline")


@@ -19,6 +19,8 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
#
# See https://github.com/zenithdb/zenith/issues/1068
zenith_env_builder.num_safekeepers = 1
# Disable pitr, because here we want to test branch creation after GC
zenith_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
env = zenith_env_builder.init_start()
# Branch at the point where only 100 rows were inserted


@@ -1,7 +1,7 @@
import asyncio
import random
from fixtures.zenith_fixtures import ZenithEnv, Postgres
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, Postgres
from fixtures.log_helper import log
# Test configuration
@@ -50,9 +50,12 @@ async def update_and_gc(env: ZenithEnv, pg: Postgres, timeline: str):
#
# (repro for https://github.com/zenithdb/zenith/issues/1047)
#
def test_gc_aggressive(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
env.zenith_cli.create_branch("test_gc_aggressive", "empty")
def test_gc_aggressive(zenith_env_builder: ZenithEnvBuilder):
# Disable pitr, because here we want to test branch creation after GC
zenith_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_gc_aggressive", "main")
pg = env.postgres.create_start('test_gc_aggressive')
log.info('postgres is running on test_gc_aggressive branch')


@@ -1,5 +1,7 @@
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.zenith_fixtures import ZenithEnvBuilder
from fixtures.log_helper import log
from fixtures.utils import print_gc_result
import psycopg2.extras
#
@@ -12,9 +14,11 @@ from fixtures.log_helper import log
# just a hint that the page hasn't been modified since that LSN, and the page
# server should return the latest page version regardless of the LSN.
#
def test_old_request_lsn(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
env.zenith_cli.create_branch("test_old_request_lsn", "empty")
def test_old_request_lsn(zenith_env_builder: ZenithEnvBuilder):
# Disable pitr, because here we want to test branch creation after GC
zenith_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_old_request_lsn", "main")
pg = env.postgres.create_start('test_old_request_lsn')
log.info('postgres is running on test_old_request_lsn branch')
@@ -26,7 +30,7 @@ def test_old_request_lsn(zenith_simple_env: ZenithEnv):
timeline = cur.fetchone()[0]
psconn = env.pageserver.connect()
pscur = psconn.cursor()
pscur = psconn.cursor(cursor_factory=psycopg2.extras.DictCursor)
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers.
@@ -53,6 +57,9 @@ def test_old_request_lsn(zenith_simple_env: ZenithEnv):
# garbage collections so that the page server will remove old page versions.
for i in range(10):
pscur.execute(f"do_gc {env.initial_tenant.hex} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
for j in range(100):
cur.execute('UPDATE foo SET val = val + 1 WHERE id = 1;')


@@ -0,0 +1,77 @@
import subprocess
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.utils import print_gc_result
from fixtures.zenith_fixtures import ZenithEnvBuilder
#
# Check pitr_interval GC behavior.
# Insert some data, run GC and create a branch in the past.
#
def test_pitr_gc(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
# Set pitr interval such that we need to keep the data
zenith_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '1 day', gc_horizon = 0}"
env = zenith_env_builder.init_start()
pgmain = env.postgres.create_start('main')
log.info("postgres is running on 'main' branch")
main_pg_conn = pgmain.connect()
main_cur = main_pg_conn.cursor()
main_cur.execute("SHOW zenith.zenith_timeline")
timeline = main_cur.fetchone()[0]
# Create table
main_cur.execute('CREATE TABLE foo (t text)')
for i in range(10000):
main_cur.execute('''
INSERT INTO foo
SELECT 'long string to consume some space';
''')
if i == 99:
# keep some early lsn to test branch creation after GC
main_cur.execute('SELECT pg_current_wal_insert_lsn(), txid_current()')
res = main_cur.fetchone()
lsn_a = res[0]
xid_a = res[1]
log.info(f'LSN after 100 rows: {lsn_a} xid {xid_a}')
main_cur.execute('SELECT pg_current_wal_insert_lsn(), txid_current()')
res = main_cur.fetchone()
debug_lsn = res[0]
debug_xid = res[1]
log.info(f'LSN after 10000 rows: {debug_lsn} xid {debug_xid}')
# run GC
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
pscur.execute(f"compact {env.initial_tenant.hex} {timeline}")
# perform aggressive GC. Data should still be kept because of the PITR setting.
pscur.execute(f"do_gc {env.initial_tenant.hex} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
# Branch at the point where only 100 rows were inserted
# It must have been preserved by PITR setting
env.zenith_cli.create_branch('test_pitr_gc_hundred', 'main', ancestor_start_lsn=lsn_a)
pg_hundred = env.postgres.create_start('test_pitr_gc_hundred')
# On the 'hundred' branch, we should see only 100 rows
hundred_pg_conn = pg_hundred.connect()
hundred_cur = hundred_pg_conn.cursor()
hundred_cur.execute('SELECT count(*) FROM foo')
assert hundred_cur.fetchone() == (100, )
# All the rows are visible on the main branch
main_cur.execute('SELECT count(*) FROM foo')
assert main_cur.fetchone() == (10000, )

View File

@@ -16,7 +16,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}'''
env = zenith_env_builder.init_start()
"""Test per tenant configuration"""
tenant = env.zenith_cli.create_tenant(conf={
tenant, _ = env.zenith_cli.create_tenant(conf={
'checkpoint_distance': '20000',
'gc_period': '30sec',
})

View File

@@ -95,6 +95,10 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve
log.info('load thread stopped')
@pytest.mark.skip(
reason=
"needs to replace callmemaybe call with better idea how to migrate timelines between pageservers"
)
@pytest.mark.parametrize('with_load', ['with_load', 'without_load'])
def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
port_distributor: PortDistributor,
@@ -107,7 +111,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
# create folder for remote storage mock
remote_storage_mock_path = env.repo_dir / 'local_fs_remote_storage'
tenant = env.zenith_cli.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209"))
tenant, _ = env.zenith_cli.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209"))
log.info("tenant to relocate %s", tenant)
# attach does not download ancestor branches (should it?), just use root branch for now

View File

@@ -12,8 +12,8 @@ def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_safekeep
env = zenith_env_builder.init_start()
"""Tests tenants with and without wal acceptors"""
tenant_1 = env.zenith_cli.create_tenant()
tenant_2 = env.zenith_cli.create_tenant()
tenant_1, _ = env.zenith_cli.create_tenant()
tenant_2, _ = env.zenith_cli.create_tenant()
env.zenith_cli.create_timeline(f'test_tenants_normal_work_with_safekeepers{with_safekeepers}',
tenant_id=tenant_1)

View File

@@ -850,3 +850,116 @@ def test_wal_deleted_after_broadcast(zenith_env_builder: ZenithEnvBuilder):
# there shouldn't be more than 2 WAL segments (but dir may have archive_status files)
assert wal_size_after_checkpoint < 16 * 2.5
def test_delete_force(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
# Create two tenants: one will be deleted, the other should be preserved.
tenant_id = env.initial_tenant.hex
timeline_id_1 = env.zenith_cli.create_branch('br1').hex # Active, delete explicitly
timeline_id_2 = env.zenith_cli.create_branch('br2').hex # Inactive, delete explicitly
timeline_id_3 = env.zenith_cli.create_branch('br3').hex # Active, delete with the tenant
timeline_id_4 = env.zenith_cli.create_branch('br4').hex # Inactive, delete with the tenant
tenant_id_other_uuid, timeline_id_other_uuid = env.zenith_cli.create_tenant()
tenant_id_other = tenant_id_other_uuid.hex
timeline_id_other = timeline_id_other_uuid.hex
# Populate branches
pg_1 = env.postgres.create_start('br1')
pg_2 = env.postgres.create_start('br2')
pg_3 = env.postgres.create_start('br3')
pg_4 = env.postgres.create_start('br4')
pg_other = env.postgres.create_start('main', tenant_id=uuid.UUID(hex=tenant_id_other))
for pg in [pg_1, pg_2, pg_3, pg_4, pg_other]:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('CREATE TABLE t(key int primary key)')
sk = env.safekeepers[0]
sk_data_dir = Path(sk.data_dir())
sk_http = sk.http_client()
assert (sk_data_dir / tenant_id / timeline_id_1).is_dir()
assert (sk_data_dir / tenant_id / timeline_id_2).is_dir()
assert (sk_data_dir / tenant_id / timeline_id_3).is_dir()
assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
# Stop branches which should be inactive and restart Safekeeper to drop its in-memory state.
pg_2.stop_and_destroy()
pg_4.stop_and_destroy()
sk.stop()
sk.start()
# Ensure connections to Safekeeper are established
for pg in [pg_1, pg_3, pg_other]:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('INSERT INTO t (key) VALUES (1)')
# Remove initial tenant's br1 (active)
assert sk_http.timeline_delete_force(tenant_id, timeline_id_1) == {
"dir_existed": True,
"was_active": True,
}
assert not (sk_data_dir / tenant_id / timeline_id_1).exists()
assert (sk_data_dir / tenant_id / timeline_id_2).is_dir()
assert (sk_data_dir / tenant_id / timeline_id_3).is_dir()
assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
# Ensure repeated deletion succeeds
assert sk_http.timeline_delete_force(tenant_id, timeline_id_1) == {
"dir_existed": False, "was_active": False
}
assert not (sk_data_dir / tenant_id / timeline_id_1).exists()
assert (sk_data_dir / tenant_id / timeline_id_2).is_dir()
assert (sk_data_dir / tenant_id / timeline_id_3).is_dir()
assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
# Remove initial tenant's br2 (inactive)
assert sk_http.timeline_delete_force(tenant_id, timeline_id_2) == {
"dir_existed": True,
"was_active": False,
}
assert not (sk_data_dir / tenant_id / timeline_id_1).exists()
assert not (sk_data_dir / tenant_id / timeline_id_2).exists()
assert (sk_data_dir / tenant_id / timeline_id_3).is_dir()
assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
# Remove a non-existent branch; this should succeed
assert sk_http.timeline_delete_force(tenant_id, '00' * 16) == {
"dir_existed": False,
"was_active": False,
}
assert not (sk_data_dir / tenant_id / timeline_id_1).exists()
assert not (sk_data_dir / tenant_id / timeline_id_2).exists()
assert (sk_data_dir / tenant_id / timeline_id_3).exists()
assert (sk_data_dir / tenant_id / timeline_id_4).is_dir()
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
# Remove initial tenant fully (two branches are active)
response = sk_http.tenant_delete_force(tenant_id)
assert response == {
timeline_id_3: {
"dir_existed": True,
"was_active": True,
}
}
assert not (sk_data_dir / tenant_id).exists()
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
# Remove initial tenant again.
response = sk_http.tenant_delete_force(tenant_id)
assert response == {}
assert not (sk_data_dir / tenant_id).exists()
assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir()
# Ensure the other tenant still works
sk_http.timeline_status(tenant_id_other, timeline_id_other)
with closing(pg_other.connect()) as conn:
with conn.cursor() as cur:
cur.execute('INSERT INTO t (key) VALUES (123)')

View File

@@ -1,7 +1,7 @@
import uuid
import requests
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient
from fixtures.zenith_fixtures import DEFAULT_BRANCH_NAME, ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient
from typing import cast
@@ -64,13 +64,13 @@ def test_cli_tenant_list(zenith_simple_env: ZenithEnv):
helper_compare_tenant_list(pageserver_http_client, env)
# Create new tenant
tenant1 = env.zenith_cli.create_tenant()
tenant1, _ = env.zenith_cli.create_tenant()
# check tenant1 appeared
helper_compare_tenant_list(pageserver_http_client, env)
# Create new tenant
tenant2 = env.zenith_cli.create_tenant()
tenant2, _ = env.zenith_cli.create_tenant()
# check tenant2 appeared
helper_compare_tenant_list(pageserver_http_client, env)
@@ -83,6 +83,16 @@ def test_cli_tenant_list(zenith_simple_env: ZenithEnv):
assert tenant2.hex in tenants
def test_cli_tenant_create(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
tenant_id, _ = env.zenith_cli.create_tenant()
timelines = env.zenith_cli.list_timelines(tenant_id)
# an initial timeline should be created upon tenant creation
assert len(timelines) == 1
assert timelines[0][0] == DEFAULT_BRANCH_NAME
def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder):
# Start with single sk
zenith_env_builder.num_safekeepers = 1

View File

@@ -106,9 +106,9 @@ class ZenithCompare(PgCompare):
report=MetricReport.LOWER_IS_BETTER)
total_files = self.zenbenchmark.get_int_counter_value(
self.env.pageserver, "pageserver_num_persistent_files_created")
self.env.pageserver, "pageserver_created_persistent_files_total")
total_bytes = self.zenbenchmark.get_int_counter_value(
self.env.pageserver, "pageserver_persistent_bytes_written")
self.env.pageserver, "pageserver_written_persistent_bytes_total")
self.zenbenchmark.record("data_uploaded",
total_bytes / (1024 * 1024),
"MB",

View File

@@ -75,7 +75,8 @@ def lsn_from_hex(lsn_hex: str) -> int:
def print_gc_result(row):
log.info("GC duration {elapsed} ms".format_map(row))
log.info(
" total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}"
" total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_pitr {layers_needed_by_pitr}"
" needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}"
.format_map(row))
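For reference, a hedged sketch of the row this helper expects: the keys mirror the format string above (including the new layers_needed_by_pitr counter), and the numbers are invented.
# Hypothetical stand-in for the DictCursor row returned by a `do_gc` call.
fake_row = {
    'elapsed': 12,
    'layers_total': 10,
    'layers_needed_by_cutoff': 2,
    'layers_needed_by_pitr': 3,
    'layers_needed_by_branches': 1,
    'layers_not_updated': 2,
    'layers_removed': 2,
}
print_gc_result(fake_row)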

View File

@@ -831,20 +831,25 @@ class ZenithCli:
def create_tenant(self,
tenant_id: Optional[uuid.UUID] = None,
conf: Optional[Dict[str, str]] = None) -> uuid.UUID:
timeline_id: Optional[uuid.UUID] = None,
conf: Optional[Dict[str, str]] = None) -> Tuple[uuid.UUID, uuid.UUID]:
"""
Creates a new tenant, returns its id and its initial timeline's id.
"""
if tenant_id is None:
tenant_id = uuid.uuid4()
if timeline_id is None:
timeline_id = uuid.uuid4()
if conf is None:
res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex])
res = self.raw_cli([
'tenant', 'create', '--tenant-id', tenant_id.hex, '--timeline-id', timeline_id.hex
])
else:
res = self.raw_cli(
['tenant', 'create', '--tenant-id', tenant_id.hex] +
sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), []))
res = self.raw_cli([
'tenant', 'create', '--tenant-id', tenant_id.hex, '--timeline-id', timeline_id.hex
] + sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), []))
res.check_returncode()
return tenant_id
return tenant_id, timeline_id
def config_tenant(self, tenant_id: uuid.UUID, conf: Dict[str, str]):
"""
@@ -1795,6 +1800,21 @@ class SafekeeperHttpClient(requests.Session):
json=body)
res.raise_for_status()
def timeline_delete_force(self, tenant_id: str, timeline_id: str) -> Dict[Any, Any]:
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}")
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def tenant_delete_force(self, tenant_id: str) -> Dict[Any, Any]:
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def get_metrics(self) -> SafekeeperMetrics:
request_result = self.get(f"http://localhost:{self.port}/metrics")
request_result.raise_for_status()
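A minimal sketch of how the new delete endpoints are meant to be called, assuming `sk_http` is a SafekeeperHttpClient and `tenant_id`/`timeline_id` are hex strings as in test_delete_force above:
# The first call removes the timeline directory; the response reports what was found.
result = sk_http.timeline_delete_force(tenant_id, timeline_id)
assert 'dir_existed' in result and 'was_active' in result
# Deletion is idempotent: a repeated call reports that nothing was there.
assert sk_http.timeline_delete_force(tenant_id, timeline_id) == {
    'dir_existed': False,
    'was_active': False,
}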

View File

@@ -18,7 +18,6 @@ from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare
def test_bulk_insert(zenith_with_baseline: PgCompare):
env = zenith_with_baseline
# Get the timeline ID of our branch. We need it for the 'do_gc' command
with closing(env.pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table huge (i int, j int);")

View File

@@ -30,7 +30,7 @@ def test_bulk_tenant_create(
for i in range(tenants_count):
start = timeit.default_timer()
tenant = env.zenith_cli.create_tenant()
tenant, _ = env.zenith_cli.create_tenant()
env.zenith_cli.create_timeline(
f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant)

View File

@@ -8,7 +8,6 @@ from fixtures.log_helper import log
import psycopg2.extras
import random
import time
from fixtures.utils import print_gc_result
# This is a clear-box test that demonstrates the worst case scenario for the