mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-22 14:50:38 +00:00
Compare commits
147 Commits
test_multi
...
release-35
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2266ee5971 | ||
|
|
f9214771b4 | ||
|
|
77a68326c5 | ||
|
|
a25504deae | ||
|
|
294b8a8fde | ||
|
|
407a20ceae | ||
|
|
e5b7ddfeee | ||
|
|
b58445d855 | ||
|
|
36050e7f3d | ||
|
|
33360ed96d | ||
|
|
39a28d1108 | ||
|
|
efa6aa134f | ||
|
|
2c724e56e2 | ||
|
|
feff887c6f | ||
|
|
353d915fcf | ||
|
|
2e38098cbc | ||
|
|
a6fe5ea1ac | ||
|
|
05b0aed0c1 | ||
|
|
cd1705357d | ||
|
|
6bc7561290 | ||
|
|
fbd3ac14b5 | ||
|
|
e437787c8f | ||
|
|
3460dbf90b | ||
|
|
6b89d99677 | ||
|
|
6cc8ea86e4 | ||
|
|
e62a492d6f | ||
|
|
a475cdf642 | ||
|
|
7002c79a47 | ||
|
|
ee6cf357b4 | ||
|
|
e5c2086b5f | ||
|
|
5f1208296a | ||
|
|
88e8e473cd | ||
|
|
b0a77844f6 | ||
|
|
1baf464307 | ||
|
|
e9b8e81cea | ||
|
|
85d6194aa4 | ||
|
|
333a7a68ef | ||
|
|
6aa4e41bee | ||
|
|
840183e51f | ||
|
|
cbccc94b03 | ||
|
|
fce227df22 | ||
|
|
bd787e800f | ||
|
|
4a7704b4a3 | ||
|
|
ff1119da66 | ||
|
|
4c3ba1627b | ||
|
|
1407174fb2 | ||
|
|
ec9dcb1889 | ||
|
|
d11d781afc | ||
|
|
4e44565b71 | ||
|
|
4ed51ad33b | ||
|
|
1c1ebe5537 | ||
|
|
c19cb7f386 | ||
|
|
4b97d31b16 | ||
|
|
923ade3dd7 | ||
|
|
b04e711975 | ||
|
|
afd0a6b39a | ||
|
|
99752286d8 | ||
|
|
15df93363c | ||
|
|
bc0ab741af | ||
|
|
51d9dfeaa3 | ||
|
|
f63cb18155 | ||
|
|
0de603d88e | ||
|
|
240913912a | ||
|
|
91a4ea0de2 | ||
|
|
8608704f49 | ||
|
|
efef68ce99 | ||
|
|
8daefd24da | ||
|
|
46cc8b7982 | ||
|
|
38cd90dd0c | ||
|
|
a51b269f15 | ||
|
|
43bf6d0a0f | ||
|
|
15273a9b66 | ||
|
|
78aca668d0 | ||
|
|
acbf4148ea | ||
|
|
6508540561 | ||
|
|
a41b5244a8 | ||
|
|
2b3189be95 | ||
|
|
248563c595 | ||
|
|
14cd6ca933 | ||
|
|
eb36403e71 | ||
|
|
3c6f779698 | ||
|
|
f67f0c1c11 | ||
|
|
edb02d3299 | ||
|
|
664a69e65b | ||
|
|
478322ebf9 | ||
|
|
802f174072 | ||
|
|
47f9890bae | ||
|
|
262265daad | ||
|
|
300da5b872 | ||
|
|
7b22b5c433 | ||
|
|
ffca97bc1e | ||
|
|
cb356f3259 | ||
|
|
c85374295f | ||
|
|
4992160677 | ||
|
|
bd535b3371 | ||
|
|
d90c5a03af | ||
|
|
2d02cc9079 | ||
|
|
49ad94b99f | ||
|
|
948a217398 | ||
|
|
125381eae7 | ||
|
|
cd01bbc715 | ||
|
|
d8b5e3b88d | ||
|
|
06d25f2186 | ||
|
|
f759b561f3 | ||
|
|
ece0555600 | ||
|
|
73ea0a0b01 | ||
|
|
d8f6d6fd6f | ||
|
|
d24de169a7 | ||
|
|
0816168296 | ||
|
|
277b44d57a | ||
|
|
68c2c3880e | ||
|
|
49da498f65 | ||
|
|
2c76ba3dd7 | ||
|
|
dbe3dc69ad | ||
|
|
8e5bb3ed49 | ||
|
|
ab0be7b8da | ||
|
|
b4c55f5d24 | ||
|
|
ede70d833c | ||
|
|
70c3d18bb0 | ||
|
|
7a491f52c4 | ||
|
|
323c4ecb4f | ||
|
|
3d2466607e | ||
|
|
ed478b39f4 | ||
|
|
91585a558d | ||
|
|
93467eae1f | ||
|
|
f3aac81d19 | ||
|
|
979ad60c19 | ||
|
|
9316cb1b1f | ||
|
|
e7939a527a | ||
|
|
36d26665e1 | ||
|
|
873347f977 | ||
|
|
e814ac16f9 | ||
|
|
ad3055d386 | ||
|
|
94e03eb452 | ||
|
|
380f26ef79 | ||
|
|
3c5b7f59d7 | ||
|
|
fee89f80b5 | ||
|
|
41cce8eaf1 | ||
|
|
f88fe0218d | ||
|
|
cc856eca85 | ||
|
|
cf350c6002 | ||
|
|
0ce6b6a0a3 | ||
|
|
73f247d537 | ||
|
|
960be82183 | ||
|
|
806e5a6c19 | ||
|
|
8d5df07cce | ||
|
|
df7a9d1407 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -2506,6 +2506,7 @@ dependencies = [
|
||||
"pageserver",
|
||||
"postgres_ffi",
|
||||
"svg_fmt",
|
||||
"tokio",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
@@ -2544,6 +2545,7 @@ dependencies = [
|
||||
"metrics",
|
||||
"nix",
|
||||
"num-traits",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"pin-project-lite",
|
||||
|
||||
@@ -35,6 +35,8 @@ humantime-serde.workspace = true
|
||||
hyper.workspace = true
|
||||
itertools.workspace = true
|
||||
nix.workspace = true
|
||||
# hack to get the number of worker threads tokio uses
|
||||
num_cpus = { version = "1.15" }
|
||||
num-traits.workspace = true
|
||||
once_cell.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
|
||||
@@ -13,6 +13,7 @@ clap = { workspace = true, features = ["string"] }
|
||||
git-version.workspace = true
|
||||
pageserver = { path = ".." }
|
||||
postgres_ffi.workspace = true
|
||||
tokio.workspace = true
|
||||
utils.workspace = true
|
||||
svg_fmt.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -95,7 +95,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
|
||||
}
|
||||
|
||||
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH"
|
||||
fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
|
||||
async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
|
||||
let file = FileBlockReader::new(VirtualFile::open(path)?);
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
@@ -129,7 +129,7 @@ fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
|
||||
Ok(holes)
|
||||
}
|
||||
|
||||
pub(crate) fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
let storage_path = &cmd.path;
|
||||
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
|
||||
|
||||
@@ -160,7 +160,7 @@ pub(crate) fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
parse_filename(&layer.file_name().into_string().unwrap())
|
||||
{
|
||||
if layer_file.is_delta {
|
||||
layer_file.holes = get_holes(&layer.path(), max_holes)?;
|
||||
layer_file.holes = get_holes(&layer.path(), max_holes).await?;
|
||||
n_deltas += 1;
|
||||
}
|
||||
layers.push(layer_file);
|
||||
|
||||
@@ -43,8 +43,7 @@ pub(crate) enum LayerCmd {
|
||||
},
|
||||
}
|
||||
|
||||
fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
||||
use pageserver::tenant::blob_io::BlobCursor;
|
||||
async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
||||
use pageserver::tenant::block_io::BlockReader;
|
||||
|
||||
let path = path.as_ref();
|
||||
@@ -78,7 +77,7 @@ fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
match cmd {
|
||||
LayerCmd::List { path } => {
|
||||
for tenant in fs::read_dir(path.join("tenants"))? {
|
||||
@@ -153,7 +152,7 @@ pub(crate) fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
);
|
||||
|
||||
if layer_file.is_delta {
|
||||
read_delta_file(layer.path())?;
|
||||
read_delta_file(layer.path()).await?;
|
||||
} else {
|
||||
anyhow::bail!("not supported yet :(");
|
||||
}
|
||||
|
||||
@@ -72,12 +72,13 @@ struct AnalyzeLayerMapCmd {
|
||||
max_holes: Option<usize>,
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let cli = CliOpts::parse();
|
||||
|
||||
match cli.command {
|
||||
Commands::Layer(cmd) => {
|
||||
layers::main(&cmd)?;
|
||||
layers::main(&cmd).await?;
|
||||
}
|
||||
Commands::Metadata(cmd) => {
|
||||
handle_metadata(&cmd)?;
|
||||
@@ -86,7 +87,7 @@ fn main() -> anyhow::Result<()> {
|
||||
draw_timeline_dir::main()?;
|
||||
}
|
||||
Commands::AnalyzeLayerMap(cmd) => {
|
||||
layer_map_analyzer::main(&cmd)?;
|
||||
layer_map_analyzer::main(&cmd).await?;
|
||||
}
|
||||
Commands::PrintLayerFile(cmd) => {
|
||||
if let Err(e) = read_pg_control_file(&cmd.path) {
|
||||
@@ -94,7 +95,7 @@ fn main() -> anyhow::Result<()> {
|
||||
"Failed to read input file as a pg control one: {e:#}\n\
|
||||
Attempting to read it as layer file"
|
||||
);
|
||||
print_layerfile(&cmd.path)?;
|
||||
print_layerfile(&cmd.path).await?;
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -113,12 +114,12 @@ fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_layerfile(path: &Path) -> anyhow::Result<()> {
|
||||
async fn print_layerfile(path: &Path) -> anyhow::Result<()> {
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(10);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
dump_layerfile_from_path(path, true, &ctx)
|
||||
dump_layerfile_from_path(path, true, &ctx).await
|
||||
}
|
||||
|
||||
fn handle_metadata(
|
||||
|
||||
@@ -994,31 +994,29 @@ async fn timeline_gc_handler(
|
||||
// Run compaction immediately on given timeline.
|
||||
async fn timeline_compact_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let result_receiver = mgr::immediate_compact(tenant_id, timeline_id, &ctx)
|
||||
.await
|
||||
.context("spawn compaction task")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
let result: anyhow::Result<()> = result_receiver
|
||||
.await
|
||||
.context("receive compaction result")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
result.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
async {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
|
||||
timeline
|
||||
.compact(&cancel, &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
.instrument(info_span!("manual_compaction", %tenant_id, %timeline_id))
|
||||
.await
|
||||
}
|
||||
|
||||
// Run checkpoint immediately on given timeline.
|
||||
async fn timeline_checkpoint_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
@@ -1031,13 +1029,13 @@ async fn timeline_checkpoint_handler(
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
timeline
|
||||
.compact(&ctx)
|
||||
.compact(&cancel, &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
.instrument(info_span!("manual_checkpoint", tenant_id = %tenant_id, timeline_id = %timeline_id))
|
||||
.instrument(info_span!("manual_checkpoint", %tenant_id, %timeline_id))
|
||||
.await
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@ use metrics::{
|
||||
IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::models::TenantState;
|
||||
use strum::VariantNames;
|
||||
use strum_macros::{EnumVariantNames, IntoStaticStr};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -84,11 +83,10 @@ pub static STORAGE_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static READ_NUM_FS_LAYERS: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
pub(crate) static READ_NUM_FS_LAYERS: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_read_num_fs_layers",
|
||||
"Number of persistent layers accessed for processing a read request, including those in the cache",
|
||||
&["tenant_id", "timeline_id"],
|
||||
vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 10.0, 20.0, 50.0, 100.0],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
@@ -112,11 +110,10 @@ pub static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static GET_RECONSTRUCT_DATA_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_getpage_get_reconstruct_data_seconds",
|
||||
"Time spent in get_reconstruct_value_data",
|
||||
&["tenant_id", "timeline_id"],
|
||||
CRITICAL_OP_BUCKETS.into(),
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
@@ -246,11 +243,10 @@ pub static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> = Lazy::new(|| PageCacheS
|
||||
},
|
||||
});
|
||||
|
||||
static WAIT_LSN_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_wait_lsn_seconds",
|
||||
"Time spent waiting for WAL to arrive",
|
||||
&["tenant_id", "timeline_id"],
|
||||
CRITICAL_OP_BUCKETS.into(),
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
@@ -309,11 +305,24 @@ static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
.expect("failed to define current logical size metric")
|
||||
});
|
||||
|
||||
pub static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
pub(crate) static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_tenant_states_count",
|
||||
"Count of tenants per state",
|
||||
&["tenant_id", "state"]
|
||||
&["state"]
|
||||
)
|
||||
.expect("Failed to register pageserver_tenant_states_count metric")
|
||||
});
|
||||
|
||||
/// A set of broken tenants.
|
||||
///
|
||||
/// These are expected to be so rare that a set is fine. Set as in a new timeseries per each broken
|
||||
/// tenant.
|
||||
pub(crate) static BROKEN_TENANTS_SET: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_broken_tenants_count",
|
||||
"Set of broken tenants",
|
||||
&["tenant_id"]
|
||||
)
|
||||
.expect("Failed to register pageserver_tenant_states_count metric")
|
||||
});
|
||||
@@ -499,23 +508,31 @@ const STORAGE_IO_TIME_BUCKETS: &[f64] = &[
|
||||
30.000, // 30000 ms
|
||||
];
|
||||
|
||||
const STORAGE_IO_TIME_OPERATIONS: &[&str] = &[
|
||||
"open", "close", "read", "write", "seek", "fsync", "gc", "metadata",
|
||||
];
|
||||
|
||||
const STORAGE_IO_SIZE_OPERATIONS: &[&str] = &["read", "write"];
|
||||
|
||||
pub static STORAGE_IO_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
/// Tracks time taken by fs operations near VirtualFile.
|
||||
///
|
||||
/// Operations:
|
||||
/// - open ([`std::fs::OpenOptions::open`])
|
||||
/// - close (dropping [`std::fs::File`])
|
||||
/// - close-by-replace (close by replacement algorithm)
|
||||
/// - read (`read_at`)
|
||||
/// - write (`write_at`)
|
||||
/// - seek (modify internal position or file length query)
|
||||
/// - fsync ([`std::fs::File::sync_all`])
|
||||
/// - metadata ([`std::fs::File::metadata`])
|
||||
pub(crate) static STORAGE_IO_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_io_operations_seconds",
|
||||
"Time spent in IO operations",
|
||||
&["operation", "tenant_id", "timeline_id"],
|
||||
&["operation"],
|
||||
STORAGE_IO_TIME_BUCKETS.into()
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
const STORAGE_IO_SIZE_OPERATIONS: &[&str] = &["read", "write"];
|
||||
|
||||
// Needed for the https://neonprod.grafana.net/d/5uK9tHL4k/picking-tenant-for-relocation?orgId=1
|
||||
pub(crate) static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_io_operations_bytes_total",
|
||||
"Total amount of bytes read/written in IO operations",
|
||||
@@ -605,7 +622,7 @@ static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy<HistogramVec> = Lazy::new
|
||||
at a given instant. It gives you a better idea of the queue depth \
|
||||
than plotting the gauge directly, since operations may complete faster \
|
||||
than the sampling interval.",
|
||||
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
|
||||
&["file_kind", "op_kind"],
|
||||
// The calls_unfinished gauge is an integer gauge, hence we have integer buckets.
|
||||
vec![0.0, 1.0, 2.0, 4.0, 6.0, 8.0, 10.0, 15.0, 20.0, 40.0, 60.0, 80.0, 100.0, 500.0],
|
||||
)
|
||||
@@ -662,13 +679,13 @@ impl RemoteOpFileKind {
|
||||
}
|
||||
}
|
||||
|
||||
pub static REMOTE_OPERATION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
pub(crate) static REMOTE_OPERATION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_remote_operation_seconds",
|
||||
"Time spent on remote storage operations. \
|
||||
Grouped by tenant, timeline, operation_kind and status. \
|
||||
Does not account for time spent waiting in remote timeline client's queues.",
|
||||
&["tenant_id", "timeline_id", "file_kind", "op_kind", "status"]
|
||||
&["file_kind", "op_kind", "status"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
@@ -897,7 +914,6 @@ impl StorageTimeMetrics {
|
||||
pub struct TimelineMetrics {
|
||||
tenant_id: String,
|
||||
timeline_id: String,
|
||||
pub get_reconstruct_data_time_histo: Histogram,
|
||||
pub flush_time_histo: StorageTimeMetrics,
|
||||
pub compact_time_histo: StorageTimeMetrics,
|
||||
pub create_images_time_histo: StorageTimeMetrics,
|
||||
@@ -906,9 +922,7 @@ pub struct TimelineMetrics {
|
||||
pub load_layer_map_histo: StorageTimeMetrics,
|
||||
pub garbage_collect_histo: StorageTimeMetrics,
|
||||
pub last_record_gauge: IntGauge,
|
||||
pub wait_lsn_time_histo: Histogram,
|
||||
pub resident_physical_size_gauge: UIntGauge,
|
||||
pub read_num_fs_layers: Histogram,
|
||||
/// copy of LayeredTimeline.current_logical_size
|
||||
pub current_logical_size_gauge: UIntGauge,
|
||||
pub num_persistent_files_created: IntCounter,
|
||||
@@ -925,9 +939,6 @@ impl TimelineMetrics {
|
||||
) -> Self {
|
||||
let tenant_id = tenant_id.to_string();
|
||||
let timeline_id = timeline_id.to_string();
|
||||
let get_reconstruct_data_time_histo = GET_RECONSTRUCT_DATA_TIME
|
||||
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
||||
.unwrap();
|
||||
let flush_time_histo =
|
||||
StorageTimeMetrics::new(StorageTimeOperation::LayerFlush, &tenant_id, &timeline_id);
|
||||
let compact_time_histo =
|
||||
@@ -948,9 +959,6 @@ impl TimelineMetrics {
|
||||
let last_record_gauge = LAST_RECORD_LSN
|
||||
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
||||
.unwrap();
|
||||
let wait_lsn_time_histo = WAIT_LSN_TIME
|
||||
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
||||
.unwrap();
|
||||
let resident_physical_size_gauge = RESIDENT_PHYSICAL_SIZE
|
||||
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
||||
.unwrap();
|
||||
@@ -966,16 +974,12 @@ impl TimelineMetrics {
|
||||
let evictions = EVICTIONS
|
||||
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
||||
.unwrap();
|
||||
let read_num_fs_layers = READ_NUM_FS_LAYERS
|
||||
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
||||
.unwrap();
|
||||
let evictions_with_low_residence_duration =
|
||||
evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id);
|
||||
|
||||
TimelineMetrics {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
get_reconstruct_data_time_histo,
|
||||
flush_time_histo,
|
||||
compact_time_histo,
|
||||
create_images_time_histo,
|
||||
@@ -984,7 +988,6 @@ impl TimelineMetrics {
|
||||
garbage_collect_histo,
|
||||
load_layer_map_histo,
|
||||
last_record_gauge,
|
||||
wait_lsn_time_histo,
|
||||
resident_physical_size_gauge,
|
||||
current_logical_size_gauge,
|
||||
num_persistent_files_created,
|
||||
@@ -993,7 +996,6 @@ impl TimelineMetrics {
|
||||
evictions_with_low_residence_duration: std::sync::RwLock::new(
|
||||
evictions_with_low_residence_duration,
|
||||
),
|
||||
read_num_fs_layers,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1002,15 +1004,12 @@ impl Drop for TimelineMetrics {
|
||||
fn drop(&mut self) {
|
||||
let tenant_id = &self.tenant_id;
|
||||
let timeline_id = &self.timeline_id;
|
||||
let _ = GET_RECONSTRUCT_DATA_TIME.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = WAIT_LSN_TIME.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = NUM_PERSISTENT_FILES_CREATED.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = EVICTIONS.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = READ_NUM_FS_LAYERS.remove_label_values(&[tenant_id, timeline_id]);
|
||||
|
||||
self.evictions_with_low_residence_duration
|
||||
.write()
|
||||
@@ -1022,9 +1021,6 @@ impl Drop for TimelineMetrics {
|
||||
let _ =
|
||||
STORAGE_TIME_COUNT_PER_TIMELINE.remove_label_values(&[op, tenant_id, timeline_id]);
|
||||
}
|
||||
for op in STORAGE_IO_TIME_OPERATIONS {
|
||||
let _ = STORAGE_IO_TIME.remove_label_values(&[op, tenant_id, timeline_id]);
|
||||
}
|
||||
|
||||
for op in STORAGE_IO_SIZE_OPERATIONS {
|
||||
let _ = STORAGE_IO_SIZE.remove_label_values(&[op, tenant_id, timeline_id]);
|
||||
@@ -1039,9 +1035,7 @@ impl Drop for TimelineMetrics {
|
||||
pub fn remove_tenant_metrics(tenant_id: &TenantId) {
|
||||
let tid = tenant_id.to_string();
|
||||
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
|
||||
for state in TenantState::VARIANTS {
|
||||
let _ = TENANT_STATE_METRIC.remove_label_values(&[&tid, state]);
|
||||
}
|
||||
// we leave the BROKEN_TENANTS_SET entry if any
|
||||
}
|
||||
|
||||
use futures::Future;
|
||||
@@ -1056,9 +1050,7 @@ pub struct RemoteTimelineClientMetrics {
|
||||
tenant_id: String,
|
||||
timeline_id: String,
|
||||
remote_physical_size_gauge: Mutex<Option<UIntGauge>>,
|
||||
remote_operation_time: Mutex<HashMap<(&'static str, &'static str, &'static str), Histogram>>,
|
||||
calls_unfinished_gauge: Mutex<HashMap<(&'static str, &'static str), IntGauge>>,
|
||||
calls_started_hist: Mutex<HashMap<(&'static str, &'static str), Histogram>>,
|
||||
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
}
|
||||
@@ -1068,14 +1060,13 @@ impl RemoteTimelineClientMetrics {
|
||||
RemoteTimelineClientMetrics {
|
||||
tenant_id: tenant_id.to_string(),
|
||||
timeline_id: timeline_id.to_string(),
|
||||
remote_operation_time: Mutex::new(HashMap::default()),
|
||||
calls_unfinished_gauge: Mutex::new(HashMap::default()),
|
||||
calls_started_hist: Mutex::new(HashMap::default()),
|
||||
bytes_started_counter: Mutex::new(HashMap::default()),
|
||||
bytes_finished_counter: Mutex::new(HashMap::default()),
|
||||
remote_physical_size_gauge: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remote_physical_size_gauge(&self) -> UIntGauge {
|
||||
let mut guard = self.remote_physical_size_gauge.lock().unwrap();
|
||||
guard
|
||||
@@ -1089,26 +1080,17 @@ impl RemoteTimelineClientMetrics {
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
|
||||
pub fn remote_operation_time(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
status: &'static str,
|
||||
) -> Histogram {
|
||||
let mut guard = self.remote_operation_time.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str(), status);
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
REMOTE_OPERATION_TIME
|
||||
.get_metric_with_label_values(&[
|
||||
&self.tenant_id.to_string(),
|
||||
&self.timeline_id.to_string(),
|
||||
key.0,
|
||||
key.1,
|
||||
key.2,
|
||||
])
|
||||
.unwrap()
|
||||
});
|
||||
metric.clone()
|
||||
REMOTE_OPERATION_TIME
|
||||
.get_metric_with_label_values(&[key.0, key.1, key.2])
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn calls_unfinished_gauge(
|
||||
@@ -1136,19 +1118,10 @@ impl RemoteTimelineClientMetrics {
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> Histogram {
|
||||
let mut guard = self.calls_started_hist.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST
|
||||
.get_metric_with_label_values(&[
|
||||
&self.tenant_id.to_string(),
|
||||
&self.timeline_id.to_string(),
|
||||
key.0,
|
||||
key.1,
|
||||
])
|
||||
.unwrap()
|
||||
});
|
||||
metric.clone()
|
||||
REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST
|
||||
.get_metric_with_label_values(&[key.0, key.1])
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn bytes_started_counter(
|
||||
@@ -1328,15 +1301,10 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
remote_physical_size_gauge,
|
||||
remote_operation_time,
|
||||
calls_unfinished_gauge,
|
||||
calls_started_hist,
|
||||
bytes_started_counter,
|
||||
bytes_finished_counter,
|
||||
} = self;
|
||||
for ((a, b, c), _) in remote_operation_time.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_OPERATION_TIME.remove_label_values(&[tenant_id, timeline_id, a, b, c]);
|
||||
}
|
||||
for ((a, b), _) in calls_unfinished_gauge.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_TIMELINE_CLIENT_CALLS_UNFINISHED_GAUGE.remove_label_values(&[
|
||||
tenant_id,
|
||||
@@ -1345,14 +1313,6 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
b,
|
||||
]);
|
||||
}
|
||||
for ((a, b), _) in calls_started_hist.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST.remove_label_values(&[
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
a,
|
||||
b,
|
||||
]);
|
||||
}
|
||||
for ((a, b), _) in bytes_started_counter.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER.remove_label_values(&[
|
||||
tenant_id,
|
||||
|
||||
@@ -130,11 +130,25 @@ pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("background op worker")
|
||||
// if you change the number of worker threads please change the constant below
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create background op runtime")
|
||||
});
|
||||
|
||||
pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
|
||||
// force init and thus panics
|
||||
let _ = BACKGROUND_RUNTIME.handle();
|
||||
// replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
|
||||
// tokio would had already panicked for parsing errors or NotUnicode
|
||||
//
|
||||
// this will be wrong if any of the runtimes gets their worker threads configured to something
|
||||
// else, but that has not been needed in a long time.
|
||||
std::env::var("TOKIO_WORKER_THREADS")
|
||||
.map(|s| s.parse::<usize>().unwrap())
|
||||
.unwrap_or_else(|_e| usize::max(1, num_cpus::get()))
|
||||
});
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PageserverTaskId(u64);
|
||||
|
||||
@@ -545,7 +559,7 @@ pub fn current_task_id() -> Option<PageserverTaskId> {
|
||||
pub async fn shutdown_watcher() {
|
||||
let token = SHUTDOWN_TOKEN
|
||||
.try_with(|t| t.clone())
|
||||
.expect("shutdown_requested() called in an unexpected task or thread");
|
||||
.expect("shutdown_watcher() called in an unexpected task or thread");
|
||||
|
||||
token.cancelled().await;
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ use storage_broker::BrokerClientChannel;
|
||||
use tokio::sync::watch;
|
||||
use tokio::sync::OwnedMutexGuard;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::completion;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
@@ -1335,7 +1336,11 @@ impl Tenant {
|
||||
/// This function is periodically called by compactor task.
|
||||
/// Also it can be explicitly requested per timeline through page server
|
||||
/// api's 'compact' command.
|
||||
pub async fn compaction_iteration(&self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
pub async fn compaction_iteration(
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(
|
||||
self.is_active(),
|
||||
"Cannot run compaction iteration on inactive tenant"
|
||||
@@ -1363,7 +1368,7 @@ impl Tenant {
|
||||
|
||||
for (timeline_id, timeline) in &timelines_to_compact {
|
||||
timeline
|
||||
.compact(ctx)
|
||||
.compact(cancel, ctx)
|
||||
.instrument(info_span!("compact_timeline", %timeline_id))
|
||||
.await?;
|
||||
}
|
||||
@@ -2194,28 +2199,53 @@ impl Tenant {
|
||||
let (state, mut rx) = watch::channel(state);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut current_state: &'static str = From::from(&*rx.borrow_and_update());
|
||||
let tid = tenant_id.to_string();
|
||||
TENANT_STATE_METRIC
|
||||
.with_label_values(&[&tid, current_state])
|
||||
.inc();
|
||||
loop {
|
||||
match rx.changed().await {
|
||||
Ok(()) => {
|
||||
let new_state: &'static str = From::from(&*rx.borrow_and_update());
|
||||
TENANT_STATE_METRIC
|
||||
.with_label_values(&[&tid, current_state])
|
||||
.dec();
|
||||
TENANT_STATE_METRIC
|
||||
.with_label_values(&[&tid, new_state])
|
||||
.inc();
|
||||
|
||||
current_state = new_state;
|
||||
}
|
||||
Err(_sender_dropped_error) => {
|
||||
info!("Tenant dropped the state updates sender, quitting waiting for tenant state change");
|
||||
return;
|
||||
}
|
||||
fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
|
||||
([state.into()], matches!(state, TenantState::Broken { .. }))
|
||||
}
|
||||
|
||||
let mut tuple = inspect_state(&rx.borrow_and_update());
|
||||
|
||||
let is_broken = tuple.1;
|
||||
let mut counted_broken = if !is_broken {
|
||||
// the tenant might be ignored and reloaded, so first remove any previous set
|
||||
// element. it most likely has already been scraped, as these are manual operations
|
||||
// right now. most likely we will add it back very soon.
|
||||
drop(crate::metrics::BROKEN_TENANTS_SET.remove_label_values(&[&tid]));
|
||||
false
|
||||
} else {
|
||||
// add the id to the set right away, there should not be any updates on the channel
|
||||
// after
|
||||
crate::metrics::BROKEN_TENANTS_SET
|
||||
.with_label_values(&[&tid])
|
||||
.set(1);
|
||||
true
|
||||
};
|
||||
|
||||
loop {
|
||||
let labels = &tuple.0;
|
||||
let current = TENANT_STATE_METRIC.with_label_values(labels);
|
||||
current.inc();
|
||||
|
||||
if rx.changed().await.is_err() {
|
||||
// tenant has been dropped; decrement the counter because a tenant with that
|
||||
// state is no longer in tenant map, but allow any broken set item to exist
|
||||
// still.
|
||||
current.dec();
|
||||
break;
|
||||
}
|
||||
|
||||
current.dec();
|
||||
tuple = inspect_state(&rx.borrow_and_update());
|
||||
|
||||
let is_broken = tuple.1;
|
||||
if is_broken && !counted_broken {
|
||||
counted_broken = true;
|
||||
// insert the tenant_id (back) into the set
|
||||
crate::metrics::BROKEN_TENANTS_SET
|
||||
.with_label_values(&[&tid])
|
||||
.inc();
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -3210,7 +3240,7 @@ impl Drop for Tenant {
|
||||
}
|
||||
}
|
||||
/// Dump contents of a layer file to stdout.
|
||||
pub fn dump_layerfile_from_path(
|
||||
pub async fn dump_layerfile_from_path(
|
||||
path: &Path,
|
||||
verbose: bool,
|
||||
ctx: &RequestContext,
|
||||
@@ -3224,8 +3254,16 @@ pub fn dump_layerfile_from_path(
|
||||
file.read_exact_at(&mut header_buf, 0)?;
|
||||
|
||||
match u16::from_be_bytes(header_buf) {
|
||||
crate::IMAGE_FILE_MAGIC => ImageLayer::new_for_path(path, file)?.dump(verbose, ctx)?,
|
||||
crate::DELTA_FILE_MAGIC => DeltaLayer::new_for_path(path, file)?.dump(verbose, ctx)?,
|
||||
crate::IMAGE_FILE_MAGIC => {
|
||||
ImageLayer::new_for_path(path, file)?
|
||||
.dump(verbose, ctx)
|
||||
.await?
|
||||
}
|
||||
crate::DELTA_FILE_MAGIC => {
|
||||
DeltaLayer::new_for_path(path, file)?
|
||||
.dump(verbose, ctx)
|
||||
.await?
|
||||
}
|
||||
magic => bail!("unrecognized magic identifier: {:?}", magic),
|
||||
}
|
||||
|
||||
@@ -3441,6 +3479,7 @@ mod tests {
|
||||
use hex_literal::hex;
|
||||
use once_cell::sync::Lazy;
|
||||
use rand::{thread_rng, Rng};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
static TEST_KEY: Lazy<Key> =
|
||||
Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));
|
||||
@@ -3962,7 +4001,7 @@ mod tests {
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
@@ -3972,7 +4011,7 @@ mod tests {
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
@@ -3982,7 +4021,7 @@ mod tests {
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
@@ -3992,7 +4031,7 @@ mod tests {
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
|
||||
assert_eq!(
|
||||
tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
|
||||
@@ -4061,7 +4100,7 @@ mod tests {
|
||||
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
|
||||
.await?;
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
tline.gc().await?;
|
||||
}
|
||||
|
||||
@@ -4138,7 +4177,7 @@ mod tests {
|
||||
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
|
||||
.await?;
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
tline.gc().await?;
|
||||
}
|
||||
|
||||
@@ -4226,7 +4265,7 @@ mod tests {
|
||||
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
|
||||
.await?;
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
tline.compact(&CancellationToken::new(), &ctx).await?;
|
||||
tline.gc().await?;
|
||||
}
|
||||
|
||||
|
||||
@@ -16,29 +16,19 @@ use crate::tenant::block_io::{BlockCursor, BlockReader};
|
||||
use std::cmp::min;
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
/// For reading
|
||||
pub trait BlobCursor {
|
||||
impl<R> BlockCursor<R>
|
||||
where
|
||||
R: BlockReader,
|
||||
{
|
||||
/// Read a blob into a new buffer.
|
||||
fn read_blob(&mut self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
|
||||
pub fn read_blob(&mut self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
|
||||
let mut buf = Vec::new();
|
||||
self.read_blob_into_buf(offset, &mut buf)?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
/// Read blob into the given buffer. Any previous contents in the buffer
|
||||
/// are overwritten.
|
||||
fn read_blob_into_buf(
|
||||
&mut self,
|
||||
offset: u64,
|
||||
dstbuf: &mut Vec<u8>,
|
||||
) -> Result<(), std::io::Error>;
|
||||
}
|
||||
|
||||
impl<R> BlobCursor for BlockCursor<R>
|
||||
where
|
||||
R: BlockReader,
|
||||
{
|
||||
fn read_blob_into_buf(
|
||||
pub fn read_blob_into_buf(
|
||||
&mut self,
|
||||
offset: u64,
|
||||
dstbuf: &mut Vec<u8>,
|
||||
|
||||
@@ -328,7 +328,7 @@ fn to_io_error(e: anyhow::Error, context: &str) -> io::Error {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter};
|
||||
use crate::tenant::blob_io::BlobWriter;
|
||||
use crate::tenant::block_io::BlockCursor;
|
||||
use rand::{seq::SliceRandom, thread_rng, RngCore};
|
||||
use std::fs;
|
||||
|
||||
@@ -626,17 +626,17 @@ impl LayerMap {
|
||||
|
||||
/// debugging function to print out the contents of the layer map
|
||||
#[allow(unused)]
|
||||
pub fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
println!("Begin dump LayerMap");
|
||||
|
||||
println!("open_layer:");
|
||||
if let Some(open_layer) = &self.open_layer {
|
||||
open_layer.dump(verbose, ctx)?;
|
||||
open_layer.dump(verbose, ctx).await?;
|
||||
}
|
||||
|
||||
println!("frozen_layers:");
|
||||
for frozen_layer in self.frozen_layers.iter() {
|
||||
frozen_layer.dump(verbose, ctx)?;
|
||||
frozen_layer.dump(verbose, ctx).await?;
|
||||
}
|
||||
|
||||
println!("historic_layers:");
|
||||
|
||||
@@ -768,55 +768,6 @@ pub async fn immediate_gc(
|
||||
Ok(wait_task_done)
|
||||
}
|
||||
|
||||
pub async fn immediate_compact(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<tokio::sync::oneshot::Receiver<anyhow::Result<()>>, ApiError> {
|
||||
let guard = TENANTS.read().await;
|
||||
|
||||
let tenant = guard
|
||||
.get(&tenant_id)
|
||||
.map(Arc::clone)
|
||||
.with_context(|| format!("tenant {tenant_id}"))
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?;
|
||||
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?;
|
||||
|
||||
// Run in task_mgr to avoid race with tenant_detach operation
|
||||
let ctx = ctx.detached_child(TaskKind::Compaction, DownloadBehavior::Download);
|
||||
let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
|
||||
task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
TaskKind::Compaction,
|
||||
Some(tenant_id),
|
||||
Some(timeline_id),
|
||||
&format!(
|
||||
"timeline_compact_handler compaction run for tenant {tenant_id} timeline {timeline_id}"
|
||||
),
|
||||
false,
|
||||
async move {
|
||||
let result = timeline
|
||||
.compact(&ctx)
|
||||
.instrument(info_span!("manual_compact", %tenant_id, %timeline_id))
|
||||
.await;
|
||||
|
||||
match task_done.send(result) {
|
||||
Ok(_) => (),
|
||||
Err(result) => error!("failed to send compaction result: {result:?}"),
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
|
||||
// drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
|
||||
drop(guard);
|
||||
|
||||
Ok(wait_task_done)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
|
||||
@@ -338,7 +338,8 @@ impl LayerAccessStats {
|
||||
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
|
||||
/// timeline names, because those are known in the context of which the layers
|
||||
/// are used in (timeline).
|
||||
pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync {
|
||||
#[async_trait::async_trait]
|
||||
pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static {
|
||||
/// Range of keys that this layer covers
|
||||
fn get_key_range(&self) -> Range<Key>;
|
||||
|
||||
@@ -368,7 +369,7 @@ pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync {
|
||||
/// is available. If this returns ValueReconstructResult::Continue, look up
|
||||
/// the predecessor layer and call again with the same 'reconstruct_data' to
|
||||
/// collect more data.
|
||||
fn get_value_reconstruct_data(
|
||||
async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
@@ -377,7 +378,7 @@ pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync {
|
||||
) -> Result<ValueReconstructResult>;
|
||||
|
||||
/// Dump summary of the contents of the layer to stdout
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>;
|
||||
async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>;
|
||||
}
|
||||
|
||||
/// Returned by [`PersistentLayer::iter`]
|
||||
|
||||
@@ -31,7 +31,7 @@ use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{PageReadGuard, PAGE_SZ};
|
||||
use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::{
|
||||
@@ -223,9 +223,10 @@ impl std::fmt::Debug for DeltaLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for DeltaLayer {
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
|
||||
self.desc.tenant_id,
|
||||
@@ -300,7 +301,7 @@ impl Layer for DeltaLayer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_value_reconstruct_data(
|
||||
async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
@@ -27,7 +27,7 @@ use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::repository::{Key, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::{
|
||||
@@ -155,9 +155,10 @@ impl std::fmt::Debug for ImageLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for ImageLayer {
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----",
|
||||
self.desc.tenant_id,
|
||||
@@ -189,7 +190,7 @@ impl Layer for ImageLayer {
|
||||
}
|
||||
|
||||
/// Look up given page in the file
|
||||
fn get_value_reconstruct_data(
|
||||
async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter};
|
||||
use crate::tenant::blob_io::BlobWriter;
|
||||
use crate::tenant::block_io::BlockReader;
|
||||
use crate::tenant::ephemeral_file::EphemeralFile;
|
||||
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
|
||||
@@ -110,6 +110,7 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for InMemoryLayer {
|
||||
fn get_key_range(&self) -> Range<Key> {
|
||||
Key::MIN..Key::MAX
|
||||
@@ -132,7 +133,7 @@ impl Layer for InMemoryLayer {
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
async fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
let inner = self.inner.read().unwrap();
|
||||
|
||||
let end_str = inner
|
||||
@@ -183,7 +184,7 @@ impl Layer for InMemoryLayer {
|
||||
}
|
||||
|
||||
/// Look up given value in the layer.
|
||||
fn get_value_reconstruct_data(
|
||||
async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
@@ -65,8 +65,9 @@ impl std::fmt::Debug for RemoteLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for RemoteLayer {
|
||||
fn get_value_reconstruct_data(
|
||||
async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
_key: Key,
|
||||
_lsn_range: Range<Lsn>,
|
||||
@@ -77,7 +78,7 @@ impl Layer for RemoteLayer {
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- remote layer for ten {} tli {} keys {}-{} lsn {}-{} is_delta {} is_incremental {} size {} ----",
|
||||
self.desc.tenant_id,
|
||||
|
||||
@@ -111,7 +111,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
Duration::from_secs(10)
|
||||
} else {
|
||||
// Run compaction
|
||||
if let Err(e) = tenant.compaction_iteration(&ctx).await {
|
||||
if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
|
||||
error!("Compaction failed, retrying in {:?}: {e:?}", wait_duration);
|
||||
wait_duration
|
||||
} else {
|
||||
|
||||
@@ -334,7 +334,7 @@ pub struct GcInfo {
|
||||
#[derive(thiserror::Error)]
|
||||
pub enum PageReconstructError {
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error), // source and Display delegate to anyhow::Error
|
||||
Other(#[from] anyhow::Error),
|
||||
|
||||
/// The operation would require downloading a layer that is missing locally.
|
||||
NeedsDownload(TenantTimelineId, LayerFileName),
|
||||
@@ -475,7 +475,7 @@ impl Timeline {
|
||||
img: cached_page_img,
|
||||
};
|
||||
|
||||
let timer = self.metrics.get_reconstruct_data_time_histo.start_timer();
|
||||
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
|
||||
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
|
||||
.await?;
|
||||
timer.stop_and_record();
|
||||
@@ -555,7 +555,7 @@ impl Timeline {
|
||||
"wait_lsn cannot be called in WAL receiver"
|
||||
);
|
||||
|
||||
let _timer = self.metrics.wait_lsn_time_histo.start_timer();
|
||||
let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
|
||||
|
||||
match self
|
||||
.last_record_lsn
|
||||
@@ -611,9 +611,46 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Outermost timeline compaction operation; downloads needed layers.
|
||||
pub async fn compact(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
pub async fn compact(
|
||||
self: &Arc<Self>,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
const ROUNDS: usize = 2;
|
||||
|
||||
static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
once_cell::sync::Lazy::new(|| {
|
||||
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
let permits = usize::max(
|
||||
1,
|
||||
// while a lot of the work is done on spawn_blocking, we still do
|
||||
// repartitioning in the async context. this should give leave us some workers
|
||||
// unblocked to be blocked on other work, hopefully easing any outside visible
|
||||
// effects of restarts.
|
||||
//
|
||||
// 6/8 is a guess; previously we ran with unlimited 8 and more from
|
||||
// spawn_blocking.
|
||||
(total_threads * 3).checked_div(4).unwrap_or(0),
|
||||
);
|
||||
assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
assert!(
|
||||
permits < total_threads,
|
||||
"need threads avail for shorter work"
|
||||
);
|
||||
tokio::sync::Semaphore::new(permits)
|
||||
});
|
||||
|
||||
// this wait probably never needs any "long time spent" logging, because we already nag if
|
||||
// compaction task goes over it's period (20s) which is quite often in production.
|
||||
let _permit = tokio::select! {
|
||||
permit = CONCURRENT_COMPACTIONS.acquire() => {
|
||||
permit
|
||||
},
|
||||
_ = cancel.cancelled() => {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
// Last record Lsn could be zero in case the timeline was just created
|
||||
@@ -671,11 +708,9 @@ impl Timeline {
|
||||
|
||||
let mut failed = 0;
|
||||
|
||||
let mut cancelled = pin!(task_mgr::shutdown_watcher());
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut cancelled => anyhow::bail!("Cancelled while downloading remote layers"),
|
||||
_ = cancel.cancelled() => anyhow::bail!("Cancelled while downloading remote layers"),
|
||||
res = downloads.next() => {
|
||||
match res {
|
||||
Some(Ok(())) => {},
|
||||
@@ -890,7 +925,7 @@ impl Timeline {
|
||||
new_state,
|
||||
TimelineState::Stopping | TimelineState::Broken { .. }
|
||||
) {
|
||||
// drop the copmletion guard, if any; it might be holding off the completion
|
||||
// drop the completion guard, if any; it might be holding off the completion
|
||||
// forever needlessly
|
||||
self.initial_logical_size_attempt
|
||||
.lock()
|
||||
@@ -2252,8 +2287,9 @@ impl Timeline {
|
||||
let mut timeline_owned;
|
||||
let mut timeline = self;
|
||||
|
||||
let mut read_count =
|
||||
scopeguard::guard(0, |cnt| self.metrics.read_num_fs_layers.observe(cnt as f64));
|
||||
let mut read_count = scopeguard::guard(0, |cnt| {
|
||||
crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
|
||||
});
|
||||
|
||||
// For debugging purposes, collect the path of layers that we traversed
|
||||
// through. It's included in the error message if we fail to find the key.
|
||||
@@ -2387,12 +2423,15 @@ impl Timeline {
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
result = match open_layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
) {
|
||||
result = match open_layer
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
@@ -2414,12 +2453,15 @@ impl Timeline {
|
||||
if cont_lsn > start_lsn {
|
||||
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
result = match frozen_layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
) {
|
||||
result = match frozen_layer
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
@@ -2450,12 +2492,15 @@ impl Timeline {
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, lsn_floor);
|
||||
result = match layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
) {
|
||||
result = match layer
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
|
||||
@@ -149,12 +149,10 @@ impl OpenFiles {
|
||||
// old file.
|
||||
//
|
||||
if let Some(old_file) = slot_guard.file.take() {
|
||||
// We do not have information about tenant_id/timeline_id of evicted file.
|
||||
// It is possible to store path together with file or use filepath crate,
|
||||
// but as far as close() is not expected to be fast, it is not so critical to gather
|
||||
// precise per-tenant statistic here.
|
||||
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
|
||||
// distinguish the two.
|
||||
STORAGE_IO_TIME
|
||||
.with_label_values(&["close", "-", "-"])
|
||||
.with_label_values(&["close-by-replace"])
|
||||
.observe_closure_duration(|| drop(old_file));
|
||||
}
|
||||
|
||||
@@ -208,7 +206,7 @@ impl VirtualFile {
|
||||
}
|
||||
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
|
||||
let file = STORAGE_IO_TIME
|
||||
.with_label_values(&["open", &tenant_id, &timeline_id])
|
||||
.with_label_values(&["open"])
|
||||
.observe_closure_duration(|| open_options.open(path))?;
|
||||
|
||||
// Strip all options other than read and write.
|
||||
@@ -271,7 +269,7 @@ impl VirtualFile {
|
||||
// Found a cached file descriptor.
|
||||
slot.recently_used.store(true, Ordering::Relaxed);
|
||||
return Ok(STORAGE_IO_TIME
|
||||
.with_label_values(&[op, &self.tenant_id, &self.timeline_id])
|
||||
.with_label_values(&[op])
|
||||
.observe_closure_duration(|| func(file)));
|
||||
}
|
||||
}
|
||||
@@ -298,12 +296,12 @@ impl VirtualFile {
|
||||
|
||||
// Open the physical file
|
||||
let file = STORAGE_IO_TIME
|
||||
.with_label_values(&["open", &self.tenant_id, &self.timeline_id])
|
||||
.with_label_values(&["open"])
|
||||
.observe_closure_duration(|| self.open_options.open(&self.path))?;
|
||||
|
||||
// Perform the requested operation on it
|
||||
let result = STORAGE_IO_TIME
|
||||
.with_label_values(&[op, &self.tenant_id, &self.timeline_id])
|
||||
.with_label_values(&[op])
|
||||
.observe_closure_duration(|| func(&file));
|
||||
|
||||
// Store the File in the slot and update the handle in the VirtualFile
|
||||
@@ -333,13 +331,11 @@ impl Drop for VirtualFile {
|
||||
let mut slot_guard = slot.inner.write().unwrap();
|
||||
if slot_guard.tag == handle.tag {
|
||||
slot.recently_used.store(false, Ordering::Relaxed);
|
||||
// Unlike files evicted by replacement algorithm, here
|
||||
// we group close time by tenant_id/timeline_id.
|
||||
// At allows to compare number/time of "normal" file closes
|
||||
// with file eviction.
|
||||
// there is also operation "close-by-replace" for closes done on eviction for
|
||||
// comparison.
|
||||
STORAGE_IO_TIME
|
||||
.with_label_values(&["close", &self.tenant_id, &self.timeline_id])
|
||||
.observe_closure_duration(|| slot_guard.file.take());
|
||||
.with_label_values(&["close"])
|
||||
.observe_closure_duration(|| drop(slot_guard.file.take()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -48,6 +48,14 @@ impl ClientCredentials<'_> {
|
||||
}
|
||||
|
||||
impl<'a> ClientCredentials<'a> {
|
||||
#[cfg(test)]
|
||||
pub fn new_noop() -> Self {
|
||||
ClientCredentials {
|
||||
user: "",
|
||||
project: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse(
|
||||
params: &'a StartupMessageParams,
|
||||
sni: Option<&str>,
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
//! A group of high-level tests for connection establishing logic and auth.
|
||||
use std::borrow::Cow;
|
||||
|
||||
use super::*;
|
||||
use crate::auth::ClientCredentials;
|
||||
use crate::console::{CachedNodeInfo, NodeInfo};
|
||||
use crate::{auth, sasl, scram};
|
||||
use async_trait::async_trait;
|
||||
use rstest::rstest;
|
||||
@@ -304,3 +308,148 @@ fn connect_compute_total_wait() {
|
||||
assert!(total_wait < tokio::time::Duration::from_secs(12));
|
||||
assert!(total_wait > tokio::time::Duration::from_secs(10));
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum ConnectAction {
|
||||
Connect,
|
||||
Retry,
|
||||
Fail,
|
||||
}
|
||||
|
||||
struct TestConnectMechanism {
|
||||
counter: Arc<std::sync::Mutex<usize>>,
|
||||
sequence: Vec<ConnectAction>,
|
||||
}
|
||||
|
||||
impl TestConnectMechanism {
|
||||
fn new(sequence: Vec<ConnectAction>) -> Self {
|
||||
Self {
|
||||
counter: Arc::new(std::sync::Mutex::new(0)),
|
||||
sequence,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TestConnection;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TestConnectError {
|
||||
retryable: bool,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TestConnectError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for TestConnectError {}
|
||||
|
||||
impl ShouldRetry for TestConnectError {
|
||||
fn could_retry(&self) -> bool {
|
||||
self.retryable
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ConnectMechanism for TestConnectMechanism {
|
||||
type Connection = TestConnection;
|
||||
type ConnectError = TestConnectError;
|
||||
type Error = anyhow::Error;
|
||||
|
||||
async fn connect_once(
|
||||
&self,
|
||||
_node_info: &console::CachedNodeInfo,
|
||||
_timeout: time::Duration,
|
||||
) -> Result<Self::Connection, Self::ConnectError> {
|
||||
let mut counter = self.counter.lock().unwrap();
|
||||
let action = self.sequence[*counter];
|
||||
*counter += 1;
|
||||
match action {
|
||||
ConnectAction::Connect => Ok(TestConnection),
|
||||
ConnectAction::Retry => Err(TestConnectError { retryable: true }),
|
||||
ConnectAction::Fail => Err(TestConnectError { retryable: false }),
|
||||
}
|
||||
}
|
||||
|
||||
fn update_connect_config(&self, _conf: &mut compute::ConnCfg) {}
|
||||
}
|
||||
|
||||
fn helper_create_connect_info() -> (
|
||||
CachedNodeInfo,
|
||||
console::ConsoleReqExtra<'static>,
|
||||
auth::BackendType<'static, ClientCredentials<'static>>,
|
||||
) {
|
||||
let node = NodeInfo {
|
||||
config: compute::ConnCfg::new(),
|
||||
aux: Default::default(),
|
||||
allow_self_signed_compute: false,
|
||||
};
|
||||
let cache = CachedNodeInfo::new_uncached(node);
|
||||
let extra = console::ConsoleReqExtra {
|
||||
session_id: uuid::Uuid::new_v4(),
|
||||
application_name: Some("TEST"),
|
||||
};
|
||||
let url = "https://TEST_URL".parse().unwrap();
|
||||
let api = console::provider::mock::Api::new(url);
|
||||
let creds = auth::BackendType::Postgres(Cow::Owned(api), ClientCredentials::new_noop());
|
||||
(cache, extra, creds)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_success() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_retry() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, Retry, Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
/// Test that we don't retry if the error is not retryable.
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_non_retry_1() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, Retry, Fail]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap_err();
|
||||
}
|
||||
|
||||
/// Even for non-retryable errors, we should retry at least once.
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_non_retry_2() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Fail, Retry, Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
/// Retry for at most `NUM_RETRIES_CONNECT` times.
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_non_retry_3() {
|
||||
assert_eq!(NUM_RETRIES_CONNECT, 10);
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![
|
||||
Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry,
|
||||
/* the 11th time */ Retry,
|
||||
]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap_err();
|
||||
}
|
||||
|
||||
@@ -40,10 +40,13 @@ def parse_metrics(text: str, name: str = "") -> Metrics:
|
||||
return metrics
|
||||
|
||||
|
||||
def histogram(prefix_without_trailing_underscore: str) -> List[str]:
|
||||
assert not prefix_without_trailing_underscore.endswith("_")
|
||||
return [f"{prefix_without_trailing_underscore}_{x}" for x in ["bucket", "count", "sum"]]
|
||||
|
||||
|
||||
PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_remote_timeline_client_calls_unfinished",
|
||||
*[f"pageserver_remote_timeline_client_calls_started_{x}" for x in ["bucket", "count", "sum"]],
|
||||
*[f"pageserver_remote_operation_seconds_{x}" for x in ["bucket", "count", "sum"]],
|
||||
"pageserver_remote_physical_size",
|
||||
"pageserver_remote_timeline_client_bytes_started_total",
|
||||
"pageserver_remote_timeline_client_bytes_finished_total",
|
||||
@@ -67,34 +70,29 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_getpage_reconstruct_seconds_count",
|
||||
"pageserver_getpage_reconstruct_seconds_sum",
|
||||
*[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
|
||||
*histogram("pageserver_read_num_fs_layers"),
|
||||
*histogram("pageserver_getpage_get_reconstruct_data_seconds"),
|
||||
*histogram("pageserver_wait_lsn_seconds"),
|
||||
*histogram("pageserver_remote_operation_seconds"),
|
||||
*histogram("pageserver_remote_timeline_client_calls_started"),
|
||||
*histogram("pageserver_io_operations_seconds"),
|
||||
"pageserver_tenant_states_count",
|
||||
)
|
||||
|
||||
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_current_logical_size",
|
||||
"pageserver_resident_physical_size",
|
||||
"pageserver_getpage_get_reconstruct_data_seconds_bucket",
|
||||
"pageserver_getpage_get_reconstruct_data_seconds_count",
|
||||
"pageserver_getpage_get_reconstruct_data_seconds_sum",
|
||||
"pageserver_io_operations_bytes_total",
|
||||
"pageserver_io_operations_seconds_bucket",
|
||||
"pageserver_io_operations_seconds_count",
|
||||
"pageserver_io_operations_seconds_sum",
|
||||
"pageserver_last_record_lsn",
|
||||
"pageserver_read_num_fs_layers_bucket",
|
||||
"pageserver_read_num_fs_layers_count",
|
||||
"pageserver_read_num_fs_layers_sum",
|
||||
"pageserver_smgr_query_seconds_bucket",
|
||||
"pageserver_smgr_query_seconds_count",
|
||||
"pageserver_smgr_query_seconds_sum",
|
||||
"pageserver_storage_operations_seconds_count_total",
|
||||
"pageserver_storage_operations_seconds_sum_total",
|
||||
"pageserver_wait_lsn_seconds_bucket",
|
||||
"pageserver_wait_lsn_seconds_count",
|
||||
"pageserver_wait_lsn_seconds_sum",
|
||||
"pageserver_created_persistent_files_total",
|
||||
"pageserver_written_persistent_bytes_total",
|
||||
"pageserver_tenant_states_count",
|
||||
"pageserver_evictions_total",
|
||||
"pageserver_evictions_with_low_residence_duration_total",
|
||||
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
|
||||
# pageserver_broken_tenants_count is a leaked "metric" which is "cleared" on restart or reload
|
||||
)
|
||||
|
||||
@@ -136,8 +136,6 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
|
||||
for sample in ps_metrics.query_all(
|
||||
name="pageserver_remote_operation_seconds_count",
|
||||
filter={
|
||||
"tenant_id": str(tenant_id),
|
||||
"timeline_id": str(timeline_id),
|
||||
"file_kind": str(file_kind),
|
||||
"op_kind": str(op_kind),
|
||||
},
|
||||
|
||||
@@ -140,8 +140,6 @@ def test_metric_collection(
|
||||
for sample in ps_metrics.query_all(
|
||||
name="pageserver_remote_operation_seconds_count",
|
||||
filter={
|
||||
"tenant_id": str(tenant_id),
|
||||
"timeline_id": str(timeline_id),
|
||||
"file_kind": str(file_kind),
|
||||
"op_kind": str(op_kind),
|
||||
},
|
||||
|
||||
@@ -1,92 +0,0 @@
|
||||
import random
|
||||
import threading
|
||||
from threading import Thread
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
#
|
||||
# Test multixact state after branching
|
||||
# Now this test is very minimalistic -
|
||||
# it only checks next_multixact_id field in restored pg_control,
|
||||
# since we don't have functions to check multixact internals.
|
||||
#
|
||||
def test_multixact_conc(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_multixact", "empty")
|
||||
endpoint = env.endpoints.create_start("test_multixact")
|
||||
|
||||
log.info("postgres is running on 'test_multixact' branch")
|
||||
|
||||
n_records = 100
|
||||
n_threads = 5
|
||||
n_iters = 1000
|
||||
n_restarts = 10
|
||||
|
||||
cur = endpoint.connect().cursor()
|
||||
cur.execute(
|
||||
f"""
|
||||
CREATE TABLE t1(pk int primary key, val integer);
|
||||
INSERT INTO t1 values (generate_series(1, {n_records}), 0);
|
||||
"""
|
||||
)
|
||||
|
||||
next_multixact_id_old = query_scalar(
|
||||
cur, "SELECT next_multixact_id FROM pg_control_checkpoint()"
|
||||
)
|
||||
|
||||
# Lock entries using parallel connections in a round-robin fashion.
|
||||
def do_updates():
|
||||
conn = endpoint.connect(autocommit=False)
|
||||
for i in range(n_iters):
|
||||
pk = random.randrange(1, n_records)
|
||||
conn.cursor().execute(f"update t1 set val=val+1 where pk={pk}")
|
||||
conn.cursor().execute("select * from t1 for key share")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
for iter in range(n_restarts):
|
||||
threads: List[threading.Thread] = []
|
||||
for i in range(n_threads):
|
||||
threads.append(threading.Thread(target=do_updates, args=(), daemon=False))
|
||||
threads[-1].start()
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
# Restart endpoint
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("select count(*) from t1")
|
||||
assert cur.fetchone() == (n_records,)
|
||||
|
||||
# force wal flush
|
||||
cur.execute("checkpoint")
|
||||
|
||||
cur.execute(
|
||||
"SELECT next_multixact_id, pg_current_wal_insert_lsn() FROM pg_control_checkpoint()"
|
||||
)
|
||||
res = cur.fetchone()
|
||||
assert res is not None
|
||||
next_multixact_id = res[0]
|
||||
lsn = res[1]
|
||||
|
||||
# Ensure that we did lock some tuples
|
||||
assert int(next_multixact_id) > int(next_multixact_id_old)
|
||||
|
||||
# Branch at this point
|
||||
env.neon_cli.create_branch("test_multixact_new", "test_multixact", ancestor_start_lsn=lsn)
|
||||
endpoint_new = env.endpoints.create_start("test_multixact_new")
|
||||
|
||||
log.info("postgres is running on 'test_multixact_new' branch")
|
||||
next_multixact_id_new = endpoint_new.safe_psql(
|
||||
"SELECT next_multixact_id FROM pg_control_checkpoint()"
|
||||
)[0][0]
|
||||
|
||||
# Check that we restored pg_controlfile correctly
|
||||
assert next_multixact_id_new == next_multixact_id
|
||||
@@ -27,15 +27,16 @@ from fixtures.types import Lsn
|
||||
from fixtures.utils import query_scalar, wait_until
|
||||
|
||||
|
||||
def get_num_downloaded_layers(client: PageserverHttpClient, tenant_id, timeline_id):
|
||||
def get_num_downloaded_layers(client: PageserverHttpClient):
|
||||
"""
|
||||
This assumes that the pageserver only has a single tenant.
|
||||
"""
|
||||
value = client.get_metric_value(
|
||||
"pageserver_remote_operation_seconds_count",
|
||||
{
|
||||
"file_kind": "layer",
|
||||
"op_kind": "download",
|
||||
"status": "success",
|
||||
"tenant_id": tenant_id,
|
||||
"timeline_id": timeline_id,
|
||||
},
|
||||
)
|
||||
if value is None:
|
||||
@@ -57,7 +58,8 @@ def test_ondemand_download_large_rel(
|
||||
test_name="test_ondemand_download_large_rel",
|
||||
)
|
||||
|
||||
##### First start, insert secret data and upload it to the remote storage
|
||||
# thinking about using a shared environment? the test assumes that global
|
||||
# metrics are for single tenant.
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
# disable background GC
|
||||
@@ -129,7 +131,7 @@ def test_ondemand_download_large_rel(
|
||||
# safekeepers, that have now been shut down.
|
||||
endpoint = env.endpoints.create_start("main", lsn=current_lsn)
|
||||
|
||||
before_downloads = get_num_downloaded_layers(client, tenant_id, timeline_id)
|
||||
before_downloads = get_num_downloaded_layers(client)
|
||||
assert before_downloads != 0, "basebackup should on-demand non-zero layers"
|
||||
|
||||
# Probe in the middle of the table. There's a high chance that the beginning
|
||||
@@ -140,7 +142,7 @@ def test_ondemand_download_large_rel(
|
||||
with endpoint.cursor() as cur:
|
||||
assert query_scalar(cur, "select count(*) from tbl where id = 500000") == 1
|
||||
|
||||
after_downloads = get_num_downloaded_layers(client, tenant_id, timeline_id)
|
||||
after_downloads = get_num_downloaded_layers(client)
|
||||
log.info(f"layers downloaded before {before_downloads} and after {after_downloads}")
|
||||
assert after_downloads > before_downloads
|
||||
|
||||
@@ -159,13 +161,11 @@ def test_ondemand_download_timetravel(
|
||||
test_name="test_ondemand_download_timetravel",
|
||||
)
|
||||
|
||||
##### First start, insert data and upload it to the remote storage
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
# thinking about using a shared environment? the test assumes that global
|
||||
# metrics are for single tenant.
|
||||
|
||||
# Override defaults, to create more layers
|
||||
tenant, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
# Disable background GC & compaction
|
||||
# We don't want GC, that would break the assertion about num downloads.
|
||||
# We don't want background compaction, we force a compaction every time we do explicit checkpoint.
|
||||
@@ -178,7 +178,7 @@ def test_ondemand_download_timetravel(
|
||||
"compaction_target_size": f"{1 * 1024 ** 2}", # 1 MB
|
||||
}
|
||||
)
|
||||
env.initial_tenant = tenant
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
@@ -283,7 +283,7 @@ def test_ondemand_download_timetravel(
|
||||
== table_len
|
||||
)
|
||||
|
||||
after_downloads = get_num_downloaded_layers(client, tenant_id, timeline_id)
|
||||
after_downloads = get_num_downloaded_layers(client)
|
||||
num_layers_downloaded.append(after_downloads)
|
||||
log.info(f"num_layers_downloaded[-1]={num_layers_downloaded[-1]}")
|
||||
|
||||
@@ -324,11 +324,8 @@ def test_download_remote_layers_api(
|
||||
)
|
||||
|
||||
##### First start, insert data and upload it to the remote storage
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Override defaults, to create more layers
|
||||
tenant, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
# Disable background GC & compaction
|
||||
# We don't want GC, that would break the assertion about num downloads.
|
||||
# We don't want background compaction, we force a compaction every time we do explicit checkpoint.
|
||||
@@ -341,7 +338,6 @@ def test_download_remote_layers_api(
|
||||
"compaction_target_size": f"{1 * 1024 ** 2}", # 1 MB
|
||||
}
|
||||
)
|
||||
env.initial_tenant = tenant
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
@@ -489,8 +485,6 @@ def test_compaction_downloads_on_demand_without_image_creation(
|
||||
test_name="test_compaction_downloads_on_demand_without_image_creation",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
conf = {
|
||||
# Disable background GC & compaction
|
||||
"gc_period": "0s",
|
||||
@@ -506,6 +500,8 @@ def test_compaction_downloads_on_demand_without_image_creation(
|
||||
# pitr_interval and gc_horizon are not interesting because we dont run gc
|
||||
}
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=stringify(conf))
|
||||
|
||||
def downloaded_bytes_and_count(pageserver_http: PageserverHttpClient) -> Tuple[int, int]:
|
||||
m = pageserver_http.get_metrics()
|
||||
# these are global counters
|
||||
@@ -517,11 +513,12 @@ def test_compaction_downloads_on_demand_without_image_creation(
|
||||
assert count < 2**53 and count.is_integer(), "count should still be safe integer-in-f64"
|
||||
return (int(total_bytes), int(count))
|
||||
|
||||
# Override defaults, to create more layers
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(conf=stringify(conf))
|
||||
env.initial_tenant = tenant_id
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
assert timeline_id is not None
|
||||
|
||||
with env.endpoints.create_start("main") as endpoint:
|
||||
# no particular reason to create the layers like this, but we are sure
|
||||
# not to hit the image_creation_threshold here.
|
||||
@@ -577,8 +574,6 @@ def test_compaction_downloads_on_demand_with_image_creation(
|
||||
test_name="test_compaction_downloads_on_demand",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
conf = {
|
||||
# Disable background GC & compaction
|
||||
"gc_period": "0s",
|
||||
@@ -593,9 +588,11 @@ def test_compaction_downloads_on_demand_with_image_creation(
|
||||
# pitr_interval and gc_horizon are not interesting because we dont run gc
|
||||
}
|
||||
|
||||
# Override defaults, to create more layers
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(conf=stringify(conf))
|
||||
env.initial_tenant = tenant_id
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=stringify(conf))
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
assert timeline_id is not None
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
@@ -664,10 +661,6 @@ def test_compaction_downloads_on_demand_with_image_creation(
|
||||
assert dict(kinds_after) == {"Delta": 4, "Image": 1}
|
||||
|
||||
|
||||
def stringify(conf: Dict[str, Any]) -> Dict[str, str]:
|
||||
return dict(map(lambda x: (x[0], str(x[1])), conf.items()))
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_ondemand_download_failure_to_replace(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
|
||||
@@ -691,9 +684,10 @@ def test_ondemand_download_failure_to_replace(
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
assert timeline_id is not None
|
||||
|
||||
env.initial_tenant = tenant_id
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
lsn = Lsn(pageserver_http.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
|
||||
@@ -724,3 +718,7 @@ def test_ondemand_download_failure_to_replace(
|
||||
env.pageserver.allowed_errors.append(".* ERROR .*Task 'initial size calculation'")
|
||||
|
||||
# if the above returned, then we didn't have a livelock, and all is well
|
||||
|
||||
|
||||
def stringify(conf: Dict[str, Any]) -> Dict[str, str]:
|
||||
return dict(map(lambda x: (x[0], str(x[1])), conf.items()))
|
||||
|
||||
@@ -378,12 +378,10 @@ def test_remote_timeline_client_calls_started_metric(
|
||||
test_name="test_remote_timeline_client_metrics",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# create tenant with config that will determinstically allow
|
||||
# compaction and gc
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
# thinking about using a shared environment? the test assumes that global
|
||||
# metrics are for single tenant.
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
# small checkpointing and compaction targets to ensure we generate many upload operations
|
||||
"checkpoint_distance": f"{128 * 1024}",
|
||||
"compaction_threshold": "1",
|
||||
@@ -398,6 +396,10 @@ def test_remote_timeline_client_calls_started_metric(
|
||||
}
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
assert env.initial_timeline is not None
|
||||
timeline_id: TimelineId = env.initial_timeline
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
@@ -419,6 +421,7 @@ def test_remote_timeline_client_calls_started_metric(
|
||||
"VACUUM foo",
|
||||
]
|
||||
)
|
||||
assert timeline_id is not None
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
calls_started: Dict[Tuple[str, str], List[int]] = {
|
||||
@@ -428,13 +431,14 @@ def test_remote_timeline_client_calls_started_metric(
|
||||
}
|
||||
|
||||
def fetch_calls_started():
|
||||
assert timeline_id is not None
|
||||
for (file_kind, op_kind), observations in calls_started.items():
|
||||
val = client.get_remote_timeline_client_metric(
|
||||
"pageserver_remote_timeline_client_calls_started_count",
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
file_kind,
|
||||
op_kind,
|
||||
val = client.get_metric_value(
|
||||
name="pageserver_remote_timeline_client_calls_started_count",
|
||||
filter={
|
||||
"file_kind": str(file_kind),
|
||||
"op_kind": str(op_kind),
|
||||
},
|
||||
)
|
||||
assert val is not None, f"expecting metric to be present: {file_kind} {op_kind}"
|
||||
val = int(val)
|
||||
@@ -518,12 +522,8 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
|
||||
test_name="test_timeline_deletion_with_files_stuck_in_upload_queue",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# create tenant with config that will determinstically allow
|
||||
# compaction and gc
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
# small checkpointing and compaction targets to ensure we generate many operations
|
||||
"checkpoint_distance": f"{64 * 1024}",
|
||||
"compaction_threshold": "1",
|
||||
@@ -535,6 +535,10 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
|
||||
"pitr_interval": "0s",
|
||||
}
|
||||
)
|
||||
tenant_id = env.initial_tenant
|
||||
assert env.initial_timeline is not None
|
||||
timeline_id: TimelineId = env.initial_timeline
|
||||
|
||||
timeline_path = env.timeline_dir(tenant_id, timeline_id)
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
@@ -787,12 +791,8 @@ def test_compaction_delete_before_upload(
|
||||
test_name="test_compaction_delete_before_upload",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# create tenant with config that will determinstically allow
|
||||
# compaction and disables gc
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
# Set a small compaction threshold
|
||||
"compaction_threshold": "3",
|
||||
# Disable GC
|
||||
@@ -802,6 +802,10 @@ def test_compaction_delete_before_upload(
|
||||
}
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
assert env.initial_timeline is not None
|
||||
timeline_id: TimelineId = env.initial_timeline
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
|
||||
|
||||
@@ -2,6 +2,7 @@ import asyncio
|
||||
import random
|
||||
import time
|
||||
from threading import Thread
|
||||
from typing import List, Optional
|
||||
|
||||
import asyncpg
|
||||
import pytest
|
||||
@@ -21,6 +22,7 @@ from fixtures.pageserver.utils import (
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import query_scalar, wait_until
|
||||
from prometheus_client.samples import Sample
|
||||
|
||||
|
||||
def do_gc_target(
|
||||
@@ -854,3 +856,89 @@ def ensure_test_data(data_id: int, data: str, endpoint: Endpoint):
|
||||
assert (
|
||||
query_scalar(cur, f"SELECT secret FROM test WHERE id = {data_id};") == data
|
||||
), "Should have timeline data back"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_metrics_while_ignoring_broken_tenant_and_reloading(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_metrics_while_ignoring_broken_tenant_and_reloading",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
env.pageserver.allowed_errors.append(
|
||||
r".* Changing Active tenant to Broken state, reason: broken from test"
|
||||
)
|
||||
|
||||
def only_int(samples: List[Sample]) -> Optional[int]:
|
||||
if len(samples) == 1:
|
||||
return int(samples[0].value)
|
||||
assert len(samples) == 0
|
||||
return None
|
||||
|
||||
wait_until_tenant_state(client, env.initial_tenant, "Active", 10, 0.5)
|
||||
|
||||
client.tenant_break(env.initial_tenant)
|
||||
|
||||
found_broken = False
|
||||
active, broken, broken_set = ([], [], [])
|
||||
for _ in range(10):
|
||||
m = client.get_metrics()
|
||||
active = m.query_all("pageserver_tenant_states_count", {"state": "Active"})
|
||||
broken = m.query_all("pageserver_tenant_states_count", {"state": "Broken"})
|
||||
broken_set = m.query_all(
|
||||
"pageserver_broken_tenants_count", {"tenant_id": str(env.initial_tenant)}
|
||||
)
|
||||
found_broken = only_int(active) == 0 and only_int(broken) == 1 and only_int(broken_set) == 1
|
||||
|
||||
if found_broken:
|
||||
break
|
||||
log.info(f"active: {active}, broken: {broken}, broken_set: {broken_set}")
|
||||
time.sleep(0.5)
|
||||
assert (
|
||||
found_broken
|
||||
), f"tenant shows up as broken; active={active}, broken={broken}, broken_set={broken_set}"
|
||||
|
||||
client.tenant_ignore(env.initial_tenant)
|
||||
|
||||
found_broken = False
|
||||
broken, broken_set = ([], [])
|
||||
for _ in range(10):
|
||||
m = client.get_metrics()
|
||||
broken = m.query_all("pageserver_tenant_states_count", {"state": "Broken"})
|
||||
broken_set = m.query_all(
|
||||
"pageserver_broken_tenants_count", {"tenant_id": str(env.initial_tenant)}
|
||||
)
|
||||
found_broken = only_int(broken) == 0 and only_int(broken_set) == 1
|
||||
|
||||
if found_broken:
|
||||
break
|
||||
time.sleep(0.5)
|
||||
assert (
|
||||
found_broken
|
||||
), f"broken should still be in set, but it is not in the tenant state count: broken={broken}, broken_set={broken_set}"
|
||||
|
||||
client.tenant_load(env.initial_tenant)
|
||||
|
||||
found_active = False
|
||||
active, broken_set = ([], [])
|
||||
for _ in range(10):
|
||||
m = client.get_metrics()
|
||||
active = m.query_all("pageserver_tenant_states_count", {"state": "Active"})
|
||||
broken_set = m.query_all(
|
||||
"pageserver_broken_tenants_count", {"tenant_id": str(env.initial_tenant)}
|
||||
)
|
||||
found_active = only_int(active) == 1 and len(broken_set) == 0
|
||||
|
||||
if found_active:
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
assert (
|
||||
found_active
|
||||
), f"reloaded tenant should be active, and broken tenant set item removed: active={active}, broken_set={broken_set}"
|
||||
|
||||
@@ -213,6 +213,9 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Test (a subset of) pageserver global metrics
|
||||
for metric in PAGESERVER_GLOBAL_METRICS:
|
||||
if metric.startswith("pageserver_remote"):
|
||||
continue
|
||||
|
||||
ps_samples = ps_metrics.query_all(metric, {})
|
||||
assert len(ps_samples) > 0, f"expected at least one sample for {metric}"
|
||||
for sample in ps_samples:
|
||||
@@ -380,10 +383,8 @@ def test_pageserver_with_empty_tenants(
|
||||
ps_metrics = client.get_metrics()
|
||||
broken_tenants_metric_filter = {
|
||||
"tenant_id": str(tenant_without_timelines_dir),
|
||||
"state": "Broken",
|
||||
}
|
||||
active_tenants_metric_filter = {
|
||||
"tenant_id": str(tenant_with_empty_timelines),
|
||||
"state": "Active",
|
||||
}
|
||||
|
||||
@@ -399,7 +400,7 @@ def test_pageserver_with_empty_tenants(
|
||||
|
||||
tenant_broken_count = int(
|
||||
ps_metrics.query_one(
|
||||
"pageserver_tenant_states_count", filter=broken_tenants_metric_filter
|
||||
"pageserver_broken_tenants_count", filter=broken_tenants_metric_filter
|
||||
).value
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user