CREATE TABLE ngx_access_log ( ip_address STRING, http_method STRING, request STRING, status_code INT16, body_bytes_sent INT32, user_agent STRING, response_size INT32, ts TIMESTAMP TIME INDEX, PRIMARY KEY (ip_address, http_method, user_agent, status_code)) WITH ('append_mode'='true'); Affected Rows: 0 CREATE TABLE user_agent_statistics ( user_agent STRING, total_count INT64, update_at TIMESTAMP, __ts_placeholder TIMESTAMP TIME INDEX, PRIMARY KEY (user_agent)); Affected Rows: 0 INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'), ('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'), ('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'), ('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z'); Affected Rows: 4 -- SQLNESS SLEEP 1s INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'), ('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'), ('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'), ('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z'); Affected Rows: 4 -- SQLNESS SLEEP 1s INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'), ('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'), ('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'), ('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z'); Affected Rows: 4 CREATE FLOW user_agent_flow SINK TO user_agent_statistics AS SELECT user_agent, COUNT(user_agent) AS total_count FROM ngx_access_log GROUP BY user_agent; Affected Rows: 0 SELECT created_time <= updated_time, created_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow'; +--------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------+ | information_schema.flows.created_time <= information_schema.flows.updated_time | information_schema.flows.created_time IS NOT NULL | source_table_names | +--------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------+ | true | true | greptime.public.ngx_access_log | +--------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------+ CREATE OR REPLACE FLOW user_agent_flow SINK TO user_agent_statistics AS SELECT user_agent, COUNT(user_agent)+123 AS total_count FROM ngx_access_log GROUP BY user_agent; Affected Rows: 0 SELECT created_time < updated_time, created_time IS NOT NULL, updated_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow'; +-------------------------------------------------------------------------------+---------------------------------------------------+---------------------------------------------------+--------------------------------+ | information_schema.flows.created_time < information_schema.flows.updated_time | information_schema.flows.created_time IS NOT NULL | information_schema.flows.updated_time IS NOT NULL | source_table_names | +-------------------------------------------------------------------------------+---------------------------------------------------+---------------------------------------------------+--------------------------------+ | true | true | true | greptime.public.ngx_access_log | +-------------------------------------------------------------------------------+---------------------------------------------------+---------------------------------------------------+--------------------------------+ -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('user_agent_flow'); +-------------------------------------+ | ADMIN FLUSH_FLOW('user_agent_flow') | +-------------------------------------+ | FLOW_FLUSHED | +-------------------------------------+ INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'), ('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'), ('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'), ('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z'); Affected Rows: 4 -- SQLNESS SLEEP 1s INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'), ('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'), ('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'), ('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z'); Affected Rows: 4 -- SQLNESS SLEEP 1s INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'), ('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'), ('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'), ('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z'); Affected Rows: 4 -- SQLNESS SLEEP 10s INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'), ('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'), ('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'), ('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z'); Affected Rows: 4 -- TODO(discord9): fix flow stat update for batching mode flow SELECT created_time < last_execution_time, created_time IS NOT NULL, last_execution_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow'; +--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+ | information_schema.flows.created_time < information_schema.flows.last_execution_time | information_schema.flows.created_time IS NOT NULL | information_schema.flows.last_execution_time IS NOT NULL | source_table_names | +--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+ | | true | false | greptime.public.ngx_access_log | +--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+ DROP TABLE ngx_access_log; Affected Rows: 0 DROP TABLE user_agent_statistics; Affected Rows: 0 DROP FLOW user_agent_flow; Affected Rows: 0