mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 10:00:38 +00:00
Compare commits
10 Commits
sk-collect
...
lr-cap-wal
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b8fad41fbe | ||
|
|
613906acea | ||
|
|
82809d2ec2 | ||
|
|
0bd79eb063 | ||
|
|
8ff5387da1 | ||
|
|
8b91bbc38e | ||
|
|
e6bf6952b8 | ||
|
|
a2fab34371 | ||
|
|
c52384752e | ||
|
|
73d247c464 |
29
.github/workflows/benchmarking.yml
vendored
29
.github/workflows/benchmarking.yml
vendored
@@ -11,7 +11,7 @@ on:
|
||||
# │ │ ┌───────────── day of the month (1 - 31)
|
||||
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
|
||||
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
|
||||
- cron: '0 3 * * *' # run once a day, timezone is utc
|
||||
- cron: '0 3 * * *' # run once a day, timezone is utc
|
||||
|
||||
workflow_dispatch: # adds ability to run this manually
|
||||
inputs:
|
||||
@@ -23,6 +23,21 @@ on:
|
||||
type: boolean
|
||||
description: 'Publish perf report. If not set, the report will be published only for the main branch'
|
||||
required: false
|
||||
collect_olap_explain:
|
||||
type: boolean
|
||||
description: 'Collect EXPLAIN ANALYZE for OLAP queries. If not set, EXPLAIN ANALYZE will not be collected'
|
||||
required: false
|
||||
default: false
|
||||
collect_pg_stat_statements:
|
||||
type: boolean
|
||||
description: 'Collect pg_stat_statements for OLAP queries. If not set, pg_stat_statements will not be collected'
|
||||
required: false
|
||||
default: false
|
||||
run_AWS_RDS_AND_AURORA:
|
||||
type: boolean
|
||||
description: 'AWS-RDS and AWS-AURORA normally only run on Saturday. Set this to true to run them on every workflow_dispatch'
|
||||
required: false
|
||||
default: false
|
||||
|
||||
defaults:
|
||||
run:
|
||||
@@ -113,6 +128,8 @@ jobs:
|
||||
# - neon-captest-reuse: Reusing existing project
|
||||
# - rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
|
||||
# - rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
|
||||
env:
|
||||
RUN_AWS_RDS_AND_AURORA: ${{ github.event.inputs.run_AWS_RDS_AND_AURORA || 'false' }}
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
pgbench-compare-matrix: ${{ steps.pgbench-compare-matrix.outputs.matrix }}
|
||||
@@ -152,7 +169,7 @@ jobs:
|
||||
]
|
||||
}'
|
||||
|
||||
if [ "$(date +%A)" = "Saturday" ]; then
|
||||
if [ "$(date +%A)" = "Saturday" ] || [ ${RUN_AWS_RDS_AND_AURORA} = "true" ]; then
|
||||
matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres" },
|
||||
{ "platform": "rds-aurora" }]')
|
||||
fi
|
||||
@@ -171,9 +188,9 @@ jobs:
|
||||
]
|
||||
}'
|
||||
|
||||
if [ "$(date +%A)" = "Saturday" ]; then
|
||||
if [ "$(date +%A)" = "Saturday" ] || [ ${RUN_AWS_RDS_AND_AURORA} = "true" ]; then
|
||||
matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres", "scale": "10" },
|
||||
{ "platform": "rds-aurora", "scale": "10" }]')
|
||||
{ "platform": "rds-aurora", "scale": "10" }]')
|
||||
fi
|
||||
|
||||
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
|
||||
@@ -337,6 +354,8 @@ jobs:
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||
DEFAULT_PG_VERSION: 14
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
TEST_OLAP_COLLECT_EXPLAIN: ${{ github.event.inputs.collect_olap_explain }}
|
||||
TEST_OLAP_COLLECT_PG_STAT_STATEMENTS: ${{ github.event.inputs.collect_pg_stat_statements }}
|
||||
BUILD_TYPE: remote
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
|
||||
PLATFORM: ${{ matrix.platform }}
|
||||
@@ -399,6 +418,8 @@ jobs:
|
||||
env:
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
TEST_OLAP_COLLECT_EXPLAIN: ${{ github.event.inputs.collect_olap_explain || 'false' }}
|
||||
TEST_OLAP_COLLECT_PG_STAT_STATEMENTS: ${{ github.event.inputs.collect_pg_stat_statements || 'false' }}
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
TEST_OLAP_SCALE: 10
|
||||
|
||||
|
||||
46
Cargo.lock
generated
46
Cargo.lock
generated
@@ -190,9 +190,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "async-compression"
|
||||
version = "0.4.0"
|
||||
version = "0.4.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b0122885821398cc923ece939e24d1056a2384ee719432397fa9db87230ff11"
|
||||
checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5"
|
||||
dependencies = [
|
||||
"flate2",
|
||||
"futures-core",
|
||||
@@ -2487,13 +2487,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "jsonwebtoken"
|
||||
version = "8.3.0"
|
||||
version = "9.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378"
|
||||
checksum = "5c7ea04a7c5c055c175f189b6dc6ba036fd62306b58c66c9f6389036c503a3f4"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"pem 1.1.1",
|
||||
"ring 0.16.20",
|
||||
"js-sys",
|
||||
"pem 3.0.3",
|
||||
"ring 0.17.6",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"simple_asn1",
|
||||
@@ -3291,18 +3292,19 @@ checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
|
||||
|
||||
[[package]]
|
||||
name = "pem"
|
||||
version = "1.1.1"
|
||||
version = "2.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a8835c273a76a90455d7344889b0964598e3316e2a79ede8e36f16bdcf2228b8"
|
||||
checksum = "6b13fe415cdf3c8e44518e18a7c95a13431d9bdf6d15367d82b23c377fdd441a"
|
||||
dependencies = [
|
||||
"base64 0.13.1",
|
||||
"base64 0.21.1",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pem"
|
||||
version = "2.0.1"
|
||||
version = "3.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6b13fe415cdf3c8e44518e18a7c95a13431d9bdf6d15367d82b23c377fdd441a"
|
||||
checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"serde",
|
||||
@@ -4428,12 +4430,12 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
|
||||
|
||||
[[package]]
|
||||
name = "sct"
|
||||
version = "0.7.0"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
|
||||
checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
|
||||
dependencies = [
|
||||
"ring 0.16.20",
|
||||
"untrusted 0.7.1",
|
||||
"ring 0.17.6",
|
||||
"untrusted 0.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6412,30 +6414,28 @@ checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9"
|
||||
|
||||
[[package]]
|
||||
name = "zstd"
|
||||
version = "0.12.4"
|
||||
version = "0.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c"
|
||||
checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110"
|
||||
dependencies = [
|
||||
"zstd-safe",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zstd-safe"
|
||||
version = "6.0.6"
|
||||
version = "7.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581"
|
||||
checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"zstd-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zstd-sys"
|
||||
version = "2.0.8+zstd.1.5.5"
|
||||
version = "2.0.9+zstd.1.5.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c"
|
||||
checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"pkg-config",
|
||||
]
|
||||
|
||||
@@ -91,7 +91,7 @@ hyper-tungstenite = "0.11"
|
||||
inotify = "0.10.2"
|
||||
ipnet = "2.9.0"
|
||||
itertools = "0.10"
|
||||
jsonwebtoken = "8"
|
||||
jsonwebtoken = "9"
|
||||
libc = "0.2"
|
||||
md5 = "0.7.0"
|
||||
memoffset = "0.8"
|
||||
|
||||
@@ -569,6 +569,23 @@ RUN wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/roaringbitmap.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg-semver-pg-build"
|
||||
# compile pg_semver extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg-semver-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
ENV PATH "/usr/local/pgsql/bin/:$PATH"
|
||||
RUN wget https://github.com/theory/pg-semver/archive/refs/tags/v0.32.1.tar.gz -O pg_semver.tar.gz && \
|
||||
echo "fbdaf7512026d62eec03fad8687c15ed509b6ba395bff140acd63d2e4fbe25d7 pg_semver.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_semver-src && cd pg_semver-src && tar xvzf ../pg_semver.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/semver.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg-embedding-pg-build"
|
||||
@@ -768,6 +785,7 @@ COPY --from=pg-pgx-ulid-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-roaringbitmap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-semver-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-embedding-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=wal2json-pg-build /usr/local/pgsql /usr/local/pgsql
|
||||
COPY pgxn/ pgxn/
|
||||
|
||||
@@ -37,5 +37,5 @@ workspace_hack.workspace = true
|
||||
toml_edit.workspace = true
|
||||
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
|
||||
vm_monitor = { version = "0.1", path = "../libs/vm_monitor/" }
|
||||
zstd = "0.12.4"
|
||||
zstd = "0.13"
|
||||
bytes = "1.0"
|
||||
|
||||
@@ -370,33 +370,49 @@ pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Cli
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn reassign_owned_objects_in_one_db(
|
||||
conf: Config,
|
||||
role_name: &PgIdent,
|
||||
db_owner: &PgIdent,
|
||||
) -> Result<()> {
|
||||
let mut client = conf.connect(NoTls)?;
|
||||
|
||||
// This will reassign all dependent objects to the db owner
|
||||
let reassign_query = format!(
|
||||
"REASSIGN OWNED BY {} TO {}",
|
||||
role_name.pg_quote(),
|
||||
db_owner.pg_quote()
|
||||
);
|
||||
info!(
|
||||
"reassigning objects owned by '{}' in db '{}' to '{}'",
|
||||
role_name,
|
||||
conf.get_dbname().unwrap_or(""),
|
||||
db_owner
|
||||
);
|
||||
client.simple_query(&reassign_query)?;
|
||||
|
||||
// This now will only drop privileges of the role
|
||||
let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
|
||||
client.simple_query(&drop_query)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Reassign all owned objects in all databases to the owner of the database.
|
||||
fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
|
||||
for db in &spec.cluster.databases {
|
||||
if db.owner != *role_name {
|
||||
let mut conf = Config::from_str(connstr)?;
|
||||
conf.dbname(&db.name);
|
||||
|
||||
let mut client = conf.connect(NoTls)?;
|
||||
|
||||
// This will reassign all dependent objects to the db owner
|
||||
let reassign_query = format!(
|
||||
"REASSIGN OWNED BY {} TO {}",
|
||||
role_name.pg_quote(),
|
||||
db.owner.pg_quote()
|
||||
);
|
||||
info!(
|
||||
"reassigning objects owned by '{}' in db '{}' to '{}'",
|
||||
role_name, &db.name, &db.owner
|
||||
);
|
||||
client.simple_query(&reassign_query)?;
|
||||
|
||||
// This now will only drop privileges of the role
|
||||
let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
|
||||
client.simple_query(&drop_query)?;
|
||||
reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
|
||||
}
|
||||
}
|
||||
|
||||
// Also handle case when there are no databases in the spec.
|
||||
// In this case we need to reassign objects in the default database.
|
||||
let conf = Config::from_str(connstr)?;
|
||||
let db_owner = PgIdent::from_str("cloud_admin")?;
|
||||
reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -522,14 +522,18 @@ pub(crate) mod initial_logical_size {
|
||||
impl StartCalculation {
|
||||
pub(crate) fn first(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
|
||||
let circumstances_label: &'static str = circumstances.into();
|
||||
self.0.with_label_values(&["first", circumstances_label]);
|
||||
self.0
|
||||
.with_label_values(&["first", circumstances_label])
|
||||
.inc();
|
||||
OngoingCalculationGuard {
|
||||
inc_drop_calculation: Some(DROP_CALCULATION.first.clone()),
|
||||
}
|
||||
}
|
||||
pub(crate) fn retry(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
|
||||
let circumstances_label: &'static str = circumstances.into();
|
||||
self.0.with_label_values(&["retry", circumstances_label]);
|
||||
self.0
|
||||
.with_label_values(&["retry", circumstances_label])
|
||||
.inc();
|
||||
OngoingCalculationGuard {
|
||||
inc_drop_calculation: Some(DROP_CALCULATION.retry.clone()),
|
||||
}
|
||||
@@ -1019,12 +1023,62 @@ static SMGR_QUERY_TIME_PER_TENANT_TIMELINE: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static SMGR_QUERY_TIME_GLOBAL_BUCKETS: Lazy<Vec<f64>> = Lazy::new(|| {
|
||||
[
|
||||
1,
|
||||
10,
|
||||
20,
|
||||
40,
|
||||
60,
|
||||
80,
|
||||
100,
|
||||
200,
|
||||
300,
|
||||
400,
|
||||
500,
|
||||
600,
|
||||
700,
|
||||
800,
|
||||
900,
|
||||
1_000, // 1ms
|
||||
2_000,
|
||||
4_000,
|
||||
6_000,
|
||||
8_000,
|
||||
10_000, // 10ms
|
||||
20_000,
|
||||
40_000,
|
||||
60_000,
|
||||
80_000,
|
||||
100_000,
|
||||
200_000,
|
||||
400_000,
|
||||
600_000,
|
||||
800_000,
|
||||
1_000_000, // 1s
|
||||
2_000_000,
|
||||
4_000_000,
|
||||
6_000_000,
|
||||
8_000_000,
|
||||
10_000_000, // 10s
|
||||
20_000_000,
|
||||
50_000_000,
|
||||
100_000_000,
|
||||
200_000_000,
|
||||
1_000_000_000, // 1000s
|
||||
]
|
||||
.into_iter()
|
||||
.map(Duration::from_micros)
|
||||
.map(|d| d.as_secs_f64())
|
||||
.collect()
|
||||
});
|
||||
|
||||
static SMGR_QUERY_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_smgr_query_seconds_global",
|
||||
"Time spent on smgr query handling, aggregated by query type.",
|
||||
&["smgr_query_type"],
|
||||
CRITICAL_OP_BUCKETS.into(),
|
||||
SMGR_QUERY_TIME_GLOBAL_BUCKETS.clone(),
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
@@ -1712,11 +1712,29 @@ walprop_pg_after_election(WalProposer *wp)
|
||||
f = fopen("restart.lsn", "rb");
|
||||
if (f != NULL && !wp->config->syncSafekeepers)
|
||||
{
|
||||
fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f);
|
||||
size_t rc = fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f);
|
||||
fclose(f);
|
||||
if (lrRestartLsn != InvalidXLogRecPtr)
|
||||
if (rc == 1 && lrRestartLsn != InvalidXLogRecPtr)
|
||||
{
|
||||
elog(LOG, "Logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));
|
||||
uint64 download_range_mb;
|
||||
|
||||
elog(LOG, "Logical replication restart LSN %X/%X, epochStartLsn %X/%X, max_slot_wal_keep_size_mb=%d",
|
||||
LSN_FORMAT_ARGS(lrRestartLsn), LSN_FORMAT_ARGS(wp->propEpochStartLsn), max_slot_wal_keep_size_mb);
|
||||
|
||||
/*
|
||||
* If we need to download more than a max_slot_wal_keep_size, cap to it to
|
||||
* avoid risk of exploding pg_wal. Logical replication won't work until
|
||||
* recreated, but at least compute would start; this also follows
|
||||
* max_slot_wal_keep_size semantics.
|
||||
*/
|
||||
download_range_mb = (wp->propEpochStartLsn - lrRestartLsn) / 1024 / 1024;
|
||||
if (max_slot_wal_keep_size_mb > 0 && download_range_mb >= max_slot_wal_keep_size_mb)
|
||||
{
|
||||
lrRestartLsn = wp->propEpochStartLsn - max_slot_wal_keep_size_mb * 1024 * 1024;
|
||||
elog(WARNING, "capped WAL download for logical replication to %X/%X as max_slot_wal_keep_size=%dMB",
|
||||
LSN_FORMAT_ARGS(lrRestartLsn), max_slot_wal_keep_size_mb);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* start from the beginning of the segment to fetch page headers
|
||||
|
||||
@@ -1,15 +1,25 @@
|
||||
# Collect /v1/debug_dump from all safekeeper nodes
|
||||
|
||||
3. Issue an admin token (add/remove .stage from the URL for staging/prod and set the proper API key):
|
||||
```
|
||||
AUTH_TOKEN=$(curl https://console.stage.neon.tech/regions/console/api/v1/admin/issue_token -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer $NEON_STAGING_KEY" -X POST -d '{"ttl_seconds": 43200, "scope": "safekeeperdata"}' 2>/dev/null | jq --raw-output '.jwt')
|
||||
# check
|
||||
echo $AUTH_TOKEN
|
||||
```
|
||||
2. Run ansible playbooks to collect .json dumps from all safekeepers and store them in `./result` directory.
|
||||
```
|
||||
# in aws repo, cd .github/ansible and run e.g. (adjusting profile and region in vars and limit):
|
||||
AWS_DEFAULT_PROFILE=dev ansible-playbook -i inventory_aws_ec2.yaml -i staging.us-east-2.vars.yaml -e @ssm_config -l 'safekeeper:&us_east_2' -e "auth_token=${AUTH_TOKEN}" --check ~/neon/neon/scripts/sk_collect_dumps/remote.yaml
|
||||
```
|
||||
It will put the results into the `.results` directory *near the playbook*.
|
||||
1. Run ansible playbooks to collect .json dumps from all safekeepers and store them in `./result` directory.
|
||||
2. Run `DB_CONNSTR=... ./upload.sh prod_feb30` to upload dumps to `prod_feb30` table in specified postgres database.
|
||||
|
||||
## How to use ansible (staging)
|
||||
|
||||
```
|
||||
AWS_DEFAULT_PROFILE=dev ansible-playbook -i ../../.github/ansible/staging.us-east-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
|
||||
|
||||
AWS_DEFAULT_PROFILE=dev ansible-playbook -i ../../.github/ansible/staging.eu-west-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
|
||||
```
|
||||
|
||||
## How to use ansible (prod)
|
||||
|
||||
```
|
||||
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.us-west-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
|
||||
|
||||
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.us-east-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
|
||||
|
||||
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.eu-central-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
|
||||
|
||||
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.ap-southeast-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
|
||||
```
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
- name: Fetch state dumps from safekeepers
|
||||
hosts: safekeeper
|
||||
hosts: safekeepers
|
||||
gather_facts: False
|
||||
remote_user: "{{ remote_user }}"
|
||||
|
||||
@@ -8,8 +8,6 @@
|
||||
get_url:
|
||||
url: "http://{{ inventory_hostname }}:7676/v1/debug_dump?dump_all=true&dump_disk_content=false"
|
||||
dest: "/tmp/{{ inventory_hostname }}.json"
|
||||
headers:
|
||||
Authorization: "Bearer {{ auth_token }}"
|
||||
|
||||
- name: Fetch file from remote hosts
|
||||
fetch:
|
||||
|
||||
@@ -31,22 +31,22 @@ SELECT
|
||||
(data->>'tenant_id') AS tenant_id,
|
||||
(data->>'timeline_id') AS timeline_id,
|
||||
(data->'memory'->>'active')::bool AS active,
|
||||
(data->'memory'->>'flush_lsn')::pg_lsn AS flush_lsn,
|
||||
(data->'memory'->'mem_state'->>'backup_lsn')::pg_lsn AS backup_lsn,
|
||||
(data->'memory'->'mem_state'->>'commit_lsn')::pg_lsn AS commit_lsn,
|
||||
(data->'memory'->'mem_state'->>'peer_horizon_lsn')::pg_lsn AS peer_horizon_lsn,
|
||||
(data->'memory'->'mem_state'->>'remote_consistent_lsn')::pg_lsn AS remote_consistent_lsn,
|
||||
(data->'memory'->>'write_lsn')::pg_lsn AS write_lsn,
|
||||
(data->'memory'->>'flush_lsn')::bigint AS flush_lsn,
|
||||
(data->'memory'->'mem_state'->>'backup_lsn')::bigint AS backup_lsn,
|
||||
(data->'memory'->'mem_state'->>'commit_lsn')::bigint AS commit_lsn,
|
||||
(data->'memory'->'mem_state'->>'peer_horizon_lsn')::bigint AS peer_horizon_lsn,
|
||||
(data->'memory'->'mem_state'->>'remote_consistent_lsn')::bigint AS remote_consistent_lsn,
|
||||
(data->'memory'->>'write_lsn')::bigint AS write_lsn,
|
||||
(data->'memory'->>'num_computes')::bigint AS num_computes,
|
||||
(data->'memory'->>'epoch_start_lsn')::pg_lsn AS epoch_start_lsn,
|
||||
(data->'memory'->>'epoch_start_lsn')::bigint AS epoch_start_lsn,
|
||||
(data->'memory'->>'last_removed_segno')::bigint AS last_removed_segno,
|
||||
(data->'memory'->>'is_cancelled')::bool AS is_cancelled,
|
||||
(data->'control_file'->>'backup_lsn')::pg_lsn AS disk_backup_lsn,
|
||||
(data->'control_file'->>'commit_lsn')::pg_lsn AS disk_commit_lsn,
|
||||
(data->'control_file'->>'backup_lsn')::bigint AS disk_backup_lsn,
|
||||
(data->'control_file'->>'commit_lsn')::bigint AS disk_commit_lsn,
|
||||
(data->'control_file'->'acceptor_state'->>'term')::bigint AS disk_term,
|
||||
(data->'control_file'->>'local_start_lsn')::pg_lsn AS local_start_lsn,
|
||||
(data->'control_file'->>'peer_horizon_lsn')::pg_lsn AS disk_peer_horizon_lsn,
|
||||
(data->'control_file'->>'timeline_start_lsn')::pg_lsn AS timeline_start_lsn,
|
||||
(data->'control_file'->>'remote_consistent_lsn')::pg_lsn AS disk_remote_consistent_lsn
|
||||
(data->'control_file'->>'local_start_lsn')::bigint AS local_start_lsn,
|
||||
(data->'control_file'->>'peer_horizon_lsn')::bigint AS disk_peer_horizon_lsn,
|
||||
(data->'control_file'->>'timeline_start_lsn')::bigint AS timeline_start_lsn,
|
||||
(data->'control_file'->>'remote_consistent_lsn')::bigint AS disk_remote_consistent_lsn
|
||||
FROM tmp_json
|
||||
EOF
|
||||
|
||||
@@ -3,9 +3,12 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use clap::Parser;
|
||||
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
|
||||
|
||||
use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
use storage_broker::proto::{
|
||||
FilterTenantTimelineId, MessageType, SubscribeByFilterRequest,
|
||||
TenantTimelineId as ProtoTenantTimelineId, TypeSubscription, TypedMessage,
|
||||
};
|
||||
|
||||
use storage_broker::{BrokerClientChannel, DEFAULT_ENDPOINT};
|
||||
use tokio::time;
|
||||
@@ -91,15 +94,23 @@ async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>,
|
||||
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
|
||||
};
|
||||
|
||||
let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
|
||||
let ttid = ProtoTenantTimelineId {
|
||||
tenant_id: vec![0xFF; 16],
|
||||
timeline_id: tli_from_u64(i),
|
||||
});
|
||||
let request = SubscribeSafekeeperInfoRequest {
|
||||
subscription_key: Some(key),
|
||||
};
|
||||
let mut stream = client
|
||||
.subscribe_safekeeper_info(request)
|
||||
|
||||
let request = SubscribeByFilterRequest {
|
||||
types: vec![TypeSubscription {
|
||||
r#type: MessageType::SafekeeperTimelineInfo.into(),
|
||||
}],
|
||||
tenant_timeline_id: Some(FilterTenantTimelineId {
|
||||
enabled: true,
|
||||
tenant_timeline_id: Some(ttid),
|
||||
}),
|
||||
};
|
||||
|
||||
let mut stream: tonic::Streaming<TypedMessage> = client
|
||||
.subscribe_by_filter(request)
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
|
||||
@@ -10,6 +10,12 @@ service BrokerService {
|
||||
|
||||
// Publish safekeeper updates.
|
||||
rpc PublishSafekeeperInfo(stream SafekeeperTimelineInfo) returns (google.protobuf.Empty) {};
|
||||
|
||||
// Subscribe to all messages, limited by a filter.
|
||||
rpc SubscribeByFilter(SubscribeByFilterRequest) returns (stream TypedMessage) {};
|
||||
|
||||
// Publish one message.
|
||||
rpc PublishOne(TypedMessage) returns (google.protobuf.Empty) {};
|
||||
}
|
||||
|
||||
message SubscribeSafekeeperInfoRequest {
|
||||
@@ -48,3 +54,55 @@ message TenantTimelineId {
|
||||
bytes tenant_id = 1;
|
||||
bytes timeline_id = 2;
|
||||
}
|
||||
|
||||
message FilterTenantTimelineId {
|
||||
// If true, only messages related to `tenant_timeline_id` will be emitted.
|
||||
// Otherwise, messages for all timelines will be emitted.
|
||||
bool enabled = 1;
|
||||
TenantTimelineId tenant_timeline_id = 2;
|
||||
}
|
||||
|
||||
message TypeSubscription {
|
||||
MessageType type = 1;
|
||||
}
|
||||
|
||||
message SubscribeByFilterRequest {
|
||||
// Subscription will emit messages only of the specified types. You need to specify
|
||||
// at least one type to receive any messages.
|
||||
repeated TypeSubscription types = 1;
|
||||
|
||||
// If set and enabled, subscription will emit messages only for the specified tenant/timeline.
|
||||
optional FilterTenantTimelineId tenant_timeline_id = 2;
|
||||
}
|
||||
|
||||
enum MessageType {
|
||||
UNKNOWN = 0;
|
||||
SAFEKEEPER_TIMELINE_INFO = 2;
|
||||
SAFEKEEPER_DISCOVERY_REQUEST = 3;
|
||||
SAFEKEEPER_DISCOVERY_RESPONSE = 4;
|
||||
}
|
||||
|
||||
// A message with a type.
|
||||
message TypedMessage {
|
||||
MessageType type = 1;
|
||||
|
||||
optional SafekeeperTimelineInfo safekeeper_timeline_info = 2;
|
||||
optional SafekeeperDiscoveryRequest safekeeper_discovery_request = 3;
|
||||
optional SafekeeperDiscoveryResponse safekeeper_discovery_response = 4;
|
||||
}
|
||||
|
||||
message SafekeeperDiscoveryRequest {
|
||||
TenantTimelineId tenant_timeline_id = 1;
|
||||
}
|
||||
|
||||
// Shorter version of SafekeeperTimelineInfo, contains only necessary fields.
|
||||
message SafekeeperDiscoveryResponse {
|
||||
uint64 safekeeper_id = 1;
|
||||
TenantTimelineId tenant_timeline_id = 2;
|
||||
// WAL available to download.
|
||||
uint64 commit_lsn = 3;
|
||||
// A connection string to use for WAL downloading.
|
||||
string safekeeper_connstr = 4;
|
||||
// Availability zone of a safekeeper.
|
||||
optional string availability_zone = 5;
|
||||
}
|
||||
|
||||
@@ -35,10 +35,16 @@ use tracing::*;
|
||||
use utils::signals::ShutdownSignals;
|
||||
|
||||
use metrics::{Encoder, TextEncoder};
|
||||
use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
|
||||
use storage_broker::metrics::{
|
||||
BROADCASTED_MESSAGES_TOTAL, BROADCAST_DROPPED_MESSAGES_TOTAL, NUM_PUBS, NUM_SUBS_ALL,
|
||||
NUM_SUBS_TIMELINE, PROCESSED_MESSAGES_TOTAL, PUBLISHED_ONEOFF_MESSAGES_TOTAL,
|
||||
};
|
||||
use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
|
||||
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
|
||||
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
|
||||
use storage_broker::proto::{
|
||||
FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
|
||||
SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
|
||||
};
|
||||
use storage_broker::{
|
||||
parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
|
||||
};
|
||||
@@ -73,8 +79,103 @@ struct Args {
|
||||
log_format: String,
|
||||
}
|
||||
|
||||
type PubId = u64; // id of publisher for registering in maps
|
||||
type SubId = u64; // id of subscriber for registering in maps
|
||||
/// Id of publisher for registering in maps
|
||||
type PubId = u64;
|
||||
|
||||
/// Id of subscriber for registering in maps
|
||||
type SubId = u64;
|
||||
|
||||
/// Single enum type for all messages.
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
enum Message {
|
||||
SafekeeperTimelineInfo(SafekeeperTimelineInfo),
|
||||
SafekeeperDiscoveryRequest(SafekeeperDiscoveryRequest),
|
||||
SafekeeperDiscoveryResponse(SafekeeperDiscoveryResponse),
|
||||
}
|
||||
|
||||
impl Message {
|
||||
/// Convert proto message to internal message.
|
||||
pub fn from(proto_msg: TypedMessage) -> Result<Self, Status> {
|
||||
match proto_msg.r#type() {
|
||||
MessageType::SafekeeperTimelineInfo => Ok(Message::SafekeeperTimelineInfo(
|
||||
proto_msg.safekeeper_timeline_info.ok_or_else(|| {
|
||||
Status::new(Code::InvalidArgument, "missing safekeeper_timeline_info")
|
||||
})?,
|
||||
)),
|
||||
MessageType::SafekeeperDiscoveryRequest => Ok(Message::SafekeeperDiscoveryRequest(
|
||||
proto_msg.safekeeper_discovery_request.ok_or_else(|| {
|
||||
Status::new(
|
||||
Code::InvalidArgument,
|
||||
"missing safekeeper_discovery_request",
|
||||
)
|
||||
})?,
|
||||
)),
|
||||
MessageType::SafekeeperDiscoveryResponse => Ok(Message::SafekeeperDiscoveryResponse(
|
||||
proto_msg.safekeeper_discovery_response.ok_or_else(|| {
|
||||
Status::new(
|
||||
Code::InvalidArgument,
|
||||
"missing safekeeper_discovery_response",
|
||||
)
|
||||
})?,
|
||||
)),
|
||||
MessageType::Unknown => Err(Status::new(
|
||||
Code::InvalidArgument,
|
||||
format!("invalid message type: {:?}", proto_msg.r#type),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the tenant_timeline_id from the message.
|
||||
pub fn tenant_timeline_id(&self) -> Result<Option<TenantTimelineId>, Status> {
|
||||
match self {
|
||||
Message::SafekeeperTimelineInfo(msg) => Ok(msg
|
||||
.tenant_timeline_id
|
||||
.as_ref()
|
||||
.map(parse_proto_ttid)
|
||||
.transpose()?),
|
||||
Message::SafekeeperDiscoveryRequest(msg) => Ok(msg
|
||||
.tenant_timeline_id
|
||||
.as_ref()
|
||||
.map(parse_proto_ttid)
|
||||
.transpose()?),
|
||||
Message::SafekeeperDiscoveryResponse(msg) => Ok(msg
|
||||
.tenant_timeline_id
|
||||
.as_ref()
|
||||
.map(parse_proto_ttid)
|
||||
.transpose()?),
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert internal message to the protobuf struct.
|
||||
pub fn as_typed_message(&self) -> TypedMessage {
|
||||
let mut res = TypedMessage {
|
||||
r#type: self.message_type() as i32,
|
||||
..Default::default()
|
||||
};
|
||||
match self {
|
||||
Message::SafekeeperTimelineInfo(msg) => {
|
||||
res.safekeeper_timeline_info = Some(msg.clone())
|
||||
}
|
||||
Message::SafekeeperDiscoveryRequest(msg) => {
|
||||
res.safekeeper_discovery_request = Some(msg.clone())
|
||||
}
|
||||
Message::SafekeeperDiscoveryResponse(msg) => {
|
||||
res.safekeeper_discovery_response = Some(msg.clone())
|
||||
}
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
/// Get the message type.
|
||||
pub fn message_type(&self) -> MessageType {
|
||||
match self {
|
||||
Message::SafekeeperTimelineInfo(_) => MessageType::SafekeeperTimelineInfo,
|
||||
Message::SafekeeperDiscoveryRequest(_) => MessageType::SafekeeperDiscoveryRequest,
|
||||
Message::SafekeeperDiscoveryResponse(_) => MessageType::SafekeeperDiscoveryResponse,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
enum SubscriptionKey {
|
||||
@@ -83,7 +184,7 @@ enum SubscriptionKey {
|
||||
}
|
||||
|
||||
impl SubscriptionKey {
|
||||
// Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
|
||||
/// Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
|
||||
pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
|
||||
match key {
|
||||
ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
|
||||
@@ -92,14 +193,29 @@ impl SubscriptionKey {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse from FilterTenantTimelineId
|
||||
pub fn from_proto_filter_tenant_timeline_id(
|
||||
f: &FilterTenantTimelineId,
|
||||
) -> Result<Self, Status> {
|
||||
if !f.enabled {
|
||||
return Ok(SubscriptionKey::All);
|
||||
}
|
||||
|
||||
let ttid =
|
||||
parse_proto_ttid(f.tenant_timeline_id.as_ref().ok_or_else(|| {
|
||||
Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
|
||||
})?)?;
|
||||
Ok(SubscriptionKey::Timeline(ttid))
|
||||
}
|
||||
}
|
||||
|
||||
// Channel to timeline subscribers.
|
||||
/// Channel to timeline subscribers.
|
||||
struct ChanToTimelineSub {
|
||||
chan: broadcast::Sender<SafekeeperTimelineInfo>,
|
||||
// Tracked separately to know when delete the shmem entry. receiver_count()
|
||||
// is unhandy for that as unregistering and dropping the receiver side
|
||||
// happens at different moments.
|
||||
chan: broadcast::Sender<Message>,
|
||||
/// Tracked separately to know when to delete the shmem entry. receiver_count()
|
||||
/// is unhandy for that as unregistering and dropping the receiver side
|
||||
/// happens at different moments.
|
||||
num_subscribers: u64,
|
||||
}
|
||||
|
||||
@@ -110,7 +226,7 @@ struct SharedState {
|
||||
num_subs_to_timelines: i64,
|
||||
chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
|
||||
num_subs_to_all: i64,
|
||||
chan_to_all_subs: broadcast::Sender<SafekeeperTimelineInfo>,
|
||||
chan_to_all_subs: broadcast::Sender<Message>,
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
@@ -146,7 +262,7 @@ impl SharedState {
|
||||
&mut self,
|
||||
sub_key: SubscriptionKey,
|
||||
timeline_chan_size: usize,
|
||||
) -> (SubId, broadcast::Receiver<SafekeeperTimelineInfo>) {
|
||||
) -> (SubId, broadcast::Receiver<Message>) {
|
||||
let sub_id = self.next_sub_id;
|
||||
self.next_sub_id += 1;
|
||||
let sub_rx = match sub_key {
|
||||
@@ -262,6 +378,29 @@ impl Registry {
|
||||
subscriber.id, subscriber.key, subscriber.remote_addr
|
||||
);
|
||||
}
|
||||
|
||||
/// Send msg to relevant subscribers.
|
||||
pub fn send_msg(&self, msg: &Message) -> Result<(), Status> {
|
||||
PROCESSED_MESSAGES_TOTAL.inc();
|
||||
|
||||
// send message to subscribers for everything
|
||||
let shared_state = self.shared_state.read();
|
||||
// Err means there are no subscribers, which is fine.
|
||||
shared_state.chan_to_all_subs.send(msg.clone()).ok();
|
||||
|
||||
// send message to per timeline subscribers, if there is ttid
|
||||
let ttid = msg.tenant_timeline_id()?;
|
||||
if let Some(ttid) = ttid {
|
||||
if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
|
||||
// Err can't happen here, as tx is destroyed only after removing
|
||||
// from the map the last subscriber along with tx.
|
||||
subs.chan
|
||||
.send(msg.clone())
|
||||
.expect("rx is still in the map with zero subscribers");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// Private subscriber state.
|
||||
@@ -269,7 +408,7 @@ struct Subscriber {
|
||||
id: SubId,
|
||||
key: SubscriptionKey,
|
||||
// Subscriber receives messages from publishers here.
|
||||
sub_rx: broadcast::Receiver<SafekeeperTimelineInfo>,
|
||||
sub_rx: broadcast::Receiver<Message>,
|
||||
// to unregister itself from shared state in Drop
|
||||
registry: Registry,
|
||||
// for logging
|
||||
@@ -291,26 +430,9 @@ struct Publisher {
|
||||
}
|
||||
|
||||
impl Publisher {
|
||||
// Send msg to relevant subscribers.
|
||||
pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> {
|
||||
// send message to subscribers for everything
|
||||
let shared_state = self.registry.shared_state.read();
|
||||
// Err means there is no subscribers, it is fine.
|
||||
shared_state.chan_to_all_subs.send(msg.clone()).ok();
|
||||
|
||||
// send message to per timeline subscribers
|
||||
let ttid =
|
||||
parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or_else(|| {
|
||||
Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
|
||||
})?)?;
|
||||
if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
|
||||
// Err can't happen here, as tx is destroyed only after removing
|
||||
// from the map the last subscriber along with tx.
|
||||
subs.chan
|
||||
.send(msg.clone())
|
||||
.expect("rx is still in the map with zero subscribers");
|
||||
}
|
||||
Ok(())
|
||||
/// Send msg to relevant subscribers.
|
||||
pub fn send_msg(&mut self, msg: &Message) -> Result<(), Status> {
|
||||
self.registry.send_msg(msg)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -339,7 +461,7 @@ impl BrokerService for Broker {
|
||||
|
||||
loop {
|
||||
match stream.next().await {
|
||||
Some(Ok(msg)) => publisher.send_msg(&msg)?,
|
||||
Some(Ok(msg)) => publisher.send_msg(&Message::SafekeeperTimelineInfo(msg))?,
|
||||
Some(Err(e)) => return Err(e), // grpc error from the stream
|
||||
None => break, // closed stream
|
||||
}
|
||||
@@ -371,8 +493,15 @@ impl BrokerService for Broker {
|
||||
let mut missed_msgs: u64 = 0;
|
||||
loop {
|
||||
match subscriber.sub_rx.recv().await {
|
||||
Ok(info) => yield info,
|
||||
Ok(info) => {
|
||||
match info {
|
||||
Message::SafekeeperTimelineInfo(info) => yield info,
|
||||
_ => {},
|
||||
}
|
||||
BROADCASTED_MESSAGES_TOTAL.inc();
|
||||
},
|
||||
Err(RecvError::Lagged(skipped_msg)) => {
|
||||
BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
|
||||
missed_msgs += skipped_msg;
|
||||
if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
|
||||
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
|
||||
@@ -392,6 +521,78 @@ impl BrokerService for Broker {
|
||||
Box::pin(output) as Self::SubscribeSafekeeperInfoStream
|
||||
))
|
||||
}
|
||||
|
||||
type SubscribeByFilterStream =
|
||||
Pin<Box<dyn Stream<Item = Result<TypedMessage, Status>> + Send + 'static>>;
|
||||
|
||||
/// Subscribe to all messages, limited by a filter.
|
||||
async fn subscribe_by_filter(
|
||||
&self,
|
||||
request: Request<SubscribeByFilterRequest>,
|
||||
) -> std::result::Result<Response<Self::SubscribeByFilterStream>, Status> {
|
||||
let remote_addr = request
|
||||
.remote_addr()
|
||||
.expect("TCPConnectInfo inserted by handler");
|
||||
let proto_filter = request.into_inner();
|
||||
let ttid_filter = proto_filter
|
||||
.tenant_timeline_id
|
||||
.as_ref()
|
||||
.ok_or_else(|| Status::new(Code::InvalidArgument, "missing tenant_timeline_id"))?;
|
||||
|
||||
let sub_key = SubscriptionKey::from_proto_filter_tenant_timeline_id(ttid_filter)?;
|
||||
let types_set = proto_filter
|
||||
.types
|
||||
.iter()
|
||||
.map(|t| t.r#type)
|
||||
.collect::<std::collections::HashSet<_>>();
|
||||
|
||||
let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
|
||||
|
||||
// transform rx into stream with item = Result, as method result demands
|
||||
let output = async_stream::try_stream! {
|
||||
let mut warn_interval = time::interval(Duration::from_millis(1000));
|
||||
let mut missed_msgs: u64 = 0;
|
||||
loop {
|
||||
match subscriber.sub_rx.recv().await {
|
||||
Ok(msg) => {
|
||||
let msg_type = msg.message_type() as i32;
|
||||
if types_set.contains(&msg_type) {
|
||||
yield msg.as_typed_message();
|
||||
BROADCASTED_MESSAGES_TOTAL.inc();
|
||||
}
|
||||
},
|
||||
Err(RecvError::Lagged(skipped_msg)) => {
|
||||
BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
|
||||
missed_msgs += skipped_msg;
|
||||
if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
|
||||
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
|
||||
subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
|
||||
missed_msgs = 0;
|
||||
}
|
||||
}
|
||||
Err(RecvError::Closed) => {
|
||||
// can't happen, we never drop the channel while there is a subscriber
|
||||
Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Response::new(
|
||||
Box::pin(output) as Self::SubscribeByFilterStream
|
||||
))
|
||||
}
|
||||
|
||||
/// Publish one message.
|
||||
async fn publish_one(
|
||||
&self,
|
||||
request: Request<TypedMessage>,
|
||||
) -> std::result::Result<Response<()>, Status> {
|
||||
let msg = Message::from(request.into_inner())?;
|
||||
PUBLISHED_ONEOFF_MESSAGES_TOTAL.inc();
|
||||
self.registry.send_msg(&msg)?;
|
||||
Ok(Response::new(()))
|
||||
}
|
||||
}
|
||||
|
||||
// We serve only metrics and healthcheck through http1.
|
||||
@@ -515,8 +716,8 @@ mod tests {
|
||||
use tokio::sync::broadcast::error::TryRecvError;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
fn msg(timeline_id: Vec<u8>) -> SafekeeperTimelineInfo {
|
||||
SafekeeperTimelineInfo {
|
||||
fn msg(timeline_id: Vec<u8>) -> Message {
|
||||
Message::SafekeeperTimelineInfo(SafekeeperTimelineInfo {
|
||||
safekeeper_id: 1,
|
||||
tenant_timeline_id: Some(ProtoTenantTimelineId {
|
||||
tenant_id: vec![0x00; 16],
|
||||
@@ -533,7 +734,7 @@ mod tests {
|
||||
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn tli_from_u64(i: u64) -> Vec<u8> {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! Broker metrics.
|
||||
|
||||
use metrics::{register_int_gauge, IntGauge};
|
||||
use metrics::{register_int_counter, register_int_gauge, IntCounter, IntGauge};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
pub static NUM_PUBS: Lazy<IntGauge> = Lazy::new(|| {
|
||||
@@ -23,3 +23,35 @@ pub static NUM_SUBS_ALL: Lazy<IntGauge> = Lazy::new(|| {
|
||||
)
|
||||
.expect("Failed to register metric")
|
||||
});
|
||||
|
||||
pub static PROCESSED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"storage_broker_processed_messages_total",
|
||||
"Number of messages received by storage broker, before routing and broadcasting"
|
||||
)
|
||||
.expect("Failed to register metric")
|
||||
});
|
||||
|
||||
pub static BROADCASTED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"storage_broker_broadcasted_messages_total",
|
||||
"Number of messages broadcasted (sent over network) to subscribers"
|
||||
)
|
||||
.expect("Failed to register metric")
|
||||
});
|
||||
|
||||
pub static BROADCAST_DROPPED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"storage_broker_broadcast_dropped_messages_total",
|
||||
"Number of messages dropped due to channel capacity overflow"
|
||||
)
|
||||
.expect("Failed to register metric")
|
||||
});
|
||||
|
||||
pub static PUBLISHED_ONEOFF_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"storage_broker_published_oneoff_messages_total",
|
||||
"Number of one-off messages sent via PublishOne method"
|
||||
)
|
||||
.expect("Failed to register metric")
|
||||
});
|
||||
|
||||
@@ -17,6 +17,27 @@ class LabelledQuery:
|
||||
query: str
|
||||
|
||||
|
||||
# This must run before all tests in this module
|
||||
# create extension pg_stat_statements if it does not exist
|
||||
# and TEST_OLAP_COLLECT_PG_STAT_STATEMENTS is set to true (default false)
|
||||
# Theoretically this could be in a module or session scope fixture,
|
||||
# however the code depends on other fixtures that have function scope
|
||||
@pytest.mark.skipif(
|
||||
os.getenv("TEST_OLAP_COLLECT_PG_STAT_STATEMENTS", "false").lower() == "false",
|
||||
reason="Skipping - Creating extension pg_stat_statements",
|
||||
)
|
||||
@pytest.mark.remote_cluster
|
||||
def test_clickbench_create_pg_stat_statements(remote_compare: RemoteCompare):
|
||||
log.info("Creating extension pg_stat_statements")
|
||||
query = LabelledQuery(
|
||||
"Q_CREATE_EXTENSION", r"CREATE EXTENSION IF NOT EXISTS pg_stat_statements;"
|
||||
)
|
||||
run_psql(remote_compare, query, times=1, explain=False)
|
||||
log.info("Reset pg_stat_statements")
|
||||
query = LabelledQuery("Q_RESET", r"SELECT pg_stat_statements_reset();")
|
||||
run_psql(remote_compare, query, times=1, explain=False)
|
||||
|
||||
|
||||
# A list of queries to run.
|
||||
# Please do not alter the label for the query, as it is used to identify it.
|
||||
# Labels for ClickBench queries match the labels in ClickBench reports
|
||||
@@ -78,6 +99,8 @@ QUERIES: Tuple[LabelledQuery, ...] = (
|
||||
# fmt: on
|
||||
)
|
||||
|
||||
EXPLAIN_STRING: str = "EXPLAIN (ANALYZE, VERBOSE, BUFFERS, COSTS, SETTINGS, FORMAT JSON)"
|
||||
|
||||
|
||||
def get_scale() -> List[str]:
|
||||
# We parametrize each tpc-h and clickbench test with scale
|
||||
@@ -88,7 +111,10 @@ def get_scale() -> List[str]:
|
||||
return [scale]
|
||||
|
||||
|
||||
def run_psql(env: RemoteCompare, labelled_query: LabelledQuery, times: int) -> None:
|
||||
# run the query `times` times, plus once with EXPLAIN VERBOSE if explain is requested
|
||||
def run_psql(
|
||||
env: RemoteCompare, labelled_query: LabelledQuery, times: int, explain: bool = False
|
||||
) -> None:
|
||||
# prepare connstr:
|
||||
# - cut out password from connstr to pass it via env
|
||||
# - add options to connstr
|
||||
@@ -108,6 +134,13 @@ def run_psql(env: RemoteCompare, labelled_query: LabelledQuery, times: int) -> N
|
||||
log.info(f"Run {run}/{times}")
|
||||
with env.zenbenchmark.record_duration(f"{label}/{run}"):
|
||||
env.pg_bin.run_capture(["psql", connstr, "-c", query], env=environ)
|
||||
if explain:
|
||||
log.info(f"Explaining query {label}")
|
||||
run += 1
|
||||
with env.zenbenchmark.record_duration(f"{label}/EXPLAIN"):
|
||||
env.pg_bin.run_capture(
|
||||
["psql", connstr, "-c", f"{EXPLAIN_STRING} {query}"], env=environ
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("scale", get_scale())
|
||||
@@ -120,8 +153,9 @@ def test_clickbench(query: LabelledQuery, remote_compare: RemoteCompare, scale:
|
||||
Based on https://github.com/ClickHouse/ClickBench/tree/c00135ca5b6a0d86fedcdbf998fdaa8ed85c1c3b/aurora-postgresql
|
||||
The DB prepared manually in advance
|
||||
"""
|
||||
explain: bool = os.getenv("TEST_OLAP_COLLECT_EXPLAIN", "false").lower() == "true"
|
||||
|
||||
run_psql(remote_compare, query, times=3)
|
||||
run_psql(remote_compare, query, times=3, explain=explain)
|
||||
|
||||
|
||||
def tpch_queuies() -> Tuple[ParameterSet, ...]:
|
||||
@@ -195,3 +229,16 @@ def test_user_examples(remote_compare: RemoteCompare):
|
||||
""",
|
||||
)
|
||||
run_psql(remote_compare, query, times=3)
|
||||
|
||||
|
||||
# This must run after all tests in this module
|
||||
# Collect pg_stat_statements after running the tests if TEST_OLAP_COLLECT_PG_STAT_STATEMENTS is set to true (default false)
|
||||
@pytest.mark.skipif(
|
||||
os.getenv("TEST_OLAP_COLLECT_PG_STAT_STATEMENTS", "false").lower() == "false",
|
||||
reason="Skipping - Collecting pg_stat_statements",
|
||||
)
|
||||
@pytest.mark.remote_cluster
|
||||
def test_clickbench_collect_pg_stat_statements(remote_compare: RemoteCompare):
|
||||
log.info("Collecting pg_stat_statements")
|
||||
query = LabelledQuery("Q_COLLECT_PG_STAT_STATEMENTS", r"SELECT * from pg_stat_statements;")
|
||||
run_psql(remote_compare, query, times=1, explain=False)
|
||||
|
||||
@@ -56,7 +56,7 @@ regex = { version = "1" }
|
||||
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
|
||||
regex-syntax = { version = "0.8" }
|
||||
reqwest = { version = "0.11", default-features = false, features = ["blocking", "default-tls", "json", "multipart", "rustls-tls", "stream"] }
|
||||
ring = { version = "0.16", features = ["std"] }
|
||||
ring = { version = "0.16" }
|
||||
rustls = { version = "0.21", features = ["dangerous_configuration"] }
|
||||
scopeguard = { version = "1" }
|
||||
serde = { version = "1", features = ["alloc", "derive"] }
|
||||
@@ -75,8 +75,8 @@ tracing-core = { version = "0.1" }
|
||||
tungstenite = { version = "0.20" }
|
||||
url = { version = "2", features = ["serde"] }
|
||||
uuid = { version = "1", features = ["serde", "v4"] }
|
||||
zstd = { version = "0.12" }
|
||||
zstd-safe = { version = "6", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
|
||||
zstd = { version = "0.13" }
|
||||
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
|
||||
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }
|
||||
|
||||
[build-dependencies]
|
||||
|
||||
Reference in New Issue
Block a user