mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 06:22:57 +00:00
pageserver: remove trace_read_requests (#8338)
`trace_read_requests` is a per `Tenant`-object option. But the `handle_pagerequests` loop doesn't know which `Tenant` object (i.e., which shard) the request is for. The remaining use of the `Tenant` object is to check `tenant.cancel`. That check is incorrect [if the pageserver hosts multiple shards](https://github.com/neondatabase/neon/issues/7427#issuecomment-2220577518). I'll fix that in a future PR where I completely eliminate the holding of `Tenant/Timeline` objects across requests. See [my code RFC](https://github.com/neondatabase/neon/pull/8286) for the high level idea. Note that we can always bring the tracing functionality if we need it. But since it's actually about logging the `page_service` wire bytes, it should be a `page_service`-level config option, not per-Tenant. And for enabling tracing on a single connection, we can implement a `set pageserver_trace_connection;` option.
This commit is contained in:
committed by
GitHub
parent
c11b9cb43d
commit
e26ef640c1
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -6510,17 +6510,6 @@ version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
|
||||
|
||||
[[package]]
|
||||
name = "trace"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"clap",
|
||||
"pageserver_api",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
version = "0.1.37"
|
||||
|
||||
@@ -15,7 +15,6 @@ members = [
|
||||
"storage_controller",
|
||||
"storage_scrubber",
|
||||
"workspace_hack",
|
||||
"trace",
|
||||
"libs/compute_api",
|
||||
"libs/pageserver_api",
|
||||
"libs/postgres_ffi",
|
||||
|
||||
@@ -349,11 +349,6 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
||||
trace_read_requests: settings
|
||||
.remove("trace_read_requests")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
eviction_policy: settings
|
||||
.remove("eviction_policy")
|
||||
.map(serde_json::from_str)
|
||||
@@ -454,11 +449,6 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
||||
trace_read_requests: settings
|
||||
.remove("trace_read_requests")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
eviction_policy: settings
|
||||
.remove("eviction_policy")
|
||||
.map(serde_json::from_str)
|
||||
|
||||
@@ -294,7 +294,6 @@ pub struct TenantConfig {
|
||||
pub walreceiver_connect_timeout: Option<String>,
|
||||
pub lagging_wal_timeout: Option<String>,
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
pub trace_read_requests: Option<bool>,
|
||||
pub eviction_policy: Option<EvictionPolicy>,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<String>,
|
||||
|
||||
@@ -302,17 +302,6 @@ pub struct TenantId(Id);
|
||||
|
||||
id_newtype!(TenantId);
|
||||
|
||||
/// Neon Connection Id identifies long-lived connections (for example a pagestream
|
||||
/// connection with the page_service). Is used for better logging and tracing
|
||||
///
|
||||
/// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look
|
||||
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
|
||||
/// See [`Id`] for alternative ways to serialize it.
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
|
||||
pub struct ConnectionId(Id);
|
||||
|
||||
id_newtype!(ConnectionId);
|
||||
|
||||
// A pair uniquely identifying Neon instance.
|
||||
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub struct TenantTimelineId {
|
||||
|
||||
@@ -12,7 +12,6 @@ use serde::de::IntoDeserializer;
|
||||
use std::env;
|
||||
use storage_broker::Uri;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::id::ConnectionId;
|
||||
use utils::logging::SecretString;
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
@@ -870,22 +869,6 @@ impl PageServerConf {
|
||||
)
|
||||
}
|
||||
|
||||
pub fn traces_path(&self) -> Utf8PathBuf {
|
||||
self.workdir.join("traces")
|
||||
}
|
||||
|
||||
pub fn trace_path(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
connection_id: &ConnectionId,
|
||||
) -> Utf8PathBuf {
|
||||
self.traces_path()
|
||||
.join(tenant_shard_id.to_string())
|
||||
.join(timeline_id.to_string())
|
||||
.join(connection_id.to_string())
|
||||
}
|
||||
|
||||
/// Turns storage remote path of a file into its local path.
|
||||
pub fn local_path(&self, remote_path: &RemotePath) -> Utf8PathBuf {
|
||||
remote_path.with_base(&self.workdir)
|
||||
@@ -1560,34 +1543,6 @@ broker_endpoint = '{broker_endpoint}'
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_tenant_config() -> anyhow::Result<()> {
|
||||
let tempdir = tempdir()?;
|
||||
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
|
||||
|
||||
let broker_endpoint = "http://127.0.0.1:7777";
|
||||
let trace_read_requests = true;
|
||||
|
||||
let config_string = format!(
|
||||
r#"{ALL_BASE_VALUES_TOML}
|
||||
pg_distrib_dir='{pg_distrib_dir}'
|
||||
broker_endpoint = '{broker_endpoint}'
|
||||
|
||||
[tenant_config]
|
||||
trace_read_requests = {trace_read_requests}"#,
|
||||
);
|
||||
|
||||
let toml = config_string.parse()?;
|
||||
|
||||
let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;
|
||||
assert_eq!(
|
||||
conf.default_tenant_conf.trace_read_requests, trace_read_requests,
|
||||
"Tenant config from pageserver config file should be parsed and udpated values used as defaults for all tenants",
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_incorrect_tenant_config() -> anyhow::Result<()> {
|
||||
let config_string = r#"
|
||||
|
||||
@@ -873,8 +873,6 @@ components:
|
||||
type: string
|
||||
max_lsn_wal_lag:
|
||||
type: integer
|
||||
trace_read_requests:
|
||||
type: boolean
|
||||
heatmap_period:
|
||||
type: string
|
||||
TenantConfigResponse:
|
||||
|
||||
@@ -23,7 +23,6 @@ pub mod span;
|
||||
pub(crate) mod statvfs;
|
||||
pub mod task_mgr;
|
||||
pub mod tenant;
|
||||
pub mod trace;
|
||||
pub mod utilization;
|
||||
pub mod virtual_file;
|
||||
pub mod walingest;
|
||||
|
||||
@@ -36,7 +36,6 @@ use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::id::ConnectionId;
|
||||
use utils::sync::gate::GateGuard;
|
||||
use utils::{
|
||||
auth::{Claims, Scope, SwappableJwtAuth},
|
||||
@@ -66,7 +65,6 @@ use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::tenant::Tenant;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::trace::Tracer;
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::reltag::SlruKind;
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
@@ -430,18 +428,6 @@ impl PageServerHandler {
|
||||
.get_active_tenant_with_timeout(tenant_id, ShardSelector::First, ACTIVE_TENANT_TIMEOUT)
|
||||
.await?;
|
||||
|
||||
// Make request tracer if needed
|
||||
let mut tracer = if tenant.get_trace_read_requests() {
|
||||
let connection_id = ConnectionId::generate();
|
||||
let path =
|
||||
tenant
|
||||
.conf
|
||||
.trace_path(&tenant.tenant_shard_id(), &timeline_id, &connection_id);
|
||||
Some(Tracer::new(path))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// switch client to COPYBOTH
|
||||
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
|
||||
self.flush_cancellable(pgb, &tenant.cancel).await?;
|
||||
@@ -473,11 +459,6 @@ impl PageServerHandler {
|
||||
trace!("query: {copy_data_bytes:?}");
|
||||
fail::fail_point!("ps::handle-pagerequest-message");
|
||||
|
||||
// Trace request if needed
|
||||
if let Some(t) = tracer.as_mut() {
|
||||
t.trace(©_data_bytes)
|
||||
}
|
||||
|
||||
let neon_fe_msg =
|
||||
PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
|
||||
|
||||
|
||||
@@ -2341,13 +2341,6 @@ impl Tenant {
|
||||
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
|
||||
}
|
||||
|
||||
pub fn get_trace_read_requests(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.trace_read_requests
|
||||
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
|
||||
}
|
||||
|
||||
pub fn get_min_resident_size_override(&self) -> Option<u64> {
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
@@ -3718,7 +3711,6 @@ pub(crate) mod harness {
|
||||
walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
|
||||
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
|
||||
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
|
||||
trace_read_requests: Some(tenant_conf.trace_read_requests),
|
||||
eviction_policy: Some(tenant_conf.eviction_policy),
|
||||
min_resident_size_override: tenant_conf.min_resident_size_override,
|
||||
evictions_low_residence_duration_metric_threshold: Some(
|
||||
|
||||
@@ -335,7 +335,6 @@ pub struct TenantConf {
|
||||
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
|
||||
/// to avoid eager reconnects.
|
||||
pub max_lsn_wal_lag: NonZeroU64,
|
||||
pub trace_read_requests: bool,
|
||||
pub eviction_policy: EvictionPolicy,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
// See the corresponding metric's help string.
|
||||
@@ -436,10 +435,6 @@ pub struct TenantConfOpt {
|
||||
#[serde(default)]
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub trace_read_requests: Option<bool>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub eviction_policy: Option<EvictionPolicy>,
|
||||
@@ -519,9 +514,6 @@ impl TenantConfOpt {
|
||||
.lagging_wal_timeout
|
||||
.unwrap_or(global_conf.lagging_wal_timeout),
|
||||
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
|
||||
trace_read_requests: self
|
||||
.trace_read_requests
|
||||
.unwrap_or(global_conf.trace_read_requests),
|
||||
eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy),
|
||||
min_resident_size_override: self
|
||||
.min_resident_size_override
|
||||
@@ -581,7 +573,6 @@ impl Default for TenantConf {
|
||||
.expect("cannot parse default walreceiver lagging wal timeout"),
|
||||
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||
.expect("cannot parse default max walreceiver Lsn wal lag"),
|
||||
trace_read_requests: false,
|
||||
eviction_policy: EvictionPolicy::NoEviction,
|
||||
min_resident_size_override: None,
|
||||
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
|
||||
@@ -659,7 +650,6 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
walreceiver_connect_timeout: value.walreceiver_connect_timeout.map(humantime),
|
||||
lagging_wal_timeout: value.lagging_wal_timeout.map(humantime),
|
||||
max_lsn_wal_lag: value.max_lsn_wal_lag,
|
||||
trace_read_requests: value.trace_read_requests,
|
||||
eviction_policy: value.eviction_policy,
|
||||
min_resident_size_override: value.min_resident_size_override,
|
||||
evictions_low_residence_duration_metric_threshold: value
|
||||
|
||||
@@ -1,36 +0,0 @@
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use std::{
|
||||
fs::{create_dir_all, File},
|
||||
io::{BufWriter, Write},
|
||||
};
|
||||
|
||||
pub struct Tracer {
|
||||
writer: BufWriter<File>,
|
||||
}
|
||||
|
||||
impl Drop for Tracer {
|
||||
fn drop(&mut self) {
|
||||
self.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl Tracer {
|
||||
pub fn new(path: Utf8PathBuf) -> Self {
|
||||
let parent = path.parent().expect("failed to parse parent path");
|
||||
create_dir_all(parent).expect("failed to create trace dir");
|
||||
|
||||
let file = File::create(path).expect("failed to create trace file");
|
||||
Tracer {
|
||||
writer: BufWriter::new(file),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn trace(&mut self, msg: &Bytes) {
|
||||
self.writer.write_all(msg).expect("failed to write trace");
|
||||
}
|
||||
|
||||
pub fn flush(&mut self) {
|
||||
self.writer.flush().expect("failed to flush trace file");
|
||||
}
|
||||
}
|
||||
@@ -109,8 +109,6 @@ class NeonCompare(PgCompare):
|
||||
|
||||
# Create tenant
|
||||
tenant_conf: Dict[str, str] = {}
|
||||
if False: # TODO add pytest setting for this
|
||||
tenant_conf["trace_read_requests"] = "true"
|
||||
self.tenant, _ = self.env.neon_cli.create_tenant(conf=tenant_conf)
|
||||
|
||||
# Create timeline
|
||||
|
||||
@@ -168,7 +168,6 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"refill_amount": 1000,
|
||||
"max": 1000,
|
||||
},
|
||||
"trace_read_requests": True,
|
||||
"walreceiver_connect_timeout": "13m",
|
||||
"image_layer_creation_check_threshold": 1,
|
||||
"switch_aux_file_policy": "cross-validation",
|
||||
|
||||
@@ -1,39 +0,0 @@
|
||||
from contextlib import closing
|
||||
|
||||
from fixtures.common_types import Lsn
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
# This test demonstrates how to collect a read trace. It's useful until
|
||||
# it gets replaced by a test that actually does stuff with the trace.
|
||||
#
|
||||
# Additionally, tests that pageserver is able to create tenants with custom configs.
|
||||
def test_read_request_tracing(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
"trace_read_requests": "true",
|
||||
}
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("create table t (i integer);")
|
||||
cur.execute(f"insert into t values (generate_series(1,{10000}));")
|
||||
cur.execute("select count(*) from t;")
|
||||
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
# wait until pageserver receives that data
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, current_lsn)
|
||||
|
||||
# Stop postgres so we drop the connection and flush the traces
|
||||
endpoint.stop()
|
||||
|
||||
trace_path = env.pageserver.workdir / "traces" / str(tenant_id) / str(timeline_id)
|
||||
assert trace_path.exists()
|
||||
@@ -1,13 +0,0 @@
|
||||
[package]
|
||||
name = "trace"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
clap.workspace = true
|
||||
anyhow.workspace = true
|
||||
|
||||
pageserver_api.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
@@ -1,167 +0,0 @@
|
||||
//! A tool for working with read traces generated by the pageserver.
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::{
|
||||
fs::{read_dir, File},
|
||||
io::BufReader,
|
||||
};
|
||||
|
||||
use pageserver_api::models::{
|
||||
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamProtocolVersion,
|
||||
};
|
||||
use utils::id::{ConnectionId, TenantId, TimelineId};
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
|
||||
/// Utils for working with pageserver read traces. For generating
|
||||
/// traces, see the `trace_read_requests` tenant config option.
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
struct Args {
|
||||
/// Path of trace directory
|
||||
#[arg(short, long)]
|
||||
path: PathBuf,
|
||||
|
||||
#[command(subcommand)]
|
||||
command: Command,
|
||||
}
|
||||
|
||||
/// What to do with the read trace
|
||||
#[derive(Subcommand, Debug)]
|
||||
enum Command {
|
||||
/// List traces in the directory
|
||||
List,
|
||||
|
||||
/// Print the traces in text format
|
||||
Dump,
|
||||
|
||||
/// Print stats and anomalies about the traces
|
||||
Analyze,
|
||||
}
|
||||
|
||||
// HACK This function will change and improve as we see what kind of analysis is useful.
|
||||
// Currently it collects the difference in blkno of consecutive GetPage requests,
|
||||
// and counts the frequency of each value. This information is useful in order to:
|
||||
// - see how sequential a workload is by seeing how often the delta is 1
|
||||
// - detect any prefetching anomalies by looking for negative deltas during seqscan
|
||||
fn analyze_trace<R: std::io::Read>(mut reader: R) {
|
||||
let mut total = 0; // Total requests traced
|
||||
let mut cross_rel = 0; // Requests that ask for different rel than previous request
|
||||
let mut deltas = HashMap::<i32, u32>::new(); // Consecutive blkno differences
|
||||
let mut prev: Option<PagestreamGetPageRequest> = None;
|
||||
|
||||
// Compute stats
|
||||
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader, PagestreamProtocolVersion::V2) {
|
||||
match msg {
|
||||
PagestreamFeMessage::Exists(_) => {}
|
||||
PagestreamFeMessage::Nblocks(_) => {}
|
||||
PagestreamFeMessage::GetSlruSegment(_) => {}
|
||||
PagestreamFeMessage::GetPage(req) => {
|
||||
total += 1;
|
||||
|
||||
if let Some(prev) = prev {
|
||||
if prev.rel == req.rel {
|
||||
let delta = (req.blkno as i32) - (prev.blkno as i32);
|
||||
deltas.entry(delta).and_modify(|c| *c += 1).or_insert(1);
|
||||
} else {
|
||||
cross_rel += 1;
|
||||
}
|
||||
}
|
||||
prev = Some(req);
|
||||
}
|
||||
PagestreamFeMessage::DbSize(_) => {}
|
||||
};
|
||||
}
|
||||
|
||||
// Print stats.
|
||||
let mut other = deltas.len();
|
||||
deltas.retain(|_, count| *count > 300);
|
||||
other -= deltas.len();
|
||||
dbg!(total);
|
||||
dbg!(cross_rel);
|
||||
dbg!(other);
|
||||
dbg!(deltas);
|
||||
}
|
||||
|
||||
fn dump_trace<R: std::io::Read>(mut reader: R) {
|
||||
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader, PagestreamProtocolVersion::V2) {
|
||||
println!("{msg:?}");
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TraceFile {
|
||||
#[allow(dead_code)]
|
||||
pub tenant_id: TenantId,
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub timeline_id: TimelineId,
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub connection_id: ConnectionId,
|
||||
|
||||
pub path: PathBuf,
|
||||
}
|
||||
|
||||
fn get_trace_files(traces_dir: &PathBuf) -> anyhow::Result<Vec<TraceFile>> {
|
||||
let mut trace_files = Vec::<TraceFile>::new();
|
||||
|
||||
// Trace files are organized as {tenant_id}/{timeline_id}/{connection_id}
|
||||
for tenant_dir in read_dir(traces_dir)? {
|
||||
let entry = tenant_dir?;
|
||||
let path = entry.path();
|
||||
let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
for timeline_dir in read_dir(path)? {
|
||||
let entry = timeline_dir?;
|
||||
let path = entry.path();
|
||||
let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
for trace_dir in read_dir(path)? {
|
||||
let entry = trace_dir?;
|
||||
let path = entry.path();
|
||||
let connection_id =
|
||||
ConnectionId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
trace_files.push(TraceFile {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
connection_id,
|
||||
path,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(trace_files)
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
let args = Args::parse();
|
||||
|
||||
match args.command {
|
||||
Command::List => {
|
||||
for trace_file in get_trace_files(&args.path)? {
|
||||
println!("{trace_file:?}");
|
||||
}
|
||||
}
|
||||
Command::Dump => {
|
||||
for trace_file in get_trace_files(&args.path)? {
|
||||
let file = File::open(trace_file.path.clone())?;
|
||||
let reader = BufReader::new(file);
|
||||
dump_trace(reader);
|
||||
}
|
||||
}
|
||||
Command::Analyze => {
|
||||
for trace_file in get_trace_files(&args.path)? {
|
||||
println!("analyzing {trace_file:?}");
|
||||
let file = File::open(trace_file.path.clone())?;
|
||||
let reader = BufReader::new(file);
|
||||
analyze_trace(reader);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user