diff --git a/pageserver/src/bin/replay.rs b/pageserver/src/bin/replay.rs index 9e44362d7a..7ccbe0911b 100644 --- a/pageserver/src/bin/replay.rs +++ b/pageserver/src/bin/replay.rs @@ -5,10 +5,13 @@ use std::{ path::PathBuf, str::FromStr, }; -use tokio::net::TcpStream; +use tokio::{io::AsyncWriteExt, net::TcpStream}; use clap::{App, Arg}; -use utils::zid::{ZTenantId, ZTimelineId}; +use utils::{ + pq_proto::FeMessage, + zid::{ZTenantId, ZTimelineId}, +}; // TODO put this in library, dedup with stuff in control_plane /// Client for the pageserver's pagestream API @@ -16,18 +19,18 @@ struct PagestreamApi { stream: TcpStream, } -/// Good enough implementation for these tests impl PagestreamApi { async fn connect( connstr: &str, tenant: &ZTenantId, timeline: &ZTimelineId, ) -> anyhow::Result { - let mut stream = TcpStream::connect("localhost:15000").await?; - - // Connect to pageserver - println!("connecting to: {}", connstr); + // Parse connstr let config = tokio_postgres::Config::from_str(connstr).expect("bad connstr"); + let tcp_addr = format!("localhost:{}", config.get_ports()[0]); + + // Connect + let mut stream = TcpStream::connect(tcp_addr).await?; let (client, conn) = config .connect_raw(&mut stream, tokio_postgres::NoTls) .await?; @@ -39,13 +42,15 @@ impl PagestreamApi { _ = client.query(init_query.as_str(), &[]) => (), }; - println!("connected"); - Ok(PagestreamApi { stream }) } - async fn make_request(&mut self) -> anyhow::Result<()> { - // TODO implement + async fn make_request(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> { + let msg_bytes = msg.serialize(); + self.stream.write_all(&msg_bytes).await?; + + let _response = FeMessage::read_fut(&mut self.stream).await?; + Ok(()) } } @@ -56,7 +61,7 @@ async fn replay_trace( ) -> anyhow::Result<()> { while let Ok(msg) = PagestreamFeMessage::parse(reader) { println!("Parsed message {:?}", msg); - pagestream.make_request().await?; + // pagestream.make_request(msg).await?; } Ok(()) @@ -97,10 +102,14 @@ async fn main() -> anyhow::Result<()> { let path = entry.path(); let _conn_id = ZTimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?; + // TODO The pageserver deletes existing traces? + // LOL yes because I use tenant ID as trace id let pagestream = PagestreamApi::connect(connstr, &tenant_id, &timeline_id).await?; - let file = File::open(path)?; + let file = File::open(path.clone())?; let mut reader = BufReader::new(file); + // let len = file.metadata().unwrap().len(); + // println!("replaying {:?} trace {} bytes", path, len); replay_trace(&mut reader, pagestream).await?; } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index a2b4eb8c08..35c1567a59 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -499,7 +499,9 @@ impl PageServerHandler { // Make request tracer if needed let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; let mut tracer = if repo.get_trace_read_requests() { - let path = repo.conf.trace_path(&tenant_id, &timeline_id, &timeline_id); + let path = repo + .conf + .trace_path(&tenant_id, &timeline_id, &ZTimelineId::generate()); Some(Tracer::new(path)) } else { None diff --git a/test_runner/performance/test_trace_replay.py b/test_runner/performance/test_trace_replay.py index 604a008335..75213a4abf 100644 --- a/test_runner/performance/test_trace_replay.py +++ b/test_runner/performance/test_trace_replay.py @@ -1,3 +1,5 @@ +from contextlib import closing + from fixtures.neon_fixtures import NeonEnvBuilder, ReplayBin @@ -10,15 +12,30 @@ def test_trace_replay(neon_env_builder: NeonEnvBuilder, replay_bin: ReplayBin): "trace_read_requests": "true", } ) - timeline = env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant) + env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant) pg = env.postgres.create_start("test_trace_replay", "main", tenant) + pg.safe_psql("select 1;") + pg.safe_psql("select 1;") + pg.safe_psql("select 1;") pg.safe_psql("select 1;") - trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline) - assert trace_path.exists() + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + cur.execute("create table t (i integer);") + cur.execute(f"insert into t values (generate_series(1,{10000}));") + cur.execute("select count(*) from t;") + + # Stop pg so we drop the connection and flush the traces + pg.stop() + + # TODO turn off tracing now? + + # trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline) + # assert trace_path.exists() print("replaying") - ps_connstr = env.pageserver.connstr().replace("'", "\\'") + ps_connstr = env.pageserver.connstr() + # ps_connstr = "host=localhost port=15004 dbname=postgres user=neon_admin" output = replay_bin.replay_all(ps_connstr) print(output)