diff --git a/src/common/function/src/scalars/aggregate.rs b/src/common/function/src/scalars/aggregate.rs
index c6875a680c..f7ce316b6c 100644
--- a/src/common/function/src/scalars/aggregate.rs
+++ b/src/common/function/src/scalars/aggregate.rs
@@ -31,6 +31,7 @@ pub use polyval::PolyvalAccumulatorCreator;
 pub use scipy_stats_norm_cdf::ScipyStatsNormCdfAccumulatorCreator;
 pub use scipy_stats_norm_pdf::ScipyStatsNormPdfAccumulatorCreator;
 
+use super::geo::encoding::JsonPathEncodeFunctionCreator;
 use crate::function_registry::FunctionRegistry;
 
 /// A function creates `AggregateFunctionCreator`.
@@ -91,5 +92,7 @@ impl AggregateFunctions {
         register_aggr_func!("argmin", 1, ArgminAccumulatorCreator);
         register_aggr_func!("scipystatsnormcdf", 2, ScipyStatsNormCdfAccumulatorCreator);
         register_aggr_func!("scipystatsnormpdf", 2, ScipyStatsNormPdfAccumulatorCreator);
+
+        register_aggr_func!("json_encode_path", 3, JsonPathEncodeFunctionCreator);
     }
 }
diff --git a/src/common/function/src/scalars/aggregate/argmax.rs b/src/common/function/src/scalars/aggregate/argmax.rs
index c5c5264f19..4749ff9a3a 100644
--- a/src/common/function/src/scalars/aggregate/argmax.rs
+++ b/src/common/function/src/scalars/aggregate/argmax.rs
@@ -16,7 +16,10 @@ use std::cmp::Ordering;
 use std::sync::Arc;
 
 use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
-use common_query::error::{BadAccumulatorImplSnafu, CreateAccumulatorSnafu, Result};
+use common_query::error::{
+    BadAccumulatorImplSnafu, CreateAccumulatorSnafu, InvalidInputStateSnafu, Result,
+};
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
 use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
 use common_query::prelude::*;
 use datatypes::prelude::*;
diff --git a/src/common/function/src/scalars/aggregate/argmin.rs b/src/common/function/src/scalars/aggregate/argmin.rs
index 7233f43b77..fe89184460 100644
--- a/src/common/function/src/scalars/aggregate/argmin.rs
+++ b/src/common/function/src/scalars/aggregate/argmin.rs
@@ -16,7 +16,10 @@ use std::cmp::Ordering;
 use std::sync::Arc;
 
 use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
-use common_query::error::{BadAccumulatorImplSnafu, CreateAccumulatorSnafu, Result};
+use common_query::error::{
+    BadAccumulatorImplSnafu, CreateAccumulatorSnafu, InvalidInputStateSnafu, Result,
+};
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
 use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
 use common_query::prelude::*;
 use datatypes::prelude::*;
diff --git a/src/common/function/src/scalars/aggregate/diff.rs b/src/common/function/src/scalars/aggregate/diff.rs
index b83ed6d004..25d1614e4b 100644
--- a/src/common/function/src/scalars/aggregate/diff.rs
+++ b/src/common/function/src/scalars/aggregate/diff.rs
@@ -17,8 +17,10 @@ use std::sync::Arc;
 
 use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
 use common_query::error::{
-    CreateAccumulatorSnafu, DowncastVectorSnafu, FromScalarValueSnafu, Result,
+    CreateAccumulatorSnafu, DowncastVectorSnafu, FromScalarValueSnafu, InvalidInputStateSnafu,
+    Result,
 };
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
 use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
 use common_query::prelude::*;
 use datatypes::prelude::*;
diff --git a/src/common/function/src/scalars/aggregate/mean.rs b/src/common/function/src/scalars/aggregate/mean.rs
index 3dc3e18535..ed66c90bdb 100644
--- a/src/common/function/src/scalars/aggregate/mean.rs
+++ b/src/common/function/src/scalars/aggregate/mean.rs
@@ -17,8 +17,10 @@ use std::sync::Arc;
 
 use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
 use common_query::error::{
-    BadAccumulatorImplSnafu, CreateAccumulatorSnafu, DowncastVectorSnafu, Result,
+    BadAccumulatorImplSnafu, CreateAccumulatorSnafu, DowncastVectorSnafu, InvalidInputStateSnafu,
+    Result,
 };
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
 use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
 use common_query::prelude::*;
 use datatypes::prelude::*;
diff --git a/src/common/function/src/scalars/aggregate/polyval.rs b/src/common/function/src/scalars/aggregate/polyval.rs
index ae6ca101c4..bc3986fd0e 100644
--- a/src/common/function/src/scalars/aggregate/polyval.rs
+++ b/src/common/function/src/scalars/aggregate/polyval.rs
@@ -18,8 +18,9 @@ use std::sync::Arc;
 use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
 use common_query::error::{
     self, BadAccumulatorImplSnafu, CreateAccumulatorSnafu, DowncastVectorSnafu,
-    FromScalarValueSnafu, InvalidInputColSnafu, Result,
+    FromScalarValueSnafu, InvalidInputColSnafu, InvalidInputStateSnafu, Result,
 };
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
 use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
 use common_query::prelude::*;
 use datatypes::prelude::*;
diff --git a/src/common/function/src/scalars/aggregate/scipy_stats_norm_cdf.rs b/src/common/function/src/scalars/aggregate/scipy_stats_norm_cdf.rs
index e6c92225a6..09a9c820d8 100644
--- a/src/common/function/src/scalars/aggregate/scipy_stats_norm_cdf.rs
+++ b/src/common/function/src/scalars/aggregate/scipy_stats_norm_cdf.rs
@@ -17,8 +17,10 @@ use std::sync::Arc;
 use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
 use common_query::error::{
     self, BadAccumulatorImplSnafu, CreateAccumulatorSnafu, DowncastVectorSnafu,
-    FromScalarValueSnafu, GenerateFunctionSnafu, InvalidInputColSnafu, Result,
+    FromScalarValueSnafu, GenerateFunctionSnafu, InvalidInputColSnafu, InvalidInputStateSnafu,
+    Result,
 };
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
 use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
 use common_query::prelude::*;
 use datatypes::prelude::*;
diff --git a/src/common/function/src/scalars/aggregate/scipy_stats_norm_pdf.rs b/src/common/function/src/scalars/aggregate/scipy_stats_norm_pdf.rs
index 3045ae8665..2d5025ea3a 100644
--- a/src/common/function/src/scalars/aggregate/scipy_stats_norm_pdf.rs
+++ b/src/common/function/src/scalars/aggregate/scipy_stats_norm_pdf.rs
@@ -17,8 +17,10 @@ use std::sync::Arc;
 use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
 use common_query::error::{
     self, BadAccumulatorImplSnafu, CreateAccumulatorSnafu, DowncastVectorSnafu,
-    FromScalarValueSnafu, GenerateFunctionSnafu, InvalidInputColSnafu, Result,
+    FromScalarValueSnafu, GenerateFunctionSnafu, InvalidInputColSnafu, InvalidInputStateSnafu,
+    Result,
 };
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
 use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
 use common_query::prelude::*;
 use datatypes::prelude::*;
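The same three-line import pattern recurs in every accumulator touched above, and it is worth spelling out why: the `#[derive(AggrFuncTypeStore)]` expansion used to inject `use` statements into the enclosing module, and now that it emits fully qualified paths instead (see the `common_macro` change further down), each call site has to import the trait and the error selector itself. The shared block, trimmed from the hunks to the relevant items (workspace-internal crates, so it only compiles inside this repository):

```rust
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{InvalidInputStateSnafu, Result};
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
```

`AggrFuncTypeStore` is imported twice without conflict because the derive macro and the trait share a name but live in different namespaces, the same way `serde::Serialize` does.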
diff --git a/src/common/function/src/scalars/geo.rs b/src/common/function/src/scalars/geo.rs
index 8ad6a7aef2..37b6c0704b 100644
--- a/src/common/function/src/scalars/geo.rs
+++ b/src/common/function/src/scalars/geo.rs
@@ -13,8 +13,10 @@
 // limitations under the License.
 
 use std::sync::Arc;
 
+pub(crate) mod encoding;
 mod geohash;
 mod h3;
+mod helpers;
 
 use geohash::{GeohashFunction, GeohashNeighboursFunction};
diff --git a/src/common/function/src/scalars/geo/encoding.rs b/src/common/function/src/scalars/geo/encoding.rs
new file mode 100644
index 0000000000..0600120598
--- /dev/null
+++ b/src/common/function/src/scalars/geo/encoding.rs
@@ -0,0 +1,223 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_error::ext::{BoxedError, PlainError};
+use common_error::status_code::StatusCode;
+use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
+use common_query::error::{self, InvalidFuncArgsSnafu, InvalidInputStateSnafu, Result};
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
+use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
+use common_query::prelude::AccumulatorCreatorFunction;
+use common_time::Timestamp;
+use datatypes::prelude::ConcreteDataType;
+use datatypes::value::{ListValue, Value};
+use datatypes::vectors::VectorRef;
+use snafu::{ensure, ResultExt};
+
+use super::helpers::{ensure_columns_len, ensure_columns_n};
+
+/// Accumulator of lat, lng, timestamp tuples
+#[derive(Debug)]
+pub struct JsonPathAccumulator {
+    timestamp_type: ConcreteDataType,
+    lat: Vec<Option<f64>>,
+    lng: Vec<Option<f64>>,
+    timestamp: Vec<Option<Timestamp>>,
+}
+
+impl JsonPathAccumulator {
+    fn new(timestamp_type: ConcreteDataType) -> Self {
+        Self {
+            lat: Vec::default(),
+            lng: Vec::default(),
+            timestamp: Vec::default(),
+            timestamp_type,
+        }
+    }
+}
+
+impl Accumulator for JsonPathAccumulator {
+    fn state(&self) -> Result<Vec<Value>> {
+        Ok(vec![
+            Value::List(ListValue::new(
+                self.lat.iter().map(|i| Value::from(*i)).collect(),
+                ConcreteDataType::float64_datatype(),
+            )),
+            Value::List(ListValue::new(
+                self.lng.iter().map(|i| Value::from(*i)).collect(),
+                ConcreteDataType::float64_datatype(),
+            )),
+            Value::List(ListValue::new(
+                self.timestamp.iter().map(|i| Value::from(*i)).collect(),
+                self.timestamp_type.clone(),
+            )),
+        ])
+    }
+
+    fn update_batch(&mut self, columns: &[VectorRef]) -> Result<()> {
+        // In DataFusion, update_batch provides the accumulator with its
+        // original input.
+        //
+        // `columns` is a vec of [`lat`, `lng`, `timestamp`]
+        // where
+        // - `lat` is a vector of `Value::Float64` or a similar type. Each item
+        //   in the vector is a row in the given dataset.
+        // - so on and so forth for `lng` and `timestamp`
+        ensure_columns_n!(columns, 3);
+
+        let lat = &columns[0];
+        let lng = &columns[1];
+        let ts = &columns[2];
+
+        let size = lat.len();
+
+        for idx in 0..size {
+            self.lat.push(lat.get(idx).as_f64_lossy());
+            self.lng.push(lng.get(idx).as_f64_lossy());
+            self.timestamp.push(ts.get(idx).as_timestamp());
+        }
+
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[VectorRef]) -> Result<()> {
+        // In DataFusion, merge_batch provides the state accumulated from the
+        // data returned by child accumulators' state() calls.
+        // In this particular implementation, the data structure is:
+        //
+        // `states` is a vec of [`lat`, `lng`, `timestamp`]
+        // where
+        // - `lat` is a vector of `Value::List`. Each list holds all the
+        //   coordinates from one child accumulator.
+        // - so on and so forth for `lng` and `timestamp`
+        ensure_columns_n!(states, 3);
+
+        let lat_lists = &states[0];
+        let lng_lists = &states[1];
+        let ts_lists = &states[2];
+
+        let len = lat_lists.len();
+
+        for idx in 0..len {
+            if let Some(lat_list) = lat_lists
+                .get(idx)
+                .as_list()
+                .map_err(BoxedError::new)
+                .context(error::ExecuteSnafu)?
+            {
+                for v in lat_list.items() {
+                    self.lat.push(v.as_f64_lossy());
+                }
+            }
+
+            if let Some(lng_list) = lng_lists
+                .get(idx)
+                .as_list()
+                .map_err(BoxedError::new)
+                .context(error::ExecuteSnafu)?
+            {
+                for v in lng_list.items() {
+                    self.lng.push(v.as_f64_lossy());
+                }
+            }
+
+            if let Some(ts_list) = ts_lists
+                .get(idx)
+                .as_list()
+                .map_err(BoxedError::new)
+                .context(error::ExecuteSnafu)?
+            {
+                for v in ts_list.items() {
+                    self.timestamp.push(v.as_timestamp());
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    fn evaluate(&self) -> Result<Value> {
+        let mut work_vec: Vec<(&Option<f64>, &Option<f64>, &Option<Timestamp>)> = self
+            .lat
+            .iter()
+            .zip(self.lng.iter())
+            .zip(self.timestamp.iter())
+            .map(|((a, b), c)| (a, b, c))
+            .collect();
+
+        // sort by timestamp; a null timestamp is treated as 0
+        work_vec.sort_unstable_by_key(|tuple| tuple.2.unwrap_or_else(|| Timestamp::new_second(0)));
+
+        let result = serde_json::to_string(
+            &work_vec
+                .into_iter()
+                // note that we emit [lng, lat] for GeoJSON compatibility
+                .map(|(lat, lng, _)| vec![lng, lat])
+                .collect::<Vec<Vec<&Option<f64>>>>(),
+        )
+        .map_err(|e| {
+            BoxedError::new(PlainError::new(
+                format!("Serialization failure: {}", e),
+                StatusCode::EngineExecuteQuery,
+            ))
+        })
+        .context(error::ExecuteSnafu)?;
+
+        Ok(Value::String(result.into()))
+    }
+}
+
+/// This function accepts rows of lat, lng, and timestamp, sorts them by
+/// timestamp, and encodes them into a GeoJSON-like path.
+///
+/// Example:
+///
+/// ```sql
+/// SELECT json_encode_path(lat, lon, timestamp) FROM table [GROUP BY ...];
+/// ```
+///
+#[as_aggr_func_creator]
+#[derive(Debug, Default, AggrFuncTypeStore)]
+pub struct JsonPathEncodeFunctionCreator {}
+
+impl AggregateFunctionCreator for JsonPathEncodeFunctionCreator {
+    fn creator(&self) -> AccumulatorCreatorFunction {
+        let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
+            let ts_type = types[2].clone();
+            Ok(Box::new(JsonPathAccumulator::new(ts_type)))
+        });
+
+        creator
+    }
+
+    fn output_type(&self) -> Result<ConcreteDataType> {
+        Ok(ConcreteDataType::string_datatype())
+    }
+
+    fn state_types(&self) -> Result<Vec<ConcreteDataType>> {
+        let input_types = self.input_types()?;
+        ensure!(input_types.len() == 3, InvalidInputStateSnafu);
+
+        let timestamp_type = input_types[2].clone();
+
+        Ok(vec![
+            ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
+            ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
+            ConcreteDataType::list_datatype(timestamp_type),
+        ])
+    }
+}
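The accumulator above is expressed in GreptimeDB's `Value`/`Vector` vocabulary, which can obscure how little it actually does: collect `(lat, lng, ts)` rows, sort by timestamp, and serialize `[lng, lat]` pairs. A self-contained sketch of that evaluate-time core, assuming only `serde_json` (the function name is illustrative, not from the patch):

```rust
// Collect (lat, lng, ts) rows, order them by timestamp, and emit the
// GeoJSON-style [lng, lat] pairs as a JSON string.
fn encode_path(mut rows: Vec<(f64, f64, i64)>) -> String {
    rows.sort_unstable_by_key(|&(_, _, ts)| ts);
    let pairs: Vec<[f64; 2]> = rows.iter().map(|&(lat, lng, _)| [lng, lat]).collect();
    serde_json::to_string(&pairs).expect("f64 pairs always serialize")
}

fn main() {
    let rows = vec![
        (37.76938, -122.3889, 1728083375),
        (37.77001, -122.3888, 1728083372),
    ];
    // The 1728083372 point sorts first, mirroring the SQL test below.
    assert_eq!(
        encode_path(rows),
        "[[-122.3888,37.77001],[-122.3889,37.76938]]"
    );
}
```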
diff --git a/src/common/function/src/scalars/geo/helpers.rs b/src/common/function/src/scalars/geo/helpers.rs
new file mode 100644
index 0000000000..f07eaefb15
--- /dev/null
+++ b/src/common/function/src/scalars/geo/helpers.rs
@@ -0,0 +1,61 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+macro_rules! ensure_columns_len {
+    ($columns:ident) => {
+        ensure!(
+            $columns.windows(2).all(|c| c[0].len() == c[1].len()),
+            InvalidFuncArgsSnafu {
+                err_msg: "The length of input columns are in different size"
+            }
+        )
+    };
+    ($column_a:ident, $column_b:ident, $($column_n:ident),*) => {
+        ensure!(
+            {
+                let mut result = $column_a.len() == $column_b.len();
+                $(
+                    result = result && ($column_a.len() == $column_n.len());
+                )*
+                result
+            },
+            InvalidFuncArgsSnafu {
+                err_msg: "The length of input columns are in different size"
+            }
+        )
+    };
+}
+
+pub(super) use ensure_columns_len;
+
+macro_rules! ensure_columns_n {
+    ($columns:ident, $n:literal) => {
+        ensure!(
+            $columns.len() == $n,
+            InvalidFuncArgsSnafu {
+                err_msg: format!(
+                    "The length of arguments is not correct, expect {}, provided : {}",
+                    stringify!($n),
+                    $columns.len()
+                ),
+            }
+        );
+
+        if $n > 1 {
+            ensure_columns_len!($columns);
+        }
+    };
+}
+
+pub(super) use ensure_columns_n;
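Traced through, `ensure_columns_n!(columns, 3)` is just an arity check followed by a pairwise length check. A standalone sketch of the equivalent logic, with plain `String` errors in place of the snafu selectors (the function name is illustrative):

```rust
// Equivalent of ensure_columns_n!/ensure_columns_len! without snafu: verify
// the argument count, then verify all columns have the same row count.
fn check_columns_n(columns: &[Vec<f64>], n: usize) -> Result<(), String> {
    if columns.len() != n {
        return Err(format!(
            "The length of arguments is not correct, expect {}, provided : {}",
            n,
            columns.len()
        ));
    }
    if !columns.windows(2).all(|c| c[0].len() == c[1].len()) {
        return Err("The length of input columns are in different size".to_string());
    }
    Ok(())
}

fn main() {
    let (lat, lng, ts) = (vec![37.76938], vec![-122.3889], vec![1728083375.0]);
    assert!(check_columns_n(&[lat, lng, ts], 3).is_ok());
    // Mismatched row counts are rejected.
    assert!(check_columns_n(&[vec![1.0], vec![2.0, 3.0]], 2).is_err());
}
```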
diff --git a/src/common/macro/src/aggr_func.rs b/src/common/macro/src/aggr_func.rs
index 4c3ccccdee..2e17e70b5a 100644
--- a/src/common/macro/src/aggr_func.rs
+++ b/src/common/macro/src/aggr_func.rs
@@ -21,23 +21,19 @@ use syn::{parse_macro_input, DeriveInput, ItemStruct};
 pub(crate) fn impl_aggr_func_type_store(ast: &DeriveInput) -> TokenStream {
     let name = &ast.ident;
     let gen = quote! {
-        use common_query::logical_plan::accumulator::AggrFuncTypeStore;
-        use common_query::error::{InvalidInputStateSnafu, Error as QueryError};
-        use datatypes::prelude::ConcreteDataType;
-
-        impl AggrFuncTypeStore for #name {
-            fn input_types(&self) -> std::result::Result<Vec<ConcreteDataType>, QueryError> {
+        impl common_query::logical_plan::accumulator::AggrFuncTypeStore for #name {
+            fn input_types(&self) -> std::result::Result<Vec<datatypes::prelude::ConcreteDataType>, common_query::error::Error> {
                 let input_types = self.input_types.load();
-                snafu::ensure!(input_types.is_some(), InvalidInputStateSnafu);
+                snafu::ensure!(input_types.is_some(), common_query::error::InvalidInputStateSnafu);
                 Ok(input_types.as_ref().unwrap().as_ref().clone())
             }
 
-            fn set_input_types(&self, input_types: Vec<ConcreteDataType>) -> std::result::Result<(), QueryError> {
+            fn set_input_types(&self, input_types: Vec<datatypes::prelude::ConcreteDataType>) -> std::result::Result<(), common_query::error::Error> {
                 let old = self.input_types.swap(Some(std::sync::Arc::new(input_types.clone())));
                 if let Some(old) = old {
-                    snafu::ensure!(old.len() == input_types.len(), InvalidInputStateSnafu);
+                    snafu::ensure!(old.len() == input_types.len(), common_query::error::InvalidInputStateSnafu);
                     for (x, y) in old.iter().zip(input_types.iter()) {
-                        snafu::ensure!(x == y, InvalidInputStateSnafu);
+                        snafu::ensure!(x == y, common_query::error::InvalidInputStateSnafu);
                     }
                 }
                 Ok(())
@@ -51,7 +47,7 @@ pub(crate) fn impl_as_aggr_func_creator(_args: TokenStream, input: TokenStream)
     let mut item_struct = parse_macro_input!(input as ItemStruct);
     if let syn::Fields::Named(ref mut fields) = item_struct.fields {
         let result = syn::Field::parse_named.parse2(quote! {
-            input_types: arc_swap::ArcSwapOption<Vec<ConcreteDataType>>
+            input_types: arc_swap::ArcSwapOption<Vec<datatypes::prelude::ConcreteDataType>>
        });
         match result {
             Ok(field) => fields.named.push(field),
diff --git a/src/common/macro/tests/test_derive.rs b/src/common/macro/tests/test_derive.rs
index 9c648c788d..c614800572 100644
--- a/src/common/macro/tests/test_derive.rs
+++ b/src/common/macro/tests/test_derive.rs
@@ -24,5 +24,5 @@ struct Foo {}
 fn test_derive() {
     let _ = Foo::default();
     assert_fields!(Foo: input_types);
-    assert_impl_all!(Foo: std::fmt::Debug, Default, AggrFuncTypeStore);
+    assert_impl_all!(Foo: std::fmt::Debug, Default, common_query::logical_plan::accumulator::AggrFuncTypeStore);
 }
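For orientation, here is roughly what the two macros now generate for the `Foo` struct in the test above. This is a hand-assembled sketch from the `quote!` bodies, not captured expansion output; note every path is fully qualified, so nothing leaks into the caller's namespace:

```rust
// Field injected by #[as_aggr_func_creator]:
struct Foo {
    input_types: arc_swap::ArcSwapOption<Vec<datatypes::prelude::ConcreteDataType>>,
}

// Impl generated by #[derive(AggrFuncTypeStore)]:
impl common_query::logical_plan::accumulator::AggrFuncTypeStore for Foo {
    fn input_types(
        &self,
    ) -> std::result::Result<Vec<datatypes::prelude::ConcreteDataType>, common_query::error::Error>
    {
        let input_types = self.input_types.load();
        snafu::ensure!(
            input_types.is_some(),
            common_query::error::InvalidInputStateSnafu
        );
        Ok(input_types.as_ref().unwrap().as_ref().clone())
    }

    // set_input_types(...) is generated along the same lines.
}
```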
diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs
index 495c47dc5f..12f5c8d1f2 100644
--- a/src/datatypes/src/data_type.rs
+++ b/src/datatypes/src/data_type.rs
@@ -249,6 +249,15 @@ impl ConcreteDataType {
         ]
     }
 
+    pub fn timestamps() -> Vec<ConcreteDataType> {
+        vec![
+            ConcreteDataType::timestamp_second_datatype(),
+            ConcreteDataType::timestamp_millisecond_datatype(),
+            ConcreteDataType::timestamp_microsecond_datatype(),
+            ConcreteDataType::timestamp_nanosecond_datatype(),
+        ]
+    }
+
     /// Convert arrow data type to [ConcreteDataType].
     ///
     /// # Panics
diff --git a/src/query/src/tests/my_sum_udaf_example.rs b/src/query/src/tests/my_sum_udaf_example.rs
index 8220dcf72d..63653ffbce 100644
--- a/src/query/src/tests/my_sum_udaf_example.rs
+++ b/src/query/src/tests/my_sum_udaf_example.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
 use common_function::scalars::aggregate::AggregateFunctionMeta;
 use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
 use common_query::error::{CreateAccumulatorSnafu, Result as QueryResult};
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
 use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
 use common_query::prelude::*;
 use common_recordbatch::{RecordBatch, RecordBatches};
diff --git a/tests/cases/standalone/common/function/geo.result b/tests/cases/standalone/common/function/geo.result
index 8954447650..4f9d168ac0 100644
--- a/tests/cases/standalone/common/function/geo.result
+++ b/tests/cases/standalone/common/function/geo.result
@@ -236,3 +236,28 @@ SELECT geohash_neighbours(37.76938, -122.3889, 11);
 | [9q8yygxnefv, 9q8yygxnefu, 9q8yygxnefs, 9q8yygxnefk, 9q8yygxnefm, 9q8yygxnefq, 9q8yygxnefw, 9q8yygxnefy] |
 +----------------------------------------------------------------------------------------------------------+
 
+SELECT json_encode_path(37.76938, -122.3889, 1728083375::TimestampSecond);
+
++------------------------------------------------------------------------------------------------------------------------+
+| json_encode_path(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(1728083375),Utf8("Timestamp(Second, None)")))    |
++------------------------------------------------------------------------------------------------------------------------+
+| [[-122.3889,37.76938]]                                                                                                   |
++------------------------------------------------------------------------------------------------------------------------+
+
+SELECT json_encode_path(lat, lon, ts)
+FROM(
+    SELECT 37.76938 AS lat, -122.3889 AS lon, 1728083375::TimestampSecond AS ts
+    UNION ALL
+    SELECT 37.76928 AS lat, -122.3839 AS lon, 1728083373::TimestampSecond AS ts
+    UNION ALL
+    SELECT 37.76930 AS lat, -122.3820 AS lon, 1728083379::TimestampSecond AS ts
+    UNION ALL
+    SELECT 37.77001 AS lat, -122.3888 AS lon, 1728083372::TimestampSecond AS ts
+);
+
++-------------------------------------------------------------------------------------+
+| json_encode_path(lat,lon,ts)                                                        |
++-------------------------------------------------------------------------------------+
+| [[-122.3888,37.77001],[-122.3839,37.76928],[-122.3889,37.76938],[-122.382,37.7693]] |
++-------------------------------------------------------------------------------------+
+
diff --git a/tests/cases/standalone/common/function/geo.sql b/tests/cases/standalone/common/function/geo.sql
index be2b3947bb..cd9d403e6e 100644
--- a/tests/cases/standalone/common/function/geo.sql
+++ b/tests/cases/standalone/common/function/geo.sql
@@ -66,3 +66,16 @@ SELECT geohash(37.76938, -122.3889, 11::UInt32);
 SELECT geohash(37.76938, -122.3889, 11::UInt64);
 
 SELECT geohash_neighbours(37.76938, -122.3889, 11);
+
+SELECT json_encode_path(37.76938, -122.3889, 1728083375::TimestampSecond);
+
+SELECT json_encode_path(lat, lon, ts)
+FROM(
+    SELECT 37.76938 AS lat, -122.3889 AS lon, 1728083375::TimestampSecond AS ts
+    UNION ALL
+    SELECT 37.76928 AS lat, -122.3839 AS lon, 1728083373::TimestampSecond AS ts
+    UNION ALL
+    SELECT 37.76930 AS lat, -122.3820 AS lon, 1728083379::TimestampSecond AS ts
+    UNION ALL
+    SELECT 37.77001 AS lat, -122.3888 AS lon, 1728083372::TimestampSecond AS ts
+);
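Because the result is a bare coordinate array already in GeoJSON's `[lng, lat]` order, it drops straight into a GeoJSON geometry. A minimal sketch, assuming plain `serde_json` outside the GreptimeDB codebase, wrapping the query output above into a `LineString`:

```rust
use serde_json::json;

fn main() {
    // Output copied from the SQL test above.
    let coords: Vec<Vec<f64>> = serde_json::from_str(
        "[[-122.3888,37.77001],[-122.3839,37.76928],[-122.3889,37.76938],[-122.382,37.7693]]",
    )
    .expect("valid JSON from json_encode_path");

    // Wrap into a GeoJSON LineString geometry.
    let line = json!({ "type": "LineString", "coordinates": coords });
    println!("{line}");
}
```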