From 357795e0c5072e1bdae2b1e88f2a3c8375dcd546 Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Fri, 4 Apr 2025 16:46:11 +0200 Subject: [PATCH] VPC flow logs IaC --- Cargo.lock | 265 +++++++--- Cargo.toml | 1 + lambda/aztraffic/Cargo.toml | 22 + lambda/aztraffic/src/main.rs | 794 ++++++++++++++++++++++++++++++ lambda/pod_info_dumper/src/lib.rs | 15 +- 5 files changed, 1035 insertions(+), 62 deletions(-) create mode 100644 lambda/aztraffic/Cargo.toml create mode 100644 lambda/aztraffic/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 404b09b91e..73ba383349 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -139,9 +139,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.94" +version = "1.0.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7" +checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f" dependencies = [ "backtrace", ] @@ -261,9 +261,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "aws-config" -version = "1.5.10" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b49afaa341e8dd8577e1a2200468f98956d6eda50bcf4a53246cc00174ba924" +checksum = "8c39646d1a6b51240a1a23bb57ea4eebede7e16fbc237fdc876980233dcecb4f" dependencies = [ "aws-credential-types", "aws-runtime", @@ -271,8 +271,8 @@ dependencies = [ "aws-sdk-ssooidc", "aws-sdk-sts", "aws-smithy-async", - "aws-smithy-http 0.60.12", - "aws-smithy-json 0.60.7", + "aws-smithy-http 0.62.0", + "aws-smithy-json", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -280,7 +280,7 @@ dependencies = [ "bytes", "fastrand 2.3.0", "hex", - "http 0.2.9", + "http 1.1.0", "ring", "time", "tokio", @@ -351,16 +351,86 @@ dependencies = [ ] [[package]] -name = "aws-sdk-eks" -version = "1.75.0" +name = "aws-sdk-athena" +version = "1.68.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cd6c39e07cf5e4b56ea4999653cf15b53c8c114bcd455cf8f649daad7d8eedd" +checksum = "b541fd9198b447726337d19aadd6f5d85146fcb12dbc74e44f242464be313809" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http 0.60.12", - "aws-smithy-json 0.61.3", + "aws-smithy-http 0.62.0", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.3.0", + "http 0.2.9", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ec2" +version = "1.121.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ac4438771f559ca4d6023556ad46bbc0bd8ca46a541d3d323d482e2780b28f2" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http 0.62.0", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "fastrand 2.3.0", + "http 0.2.9", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-eks" +version = "1.82.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcf9e14d61e9495c43006f42091c444dba9e59d4d368bdc756c2cc6dd90de4fe" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http 0.62.0", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.3.0", + "http 0.2.9", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-glue" +version = "1.88.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d58ad1fd4a80d8fbf2ffd8844301724a33cb1573b8832dd7a9a07202f72b859c" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http 0.62.0", + "aws-smithy-json", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -383,7 +453,7 @@ dependencies = [ "aws-runtime", "aws-smithy-async", "aws-smithy-http 0.60.12", - "aws-smithy-json 0.61.3", + "aws-smithy-json", "aws-smithy-query", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -406,7 +476,7 @@ dependencies = [ "aws-runtime", "aws-smithy-async", "aws-smithy-http 0.60.12", - "aws-smithy-json 0.61.3", + "aws-smithy-json", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -418,6 +488,30 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-sdk-lambda" +version = "1.75.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eebfb3c1189b7a906cc3e2609fc60c4e94a951f0dff53e1694e8b66c84f6b61a" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-eventstream", + "aws-smithy-http 0.62.0", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.3.0", + "http 0.2.9", + "once_cell", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-s3" version = "1.65.0" @@ -431,7 +525,7 @@ dependencies = [ "aws-smithy-checksums", "aws-smithy-eventstream", "aws-smithy-http 0.60.12", - "aws-smithy-json 0.61.3", + "aws-smithy-json", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -453,21 +547,68 @@ dependencies = [ ] [[package]] -name = "aws-sdk-sso" -version = "1.50.0" +name = "aws-sdk-scheduler" +version = "1.64.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05ca43a4ef210894f93096039ef1d6fa4ad3edfabb3be92b80908b9f2e4b4eab" +checksum = "a4d7c6105a4a877fbe73703ba113bb2625a69f7b76593bc8f12cff739f8c4fac" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http 0.60.12", - "aws-smithy-json 0.61.3", + "aws-smithy-http 0.62.0", + "aws-smithy-json", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", "bytes", + "fastrand 2.3.0", + "http 0.2.9", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sfn" +version = "1.68.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f312fc11c3792f92334ecd3c839f9be8228d50291efa894ea2c00ee957b5639" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http 0.62.0", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.3.0", + "http 0.2.9", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.64.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02d4bdb0e5f80f0689e61c77ab678b2b9304af329616af38aef5b6b967b8e736" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http 0.62.0", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.3.0", "http 0.2.9", "once_cell", "regex-lite", @@ -476,20 +617,21 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.51.0" +version = "1.65.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abaf490c2e48eed0bb8e2da2fb08405647bd7f253996e0f93b981958ea0f73b0" +checksum = "acbbb3ce8da257aedbccdcb1aadafbbb6a5fe9adf445db0e1ea897bdc7e22d08" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http 0.60.12", - "aws-smithy-json 0.61.3", + "aws-smithy-http 0.62.0", + "aws-smithy-json", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", "bytes", + "fastrand 2.3.0", "http 0.2.9", "once_cell", "regex-lite", @@ -506,7 +648,7 @@ dependencies = [ "aws-runtime", "aws-smithy-async", "aws-smithy-http 0.62.0", - "aws-smithy-json 0.61.3", + "aws-smithy-json", "aws-smithy-query", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -619,6 +761,7 @@ version = "0.62.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c5949124d11e538ca21142d1fba61ab0a2a2c1bc3ed323cdb3e4b878bfb83166" dependencies = [ + "aws-smithy-eventstream", "aws-smithy-runtime-api", "aws-smithy-types", "bytes", @@ -662,15 +805,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "aws-smithy-json" -version = "0.60.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6" -dependencies = [ - "aws-smithy-types", -] - [[package]] name = "aws-smithy-json" version = "0.61.3" @@ -883,6 +1017,26 @@ dependencies = [ "tower-service", ] +[[package]] +name = "aztraffic" +version = "0.0.0" +dependencies = [ + "anyhow", + "aws-config", + "aws-sdk-athena", + "aws-sdk-ec2", + "aws-sdk-eks", + "aws-sdk-glue", + "aws-sdk-lambda", + "aws-sdk-scheduler", + "aws-sdk-sfn", + "aws-sdk-sts", + "clap", + "serde", + "serde_json", + "tokio", +] + [[package]] name = "azure_core" version = "0.21.0" @@ -1320,9 +1474,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.22" +version = "4.5.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69371e34337c4c984bbe322360c2547210bf632eb2814bbe78a6e87a2935bd2b" +checksum = "d8aa86934b44c19c50f87cc2790e19f54f7a67aedb64101c2e1a2e5ecfb73944" dependencies = [ "clap_builder", "clap_derive", @@ -1330,9 +1484,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.22" +version = "4.5.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e24c1b4099818523236a8ca881d2b45db98dadfb4625cf6608c12069fcbbde1" +checksum = "2414dbb2dd0695280da6ea9261e327479e9d37b0630f6b53ba2a11c60c679fd9" dependencies = [ "anstream", "anstyle", @@ -1342,9 +1496,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.18" +version = "4.5.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab" +checksum = "09176aae279615badda0765c0c0b3f6ed53f4709118af73cf4655d85d1530cd7" dependencies = [ "heck", "proc-macro2", @@ -1354,9 +1508,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" [[package]] name = "clashmap" @@ -5158,7 +5312,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.6" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#f3cf448febde5fd298071d54d568a9c875a7a62b" dependencies = [ "base64 0.22.1", "byteorder", @@ -5192,7 +5346,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.6" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#f3cf448febde5fd298071d54d568a9c875a7a62b" dependencies = [ "bytes", "chrono", @@ -6758,9 +6912,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.215" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" dependencies = [ "serde_derive", ] @@ -6787,9 +6941,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.215" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", @@ -6798,10 +6952,11 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.125" +version = "1.0.140" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed" +checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" dependencies = [ + "indexmap 2.8.0", "itoa", "memchr", "ryu", @@ -7668,9 +7823,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.43.0" +version = "1.44.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" +checksum = "f382da615b842244d4b8738c82ed1275e6c5dd90c459a30941cd07080b06c91a" dependencies = [ "backtrace", "bytes", @@ -7724,7 +7879,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.10" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#f3cf448febde5fd298071d54d568a9c875a7a62b" dependencies = [ "async-trait", "byteorder", @@ -8333,9 +8488,9 @@ dependencies = [ [[package]] name = "urlencoding" -version = "2.1.2" +version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8db7427f936968176eaa7cdf81b7f98b980b18495ec28f1b5791ac3bfe3eea9" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" [[package]] name = "utf-8" diff --git a/Cargo.toml b/Cargo.toml index 4c28cb3ea4..0c8a6821ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ members = [ "libs/proxy/postgres-protocol2", "libs/proxy/postgres-types2", "libs/proxy/tokio-postgres2", + "lambda/aztraffic", "lambda/pod_info_dumper", ] diff --git a/lambda/aztraffic/Cargo.toml b/lambda/aztraffic/Cargo.toml new file mode 100644 index 0000000000..97619a5678 --- /dev/null +++ b/lambda/aztraffic/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "aztraffic" +version = "0.0.0" +edition.workspace = true +license.workspace = true +publish = false + +[dependencies] +anyhow = "1.0.97" +aws-config = "1.6.1" +aws-sdk-athena = "1.68.0" +aws-sdk-ec2 = "1.121.0" +aws-sdk-eks = "1.82.0" +aws-sdk-glue = "1.88.0" +aws-sdk-lambda = "1.75.0" +aws-sdk-scheduler = "1.64.0" +aws-sdk-sfn = "1.68.0" +aws-sdk-sts = "1.65.0" +clap = { version = "4.5.35", features = ["derive", "env"] } +tokio = { version = "1.44.1", features = ["full"] } +serde = "1.0.219" +serde_json = { version = "1.0.140", features = ["preserve_order"] } diff --git a/lambda/aztraffic/src/main.rs b/lambda/aztraffic/src/main.rs new file mode 100644 index 0000000000..9eb66529ba --- /dev/null +++ b/lambda/aztraffic/src/main.rs @@ -0,0 +1,794 @@ +use std::fs; + +use aws_config::default_provider::credentials::DefaultCredentialsChain; +use aws_sdk_ec2::types::{ + DestinationFileFormat, DestinationOptionsRequest, FlowLogsResourceType, LogDestinationType, + TrafficType, +}; +use aws_sdk_glue::primitives::Blob; +use aws_sdk_glue::types::{Column, DatabaseInput, SerDeInfo, StorageDescriptor, TableInput}; +use aws_sdk_lambda::types::{Environment, FunctionCode, Runtime}; +use aws_sdk_scheduler::types::{ + DeadLetterConfig, FlexibleTimeWindow, FlexibleTimeWindowMode, RetryPolicy, Target, +}; +use aws_sdk_sfn::types::{CloudWatchLogsLogGroup, LogDestination, LogLevel, LoggingConfiguration}; +use clap::Parser; +use serde_json::json; + +#[derive(Parser, Clone, Debug)] +struct Args { + #[arg(long, value_name = "id")] + account_id: String, + #[arg(long, value_name = "region")] + region: String, + #[arg(long, value_name = "cluster")] + cluster: String, + #[arg(long, value_name = "id")] + vpc_id: Vec, + + #[arg(long, value_name = "arn")] + log_group_arn: String, + #[arg(long, value_name = "name")] + pod_info_s3_bucket_name: String, + #[arg( + long, + value_name = "path", + default_value = "CrossAZTraffic/pod_info_dumper/pod_info.csv" + )] + pod_info_s3_bucket_key: String, + #[arg(long, value_name = "uri")] + pod_info_s3_bucket_uri: String, + #[arg(long, value_name = "uri")] + vpc_flow_logs_s3_bucket_uri: String, + #[arg(long, value_name = "uri")] + results_s3_bucket_uri: String, + + #[arg( + long, + value_name = "name", + default_value = "./target/lambda/pod_info_dumper/bootstrap.zip" + )] + lambda_zipfile_path: String, + #[arg( + long, + value_name = "name", + default_value = "CrossAZTraffic-podinfo-function" + )] + lambda_function_name: String, + #[arg(long, value_name = "arn")] + lambda_role_arn: String, + + #[arg(long, value_name = "name")] + glue_database_name: String, + #[arg( + long, + value_name = "name", + default_value = "CrossAZTraffic-podinfo-table" + )] + glue_pod_info_table_name: String, + #[arg( + long, + value_name = "name", + default_value = "CrossAZTraffic-vpcflowlogs-table" + )] + glue_vpc_flow_logs_table_name: String, + #[arg( + long, + value_name = "name", + default_value = "CrossAZTraffic-results-table" + )] + glue_results_table_name: String, + + #[arg( + long, + value_name = "name", + default_value = "CrossAZTraffic-trigger-schedule" + )] + schedule_name: String, + #[arg(long, value_name = "minutes", default_value_t = 60)] + schedule_interval_minutes: usize, + #[arg(long, value_name = "arn")] + schedule_target_state_machine_arn: String, + #[arg(long, value_name = "arn")] + schedule_target_role_arn: String, + #[arg(long, value_name = "arn")] + schedule_dead_letter_queue_arn: Option, + + #[arg( + long, + value_name = "name", + default_value = "CrossAZTraffic-combine-query" + )] + athena_query_name: String, + + #[arg(long, value_name = "uri")] + vpcflowlogs_destination_s3_bucket_uri: String, + + #[arg( + long, + value_name = "name", + default_value = "CrossAZTraffic-statemachine" + )] + statemachine_name: String, + #[arg(long, value_name = "arn")] + statemachine_role_arn: String, + + #[arg(long, value_name = "uri")] + athena_results_s3_bucket_uri: String, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); + eprintln!("{args:#?}"); + + // TODO: athena results bucket + lifecycle config + // TODO: iam role split + // TODO: iam policy + // TODO: clusterrole + binding + // TODO: eks mapping + // TODO: log group + // TODO: dlq + + let sdk_config = create_sdk_config(&args).await?; + + LambdaFunction { + local_zipfile_path: args.lambda_zipfile_path, + function_name: args.lambda_function_name.clone(), + role_arn: args.lambda_role_arn, + account_id: args.account_id, + region: args.region, + cluster: args.cluster, + s3_bucket_name: args.pod_info_s3_bucket_name, + s3_bucket_key: args.pod_info_s3_bucket_key, + } + .create(&sdk_config) + .await?; + + GlueDatabase { + database_name: args.glue_database_name.clone(), + pod_info_table_name: args.glue_pod_info_table_name.clone(), + pod_info_s3_bucket_uri: args.pod_info_s3_bucket_uri, + vpc_flow_logs_table_name: args.glue_vpc_flow_logs_table_name.clone(), + vpc_flow_logs_s3_bucket_uri: args.vpc_flow_logs_s3_bucket_uri, + results_table_name: args.glue_results_table_name.clone(), + results_s3_bucket_uri: args.results_s3_bucket_uri, + } + .create(&sdk_config) + .await?; + + let named_query_id = AthenaQuery { + query_name: args.athena_query_name, + glue_database: args.glue_database_name.clone(), + invocation_frequency: args.schedule_interval_minutes, + athena_results_table_name: args.glue_results_table_name, + vpc_flow_logs_table_name: args.glue_vpc_flow_logs_table_name, + pod_info_table_name: args.glue_pod_info_table_name, + } + .create(&sdk_config) + .await?; + + StateMachine { + name: args.statemachine_name, + role_arn: args.statemachine_role_arn, + named_query_id, + glue_database: args.glue_database_name, + lambda_function_name: args.lambda_function_name, + athena_results_s3_bucket_uri: args.athena_results_s3_bucket_uri, + log_group_arn: args.log_group_arn, + } + .create(&sdk_config) + .await?; + + Schedule { + name: args.schedule_name, + interval_minutes: args.schedule_interval_minutes, + dead_letter_queue_arn: args.schedule_dead_letter_queue_arn, + target_role_arn: args.schedule_target_role_arn, + target_state_machine_arn: args.schedule_target_state_machine_arn, + } + .create(&sdk_config) + .await?; + + let flow_log_ids = VpcFlowLogs { + vpc_ids: args.vpc_id, + destination_s3_bucket_uri: args.vpcflowlogs_destination_s3_bucket_uri, + } + .create(&sdk_config) + .await?; + + println!("VPC flow log IDs: {:?}", flow_log_ids.as_slice()); + + Ok(()) +} + +async fn create_sdk_config(args: &Args) -> anyhow::Result { + let region = aws_config::Region::new(args.region.to_owned()); + let credentials_provider = DefaultCredentialsChain::builder() + .region(region.clone()) + .build() + .await; + Ok(aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region(region) + .credentials_provider(credentials_provider) + .load() + .await) +} + +struct LambdaFunction { + local_zipfile_path: String, + function_name: String, + role_arn: String, + account_id: String, + region: String, + cluster: String, + s3_bucket_name: String, + s3_bucket_key: String, +} + +impl LambdaFunction { + async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> { + let code = fs::read(&self.local_zipfile_path)?; + + let client = aws_sdk_lambda::Client::new(sdk_config); + client + .delete_function() + .function_name(&self.function_name) + .send() + .await + .ok(); + + client + .create_function() + .function_name(&self.function_name) + .runtime(Runtime::Providedal2023) + .handler("bootstrap") + .role(&self.role_arn) + .code(FunctionCode::builder().zip_file(Blob::new(code)).build()) + .timeout(60) + .environment( + Environment::builder() + .set_variables(Some( + [ + ("NEON_ACCOUNT_ID", self.account_id.as_str()), + ("NEON_REGION", self.region.as_str()), + ("NEON_CLUSTER", self.cluster.as_str()), + ("NEON_S3_BUCKET_NAME", self.s3_bucket_name.as_str()), + ("NEON_S3_BUCKET_KEY", self.s3_bucket_key.as_str()), + ("AWS_LAMBDA_LOG_FORMAT", "JSON"), + ("AWS_LAMBDA_LOG_LEVEL", "INFO"), + ] + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(), + )) + .build(), + ) + .send() + .await?; + + Ok(()) + } +} + +struct VpcFlowLogs { + vpc_ids: Vec, + destination_s3_bucket_uri: String, +} + +impl VpcFlowLogs { + async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result> { + let ec2_client = aws_sdk_ec2::Client::new(sdk_config); + + let flow_logs = ec2_client + .create_flow_logs() + .resource_type(FlowLogsResourceType::Vpc) + .set_resource_ids(Some(self.vpc_ids.clone())) + .traffic_type(TrafficType::All) + .log_destination_type(LogDestinationType::S3) + .log_destination(&self.destination_s3_bucket_uri) + .destination_options( + DestinationOptionsRequest::builder() + .file_format(DestinationFileFormat::Parquet) + .hive_compatible_partitions(false) + .per_hour_partition(true) + .build(), + ) + .log_format("${region} ${az-id} ${vpc-id} ${flow-direction} ${pkt-srcaddr} ${pkt-dstaddr} ${srcport} ${dstport} ${start} ${bytes}") + .send() + .await?; + + if let Some(unsuccessful) = flow_logs + .unsuccessful + .as_ref() + .and_then(|v| if v.is_empty() { None } else { Some(v) }) + { + anyhow::bail!("VPC flow log creation unsuccessful: {unsuccessful:?}"); + } + + Ok(flow_logs.flow_log_ids().iter().cloned().collect()) + } +} + +struct GlueDatabase { + database_name: String, + pod_info_table_name: String, + pod_info_s3_bucket_uri: String, + vpc_flow_logs_table_name: String, + vpc_flow_logs_s3_bucket_uri: String, + results_table_name: String, + results_s3_bucket_uri: String, +} + +impl GlueDatabase { + async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> { + let glue_client = aws_sdk_glue::Client::new(sdk_config); + + let db = DatabaseInput::builder().name(&self.database_name).build()?; + + glue_client + .create_database() + .database_input(db.clone()) + .send() + .await?; + + let pod_info_columns = &[ + Column::builder() + .name("namespace") + .r#type("string") + .build()?, + Column::builder().name("name").r#type("string").build()?, + Column::builder().name("ip").r#type("string").build()?, + Column::builder() + .name("creation_time") + .r#type("timestamp") + .build()?, + Column::builder().name("node").r#type("string").build()?, + Column::builder().name("az").r#type("string").build()?, + ]; + glue_client + .create_table() + .database_name(db.name()) + .table_input( + TableInput::builder() + .name(&self.pod_info_table_name) + .storage_descriptor( + StorageDescriptor::builder() + .location(&self.pod_info_s3_bucket_uri) + .compressed(false) + .set_columns(Some(pod_info_columns.into_iter().cloned().collect())) + .input_format("org.apache.hadoop.mapred.TextInputFormat") + .output_format( + "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", + ) + .serde_info( + SerDeInfo::builder() + .serialization_library( + "org.apache.hadoop.hive.serde2.OpenCSVSerde", + ) + .parameters("separatorChar", ",") + .parameters("quoteChar", "`") + .parameters("escapeChar", r"\") + .build(), + ) + .build(), + ) + .table_type("EXTERNAL_TABLE") + .parameters("classification", "csv") + .parameters("skip.header.line.count", "1") + .retention(0) + .build()?, + ) + .send() + .await?; + + let vpc_flow_logs_columns = &[ + Column::builder().name("region").r#type("string").build()?, + Column::builder().name("az_id").r#type("string").build()?, + Column::builder().name("vpc_id").r#type("string").build()?, + Column::builder() + .name("flow_direction") + .r#type("string") + .build()?, + Column::builder() + .name("pkt_srcaddr") + .r#type("string") + .build()?, + Column::builder() + .name("pkt_dstaddr") + .r#type("string") + .build()?, + Column::builder().name("srcport").r#type("int").build()?, + Column::builder().name("dstport").r#type("int").build()?, + Column::builder().name("start").r#type("bigint").build()?, + Column::builder().name("bytes").r#type("bigint").build()?, + ]; + glue_client + .create_table() + .database_name(db.name()) + .table_input( + TableInput::builder() + .name(&self.vpc_flow_logs_table_name) + .storage_descriptor( + StorageDescriptor::builder() + .location(&self.vpc_flow_logs_s3_bucket_uri) + .compressed(false) + .set_columns(Some(vpc_flow_logs_columns.into_iter().cloned().collect())) + .input_format( + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", + ) + .output_format( + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", + ) + .serde_info( + SerDeInfo::builder() + .serialization_library( + "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", + ) + .parameters("serialization.format", "1") + .build(), + ) + .build(), + ) + .table_type("EXTERNAL_TABLE") + .parameters("classification", "parquet") + .retention(0) + .build()?, + ) + .send() + .await?; + + let athena_results_columns = &[ + Column::builder().name("time").r#type("timestamp").build()?, + Column::builder().name("traffic").r#type("string").build()?, + Column::builder() + .name("total_bytes") + .r#type("bigint") + .build()?, + ]; + glue_client + .create_table() + .database_name(db.name()) + .table_input( + TableInput::builder() + .name(&self.results_table_name) + .storage_descriptor( + StorageDescriptor::builder() + .location(&self.results_s3_bucket_uri) + .compressed(false) + .set_columns(Some(athena_results_columns.into_iter().cloned().collect())) + .input_format( + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", + ) + .output_format( + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", + ) + .serde_info( + SerDeInfo::builder() + .serialization_library( + "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", + ) + .parameters("serialization.format", "1") + .build(), + ) + .build(), + ) + .table_type("EXTERNAL_TABLE") + .parameters("classification", "parquet") + .retention(0) + .build()?, + ) + .send() + .await?; + + Ok(()) + } +} + +struct AthenaQuery { + query_name: String, + glue_database: String, + invocation_frequency: usize, + athena_results_table_name: String, + vpc_flow_logs_table_name: String, + pod_info_table_name: String, +} + +impl AthenaQuery { + async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result { + let Self { + athena_results_table_name, + vpc_flow_logs_table_name, + pod_info_table_name, + invocation_frequency, + .. + } = self; + + let query_string = format!( + r#" +INSERT INTO "{athena_results_table_name}" +WITH + ip_addresses_and_az_mapping AS ( + SELECT + DISTINCT pkt_srcaddr AS ipaddress, + az_id + FROM "{vpc_flow_logs_table_name}" + WHERE flow_direction = 'egress' + AND from_unixtime("{vpc_flow_logs_table_name}".start) > (CURRENT_TIMESTAMP - ({invocation_frequency} * interval '1' minute)) + ), + egress_flows_of_pods_with_status AS ( + SELECT + "{pod_info_table_name}".name AS srcpodname, + pkt_srcaddr AS srcaddr, + pkt_dstaddr AS dstaddr, + "{vpc_flow_logs_table_name}".az_id AS srcazid, + bytes, + start + FROM "{vpc_flow_logs_table_name}" + INNER JOIN "{pod_info_table_name}" ON "{vpc_flow_logs_table_name}".pkt_srcaddr = "{pod_info_table_name}".ip + WHERE flow_direction = 'egress' + AND from_unixtime("{vpc_flow_logs_table_name}".start) > (CURRENT_TIMESTAMP - ({invocation_frequency} * interval '1' minute)) + ), + cross_az_traffic_by_pod AS ( + SELECT + srcaddr, + srcpodname, + dstaddr, + "{pod_info_table_name}".name AS dstpodname, + srcazid, + ip_addresses_and_az_mapping.az_id AS dstazid, + bytes, + start + FROM egress_flows_of_pods_with_status + INNER JOIN "{pod_info_table_name}" ON dstaddr = "{pod_info_table_name}".ip + LEFT JOIN ip_addresses_and_az_mapping ON dstaddr = ipaddress + WHERE ip_addresses_and_az_mapping.az_id != srcazid + ) +SELECT + date_trunc('MINUTE', from_unixtime(start)) AS time, + CONCAT(srcpodname, ' -> ', dstpodname) AS traffic, + SUM(bytes) AS total_bytes +FROM cross_az_traffic_by_pod +GROUP BY date_trunc('MINUTE', from_unixtime(start)), CONCAT(srcpodname, ' -> ', dstpodname) +ORDER BY time, total_bytes DESC +"# + ); + + let athena_client = aws_sdk_athena::Client::new(sdk_config); + let res = athena_client + .create_named_query() + .name(&self.query_name) + .database(&self.glue_database) + .query_string(query_string) + .send() + .await?; + + Ok(res.named_query_id.unwrap()) + } +} + +struct StateMachine { + name: String, + role_arn: String, + named_query_id: String, + glue_database: String, + lambda_function_name: String, + athena_results_s3_bucket_uri: String, + log_group_arn: String, +} + +impl StateMachine { + async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> { + let sfn_client = aws_sdk_sfn::Client::new(sdk_config); + sfn_client + .create_state_machine() + .name(&self.name) + .role_arn(&self.role_arn) + .logging_configuration( + LoggingConfiguration::builder() + .level(LogLevel::All) + .destinations( + LogDestination::builder() + .cloud_watch_logs_log_group( + CloudWatchLogsLogGroup::builder() + .log_group_arn(&self.log_group_arn) + .build(), + ) + .build(), + ) + .build(), + ) + .definition( + json!( + { + "StartAt": "Invoke", + "States": { + "Invoke": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Output": "{% $states.result.Payload %}", + "Arguments": { + "FunctionName": self.lambda_function_name, + "Payload": json!({ + "detail-type": "Scheduled Event", + "source": "aws.events", + "detail": {} + }).to_string() + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2, + "JitterStrategy": "FULL" + } + ], + "Next": "Check" + }, + "Check": { + "Type": "Choice", + "Choices": [ + { + "Next": "GetNamedQuery", + "Condition": "{% $states.input.statusCode = 200 %}" + } + ], + "Default": "Fail" + }, + "GetNamedQuery": { + "Type": "Task", + "Arguments": { + "NamedQueryId": self.named_query_id + }, + "Resource": "arn:aws:states:::aws-sdk:athena:getNamedQuery", + "Output": { + "QueryString": "{% $states.result.NamedQuery.QueryString %}" + }, + "Next": "StartQueryExecution" + }, + "StartQueryExecution": { + "Type": "Task", + "Resource": "arn:aws:states:::athena:startQueryExecution.sync", + "Arguments": { + "QueryString": "{% $states.input.QueryString %}", + "QueryExecutionContext": { + "Database": self.glue_database + }, + "ResultConfiguration": { + "OutputLocation": self.athena_results_s3_bucket_uri + }, + "WorkGroup": "primary" + }, + "End": true + }, + "Fail": { + "Type": "Fail" + } + }, + "QueryLanguage": "JSONata" + } + ) + .to_string(), + ) + .send() + .await?; + + Ok(()) + } +} + +struct Schedule { + name: String, + interval_minutes: usize, + target_state_machine_arn: String, + target_role_arn: String, + dead_letter_queue_arn: Option, +} + +impl Schedule { + async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> { + let sched_client = aws_sdk_scheduler::Client::new(sdk_config); + + sched_client + .create_schedule() + .name(&self.name) + .schedule_expression(format!("rate({} minute)", self.interval_minutes)) + .flexible_time_window( + FlexibleTimeWindow::builder() + .mode(FlexibleTimeWindowMode::Off) + .build()?, + ) + .target( + Target::builder() + .arn(&self.target_state_machine_arn) + .role_arn(&self.target_role_arn) + .input( + json!({ + "detail-type": "Scheduled Event", + "source": "aws.events", + "detail": {} + }) + .to_string(), + ) + .retry_policy( + RetryPolicy::builder() + .maximum_retry_attempts(0) + .maximum_event_age_in_seconds(60) + .build(), + ) + .set_dead_letter_config( + self.dead_letter_queue_arn + .as_ref() + .map(|arn| DeadLetterConfig::builder().arn(arn).build()), + ) + .build()?, + ) + .send() + .await?; + + Ok(()) + } +} + +struct KubernetesRoles { + region: String, + cluster: String, + k8s_role_prefix: String, + lambda_role_arn: String, +} + +impl KubernetesRoles { + fn print(&self) -> anyhow::Result<()> { + let Self { + region, + cluster, + k8s_role_prefix, + lambda_role_arn, + } = self; + + let yaml = format!( + r#" +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: {k8s_role_prefix}-clusterrole +rules: +- apiGroups: + - "" + resources: ["nodes", "namespaces", "pods"] + verbs: ["get", "list"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: {k8s_role_prefix}-binding +subjects: +- kind: Group + name: {k8s_role_prefix}-group + apiGroup: rbac.authorization.k8s.io +roleRef: + kind: ClusterRole + name: {k8s_role_prefix}-clusterrole + apiGroup: rbac.authorization.k8s.io +"# + ); + + let eksctl = format!( + r#"eksctl create iamidentitymapping \ + --region "{region}" + --cluster "{cluster}" \ + --arn "{lambda_role_arn}" \ + --username "{k8s_role_prefix}-binding" \ + --group "{k8s_role_prefix}-group" +"# + ); + + Ok(()) + } +} diff --git a/lambda/pod_info_dumper/src/lib.rs b/lambda/pod_info_dumper/src/lib.rs index 0bbccf6802..6451791759 100644 --- a/lambda/pod_info_dumper/src/lib.rs +++ b/lambda/pod_info_dumper/src/lib.rs @@ -25,7 +25,6 @@ use serde::ser::SerializeMap; use sha2::{Digest as _, Sha256}; const AZ_LABEL: &str = "topology.kubernetes.io/zone"; -const CSV_FILE_S3_KEY: &str = "lambda/pod_info_dumper/pod_info.csv"; #[derive(Debug)] struct Config { @@ -38,6 +37,7 @@ struct Config { struct S3BucketConfig { region: String, name: String, + key: String, } impl S3BucketConfig { @@ -92,14 +92,15 @@ pub async fn start() -> Result<(), Error> { tracing::info!("function handler started"); let config = Config { - aws_account_id: env::var("NEON_LAMBDA_AWS_ACCOUNT_ID")?, + aws_account_id: env::var("NEON_ACCOUNT_ID")?, s3_bucket: S3BucketConfig { - region: env::var("NEON_LAMBDA_TARGET_S3_BUCKET_REGION")?, - name: env::var("NEON_LAMBDA_TARGET_S3_BUCKET_NAME")?, + region: env::var("NEON_REGION")?, + name: env::var("NEON_S3_BUCKET_NAME")?, + key: env::var("NEON_S3_BUCKET_KEY")?, }, eks_cluster: EksClusterConfig { - region: env::var("NEON_LAMBDA_TARGET_EKS_CLUSTER_REGION")?, - name: env::var("NEON_LAMBDA_TARGET_EKS_CLUSTER_NAME")?, + region: env::var("NEON_REGION")?, + name: env::var("NEON_CLUSTER")?, }, }; @@ -406,7 +407,7 @@ async fn upload_csv( let resp = s3_client .put_object() .bucket(&config.s3_bucket.name) - .key(CSV_FILE_S3_KEY) + .key(&config.s3_bucket.key) .content_type("text/csv") .checksum_algorithm(ChecksumAlgorithm::Sha256) .checksum_sha256(STANDARD.encode(csum))