mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 21:50:37 +00:00
Compare commits
5 Commits
bump-nextX
...
vlad/asfsg
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
27fe7f8963 | ||
|
|
0a937b7f91 | ||
|
|
b8d031cd0c | ||
|
|
f0d29a0f3e | ||
|
|
13522fb722 |
@@ -9,6 +9,7 @@ use std::{
|
||||
collections::HashMap,
|
||||
io::{BufRead, Read},
|
||||
num::{NonZeroU64, NonZeroUsize},
|
||||
str::FromStr,
|
||||
sync::atomic::AtomicUsize,
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
@@ -437,18 +438,7 @@ pub enum CompactionAlgorithm {
|
||||
Tiered,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Debug,
|
||||
Clone,
|
||||
Copy,
|
||||
PartialEq,
|
||||
Eq,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
strum_macros::FromRepr,
|
||||
strum_macros::EnumString,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum ImageCompressionAlgorithm {
|
||||
/// Disabled for writes, and never decompress during reading.
|
||||
/// Never set this after you've enabled compression once!
|
||||
@@ -468,6 +458,31 @@ impl ImageCompressionAlgorithm {
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for ImageCompressionAlgorithm {
|
||||
type Err = anyhow::Error;
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let mut components = s.split(['(', ')']);
|
||||
let first = components
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::anyhow!("empty string"))?;
|
||||
match first {
|
||||
"disabled-no-decompress" => Ok(ImageCompressionAlgorithm::DisabledNoDecompress),
|
||||
"disabled" => Ok(ImageCompressionAlgorithm::Disabled),
|
||||
"zstd" => {
|
||||
let level = if let Some(v) = components.next() {
|
||||
let v: i8 = v.parse()?;
|
||||
Some(v)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(ImageCompressionAlgorithm::Zstd { level })
|
||||
}
|
||||
_ => anyhow::bail!("invalid specifier '{first}'"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct CompactionAlgorithmSettings {
|
||||
pub kind: CompactionAlgorithm,
|
||||
@@ -1660,4 +1675,29 @@ mod tests {
|
||||
AuxFilePolicy::CrossValidation
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_image_compression_algorithm_parsing() {
|
||||
use ImageCompressionAlgorithm::*;
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("disabled").unwrap(),
|
||||
Disabled
|
||||
);
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("disabled-no-decompress").unwrap(),
|
||||
DisabledNoDecompress
|
||||
);
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("zstd").unwrap(),
|
||||
Zstd { level: None }
|
||||
);
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("zstd(18)").unwrap(),
|
||||
Zstd { level: Some(18) }
|
||||
);
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("zstd(-3)").unwrap(),
|
||||
Zstd { level: Some(-3) }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,7 +55,7 @@ pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
|
||||
/// metadata checkpoint only once per XID_CHECKPOINT_INTERVAL transactions.
|
||||
/// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
|
||||
/// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
|
||||
const XID_CHECKPOINT_INTERVAL: u32 = 128;
|
||||
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
|
||||
|
||||
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
|
||||
(0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
|
||||
|
||||
@@ -187,19 +187,19 @@ pub fn test_update_next_xid() {
|
||||
// The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL
|
||||
// boundary
|
||||
checkpoint.update_next_xid(100);
|
||||
assert_eq!(checkpoint.nextXid.value, 128);
|
||||
assert_eq!(checkpoint.nextXid.value, 1024);
|
||||
|
||||
// No change
|
||||
checkpoint.update_next_xid(100);
|
||||
assert_eq!(checkpoint.nextXid.value, 128);
|
||||
checkpoint.update_next_xid(127);
|
||||
assert_eq!(checkpoint.nextXid.value, 128);
|
||||
checkpoint.update_next_xid(500);
|
||||
assert_eq!(checkpoint.nextXid.value, 1024);
|
||||
checkpoint.update_next_xid(1023);
|
||||
assert_eq!(checkpoint.nextXid.value, 1024);
|
||||
|
||||
// The function returns the *next* XID, given the highest XID seen so
|
||||
// far. So when we pass 128, the nextXid gets bumped up to the next
|
||||
// far. So when we pass 1024, the nextXid gets bumped up to the next
|
||||
// XID_CHECKPOINT_INTERVAL boundary.
|
||||
checkpoint.update_next_xid(128);
|
||||
assert_eq!(checkpoint.nextXid.value, 256);
|
||||
checkpoint.update_next_xid(1024);
|
||||
assert_eq!(checkpoint.nextXid.value, 2048);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1456,10 +1456,12 @@ impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_live_connections",
|
||||
"Number of live network connections",
|
||||
pub(crate) static LIVE_CONNECTIONS: Lazy<IntCounterPairVec> = Lazy::new(|| {
|
||||
register_int_counter_pair_vec!(
|
||||
"pageserver_live_connections_started",
|
||||
"Number of network connections that we started handling",
|
||||
"pageserver_live_connections_finished",
|
||||
"Number of network connections that we finished handling",
|
||||
&["pageserver_connection_kind"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
|
||||
@@ -55,7 +55,7 @@ use crate::basebackup::BasebackupError;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::import_datadir::import_wal_from_tar;
|
||||
use crate::metrics;
|
||||
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS_COUNT};
|
||||
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
|
||||
use crate::pgdatadir_mapping::Version;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
|
||||
@@ -215,14 +215,9 @@ async fn page_service_conn_main(
|
||||
auth_type: AuthType,
|
||||
connection_ctx: RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// Immediately increment the gauge, then create a job to decrement it on task exit.
|
||||
// One of the pros of `defer!` is that this will *most probably*
|
||||
// get called, even in presence of panics.
|
||||
let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
|
||||
gauge.inc();
|
||||
scopeguard::defer! {
|
||||
gauge.dec();
|
||||
}
|
||||
let _guard = LIVE_CONNECTIONS
|
||||
.with_label_values(&["page_service"])
|
||||
.guard();
|
||||
|
||||
socket
|
||||
.set_nodelay(true)
|
||||
|
||||
@@ -26,7 +26,7 @@ use tracing::{debug, error, info, trace, warn, Instrument};
|
||||
use super::TaskStateUpdate;
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
|
||||
metrics::{LIVE_CONNECTIONS, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
|
||||
task_mgr::TaskKind,
|
||||
task_mgr::WALRECEIVER_RUNTIME,
|
||||
tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
|
||||
@@ -208,14 +208,9 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
.instrument(tracing::info_span!("poller")),
|
||||
);
|
||||
|
||||
// Immediately increment the gauge, then create a job to decrement it on task exit.
|
||||
// One of the pros of `defer!` is that this will *most probably*
|
||||
// get called, even in presence of panics.
|
||||
let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
|
||||
gauge.inc();
|
||||
scopeguard::defer! {
|
||||
gauge.dec();
|
||||
}
|
||||
let _guard = LIVE_CONNECTIONS
|
||||
.with_label_values(&["wal_receiver"])
|
||||
.guard();
|
||||
|
||||
let identify = identify_system(&replication_client).await?;
|
||||
info!("{identify:?}");
|
||||
|
||||
8
poetry.lock
generated
8
poetry.lock
generated
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aiohttp"
|
||||
@@ -734,13 +734,13 @@ typing-extensions = ">=4.1.0"
|
||||
|
||||
[[package]]
|
||||
name = "certifi"
|
||||
version = "2023.7.22"
|
||||
version = "2024.7.4"
|
||||
description = "Python package for providing Mozilla's CA Bundle."
|
||||
optional = false
|
||||
python-versions = ">=3.6"
|
||||
files = [
|
||||
{file = "certifi-2023.7.22-py3-none-any.whl", hash = "sha256:92d6037539857d8206b8f6ae472e8b77db8058fec5937a1ef3f54304089edbb9"},
|
||||
{file = "certifi-2023.7.22.tar.gz", hash = "sha256:539cc1d13202e33ca466e88b2807e29f4c13049d6d87031a3c110744495cb082"},
|
||||
{file = "certifi-2024.7.4-py3-none-any.whl", hash = "sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90"},
|
||||
{file = "certifi-2024.7.4.tar.gz", hash = "sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use futures::StreamExt;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use pageserver::tenant::storage_layer::LayerName;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@@ -29,7 +29,7 @@ impl LargeObjectKind {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct LargeObject {
|
||||
pub key: String,
|
||||
pub size: u64,
|
||||
@@ -45,53 +45,76 @@ pub async fn find_large_objects(
|
||||
bucket_config: BucketConfig,
|
||||
min_size: u64,
|
||||
ignore_deltas: bool,
|
||||
concurrency: usize,
|
||||
) -> anyhow::Result<LargeObjectListing> {
|
||||
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
|
||||
let mut tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
|
||||
let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
|
||||
|
||||
let objects_stream = tenants.map_ok(|tenant_shard_id| {
|
||||
let mut tenant_root = target.tenant_root(&tenant_shard_id);
|
||||
let s3_client = s3_client.clone();
|
||||
async move {
|
||||
let mut objects = Vec::new();
|
||||
let mut total_objects_ctr = 0u64;
|
||||
// We want the objects and not just common prefixes
|
||||
tenant_root.delimiter.clear();
|
||||
let mut continuation_token = None;
|
||||
loop {
|
||||
let fetch_response =
|
||||
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
|
||||
.await?;
|
||||
for obj in fetch_response.contents().iter().filter(|o| {
|
||||
if let Some(obj_size) = o.size {
|
||||
min_size as i64 <= obj_size
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}) {
|
||||
let key = obj.key().expect("couldn't get key").to_owned();
|
||||
let kind = LargeObjectKind::from_key(&key);
|
||||
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
|
||||
continue;
|
||||
}
|
||||
objects.push(LargeObject {
|
||||
key,
|
||||
size: obj.size.unwrap() as u64,
|
||||
kind,
|
||||
})
|
||||
}
|
||||
total_objects_ctr += fetch_response.contents().len() as u64;
|
||||
match fetch_response.next_continuation_token {
|
||||
Some(new_token) => continuation_token = Some(new_token),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
Ok((tenant_shard_id, objects, total_objects_ctr))
|
||||
}
|
||||
});
|
||||
let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
|
||||
|
||||
let mut objects = Vec::new();
|
||||
|
||||
let mut tenant_ctr = 0u64;
|
||||
let mut object_ctr = 0u64;
|
||||
while let Some(tenant_shard_id) = tenants.next().await {
|
||||
let tenant_shard_id = tenant_shard_id?;
|
||||
let mut tenant_root = target.tenant_root(&tenant_shard_id);
|
||||
// We want the objects and not just common prefixes
|
||||
tenant_root.delimiter.clear();
|
||||
let mut continuation_token = None;
|
||||
loop {
|
||||
let fetch_response =
|
||||
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
|
||||
.await?;
|
||||
for obj in fetch_response.contents().iter().filter(|o| {
|
||||
if let Some(obj_size) = o.size {
|
||||
min_size as i64 <= obj_size
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}) {
|
||||
let key = obj.key().expect("couldn't get key").to_owned();
|
||||
let kind = LargeObjectKind::from_key(&key);
|
||||
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
|
||||
continue;
|
||||
}
|
||||
objects.push(LargeObject {
|
||||
key,
|
||||
size: obj.size.unwrap() as u64,
|
||||
kind,
|
||||
})
|
||||
}
|
||||
object_ctr += fetch_response.contents().len() as u64;
|
||||
match fetch_response.next_continuation_token {
|
||||
Some(new_token) => continuation_token = Some(new_token),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
while let Some(res) = objects_stream.next().await {
|
||||
let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
|
||||
objects.extend_from_slice(&objects_slice);
|
||||
|
||||
object_ctr += total_objects_ctr;
|
||||
tenant_ctr += 1;
|
||||
if tenant_ctr % 50 == 0 {
|
||||
if tenant_ctr % 100 == 0 {
|
||||
tracing::info!(
|
||||
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len()
|
||||
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
|
||||
objects.len()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let bucket_name = target.bucket_name();
|
||||
tracing::info!(
|
||||
"Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
|
||||
objects.len()
|
||||
);
|
||||
Ok(LargeObjectListing { objects })
|
||||
}
|
||||
|
||||
@@ -78,6 +78,8 @@ enum Command {
|
||||
min_size: u64,
|
||||
#[arg(short, long, default_value_t = false)]
|
||||
ignore_deltas: bool,
|
||||
#[arg(long = "concurrency", short = 'j', default_value_t = 64)]
|
||||
concurrency: usize,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -210,10 +212,15 @@ async fn main() -> anyhow::Result<()> {
|
||||
Command::FindLargeObjects {
|
||||
min_size,
|
||||
ignore_deltas,
|
||||
concurrency,
|
||||
} => {
|
||||
let summary =
|
||||
find_large_objects::find_large_objects(bucket_config, min_size, ignore_deltas)
|
||||
.await?;
|
||||
let summary = find_large_objects::find_large_objects(
|
||||
bucket_config,
|
||||
min_size,
|
||||
ignore_deltas,
|
||||
concurrency,
|
||||
)
|
||||
.await?;
|
||||
println!("{}", serde_json::to_string(&summary).unwrap());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -54,4 +54,4 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
|
||||
pcur.execute(f"INSERT into t values ({n_records}, 0)")
|
||||
n_records += 1
|
||||
with sub.cursor() as scur:
|
||||
wait_until(10, 0.5, check_that_changes_propagated)
|
||||
wait_until(60, 0.5, check_that_changes_propagated)
|
||||
|
||||
Reference in New Issue
Block a user