YMatrix is suited to IoT time-series scenarios with device fleets of any size. This tutorial uses a concrete Internet of Vehicles scenario to show how to load, process, and query time-series data in YMatrix.
A city has more than 8 million residents and 200,000 taxis. The municipal administration collects and publishes the record of every taxi trip, including pickup and drop-off times, pickup and drop-off locations, passenger count, fare, and payment method. What can be analyzed with this information? Taxi utilization, and even overall traffic conditions. Such analysis can improve city management and the experience of residents and tourists. This tutorial provides a compressed archive containing one month of data. Click [here](https://pan.baidu.com/s/1JJv6ADN5vHOlem7smTrEDw) (extraction code 1x4u) to start your urban traffic management journey.
The collected data includes a payment method. Possible values are credit card, cash, no charge, disputed, unknown, and invalid trip. These are static attributes. Store them in a payment_types table so that they can be joined with the trip data in later queries. The payment method data is small and needs to support updates, so the default storage engine, HEAP, is sufficient. HEAP is used by default when no storage engine is specified.
=# CREATE TABLE IF NOT EXISTS payment_types (
payment_type int,
description text
)
USING HEAP;
The IF NOT EXISTS clause avoids an error when a table with the same name already exists.
=# INSERT INTO payment_types VALUES
(1, 'Credit card'),
(2, 'Cash'),
(3, 'No charge'),
(4, 'Disputed'),
(5, 'Unknown'),
(6, 'Invalid trip');
There is also a rate code that indicates the pricing scheme: standard rate, airport 1, airport 2, special area, negotiated price, and group ride. Similarly, you can create a static attribute table rate_codes to record this information, again using the default HEAP storage engine:
=# CREATE TABLE IF NOT EXISTS rate_codes (
rate_code int,
description text
)
USING HEAP;
=# INSERT INTO rate_codes VALUES
(1, 'Standard Rate'),
(2, 'Airport 1'),
(3, 'Airport 2'),
(4, 'Special Area'),
(5, 'Negotiated Price'),
(6, 'Group');
Next, create a time-series table to store the trip data. Some of its fields are described briefly here. pickup_datetime / dropoff_datetime are the pickup and drop-off times. pickup_longitude / pickup_latitude are the longitude and latitude of the pickup location, and dropoff_longitude / dropoff_latitude are the longitude and latitude of the drop-off location. passenger_count is the number of passengers, trip_distance is the trip distance (in miles), and total_amount is the total fare. The last field, trip_duration, is generated when data is loaded and records the ride duration (in minutes).
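As a quick illustration of how a duration in minutes can be derived from two timestamps, the following standalone query applies the same expression that the trip table definition later in this tutorial uses for trip_duration. The two timestamps are made-up sample values, not rows from the dataset.

```sql
-- Sample values only: a pickup at 08:00 and a drop-off at 08:25 on the same day.
-- EXTRACT(EPOCH ...) turns the interval into seconds; dividing by 60 gives minutes.
SELECT EXTRACT(EPOCH FROM (timestamp '2016-01-01 08:25:00'
                         - timestamp '2016-01-01 08:00:00')) / 60
       AS trip_duration_minutes;
```

The interval here is 25 minutes (1500 seconds), so the query should evaluate to 25.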
In this taxi scenario, the data in the trip table is time-series data generated by devices over time. Because time-series data sources are numerous and varied, the demands on writing and storing the data are high, while updates and deletes are rare. MARS3 is therefore a good fit: it is optimized for time-series write, storage, and query performance.
MARS3 tables depend on the matrixts extension, so before creating the table you must first create the extension in the database that will use this storage engine. If the extension already exists, skip this step:
=# CREATE EXTENSION matrixts;
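If you are unsure whether the extension already exists in the current database, one way to check is to query the standard PostgreSQL catalog pg_extension. This is a sketch that assumes YMatrix exposes the catalog as PostgreSQL does; the IF NOT EXISTS form is standard PostgreSQL syntax and is assumed to behave the same way here.

```sql
-- Returns one row if matrixts is already installed in this database.
SELECT extname, extversion
FROM pg_extension
WHERE extname = 'matrixts';

-- Idempotent alternative: skips creation if the extension already exists.
CREATE EXTENSION IF NOT EXISTS matrixts;
```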
When creating the table, use the USING MARS3 clause to specify the storage engine and the WITH clause to set its parameters. compresstype specifies the compression algorithm; zstd, zlib, and lz4 are supported, and the default is lz4. compresslevel specifies the compression level: a smaller value compresses faster but less effectively, a larger value compresses more slowly but more effectively, and a moderate value balances compression speed and compression ratio. The valid range of values differs by algorithm. The trip table is created as follows:
=# CREATE TABLE IF NOT EXISTS trip (
vendor_id text,
pickup_datetime timestamp without time zone,
dropoff_datetime timestamp without time zone,
passenger_count int,
trip_distance numeric,
pickup_longitude numeric,
pickup_latitude numeric,
rate_code_id int,
store_and_fwd_flag text,
dropoff_longitude numeric,
dropoff_latitude numeric,
payment_type int,
fare_amount numeric,
extra numeric,
mta_tax numeric,
tip_amount numeric,
tolls_amount numeric,
improvement_surcharge numeric,
total_amount numeric,
trip_duration numeric GENERATED ALWAYS AS (EXTRACT(EPOCH FROM (dropoff_datetime - pickup_datetime)::INTERVAL)/60) STORED
)
USING MARS3
WITH (compresstype='lz4', compresslevel=1)
DISTRIBUTED BY (vendor_id)
ORDER BY (vendor_id, pickup_datetime)
PARTITION BY RANGE (pickup_datetime)
( START (date '2016-01-01') INCLUSIVE
END (date '2016-02-01') EXCLUSIVE
EVERY (INTERVAL '1 day') );
The DISTRIBUTED BY clause hash-distributes the rows of the trip table by the vendor_id column, so that rows with the same value are stored on the same node.
The ORDER BY clause specifies (vendor_id, pickup_datetime) as the sort key, so that the data is stored in sorted order.
The PARTITION BY clause partitions the data. From January 1, 2016 (inclusive) to February 1, 2016 (exclusive), 31 partitions are created at one-day intervals. Because the trip table is partitioned by day, queries restricted to a time range can prune partitions quickly, and expired data can be handled quickly later.
Note: SQL syntax requires DISTRIBUTED BY to be specified before PARTITION BY. At execution time, DISTRIBUTED BY is applied first to route the data to the corresponding node, and PARTITION BY is then applied on that node to insert the data into the corresponding child partition.
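To check the effect of these clauses after the table is created (and, for the second query, after data is loaded), the following sketch may help. It assumes that YMatrix, like PostgreSQL, records child partitions in the pg_inherits catalog, and that the Greenplum-style system column gp_segment_id is available; neither is stated in this tutorial.

```sql
-- Count the direct child partitions of trip.
-- With the definition above, 31 daily partitions are expected.
SELECT count(*) AS partition_count
FROM pg_inherits
WHERE inhparent = 'trip'::regclass;

-- Show how rows are spread across segments (run after loading data).
-- gp_segment_id is assumed to exist, as in Greenplum-based systems.
SELECT gp_segment_id, count(*) AS row_count
FROM trip
GROUP BY gp_segment_id
ORDER BY gp_segment_id;
```

Because hash distribution places equal keys together, a distribution column with few distinct values tends to concentrate rows on few segments, which the second query would reveal.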
Locate the yellow_tripdata_2016-01.csv file downloaded from the link above, then use the mxgate command to load the data. Specify the actual file path after tail, and specify the actual master host name or IP address with the --db-master-host parameter, for example:
$ tail -n +2 /home/mxadmin/workspace/nyc-taxi-data/yellow_tripdata_2016-01.csv | mxgate --source stdin --db-database postgres --db-master-host mdw --db-master-port 5432 --db-user mxadmin --time-format raw --target trip --parallel 256 --delimiter ',' --exclude-columns trip_duration
The main parameters of mxgate are as follows:
--source stdin // Read input from standard input
--db-database postgres // Database name
--db-master-host mdw // Master node host name or IP address
--db-master-port 5432 // Database port
--db-user mxadmin // Database user name
--time-format raw // Raw format, no conversion
--target trip // Name of the table to load into
--parallel 256 // Degree of parallelism
--delimiter ',' // Field delimiter
--exclude-columns trip_duration // Skip the generated column, which is not in the CSV
For more information about mxgate, see MatrixGate.
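After the load finishes, a quick sanity check can confirm that rows arrived and that the generated column was populated. The queries below are a minimal sketch using only standard aggregates and the column names from the trip table above; no expected values are given because they depend on the file you loaded.

```sql
-- Row count and time range of the loaded data.
SELECT count(*)             AS row_count,
       min(pickup_datetime) AS first_pickup,
       max(pickup_datetime) AS last_pickup
FROM trip;

-- Spot-check that trip_duration was filled in during loading.
SELECT pickup_datetime, dropoff_datetime, trip_duration
FROM trip
LIMIT 5;
```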
YMatrix provides the time_bucket function, which supports aggregation over arbitrary time intervals. It requires the matrixts extension, which initializes the time-series components. If the extension already exists in the database, there is no need to create it again.
=# CREATE EXTENSION matrixts;
Then you can use the following SQL statement to count the number of trips per day:
=# SELECT time_bucket('24 hours', pickup_datetime) AS day, count(*)
FROM trip
GROUP BY day
ORDER BY day;
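For daily buckets, a similar result can be obtained with the standard PostgreSQL function date_trunc, which may help clarify what time_bucket does. This is an illustrative comparison, not a replacement: it assumes the 24-hour buckets are aligned to midnight, and date_trunc only supports fixed calendar units, whereas time_bucket is described above as accepting arbitrary intervals. The 15-minute interval in the second query is an arbitrary example value.

```sql
-- Daily trip counts using date_trunc (standard PostgreSQL).
SELECT date_trunc('day', pickup_datetime) AS day, count(*)
FROM trip
GROUP BY day
ORDER BY day;

-- The same pattern with a finer, non-calendar interval via time_bucket.
SELECT time_bucket('15 minutes', pickup_datetime) AS bucket, count(*)
FROM trip
GROUP BY bucket
ORDER BY bucket;
```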
To find out how many passengers were picked up in each hour of January 2, 2016, you can use the following SQL:
=# SELECT time_bucket('1 hour', pickup_datetime) AS hour, sum(passenger_count)
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00' AND pickup_datetime < '2016-01-03 00:00:00'
GROUP BY hour
ORDER BY hour;
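The next step looks at trip distances. A minimal query for the range of trip_distance, using the standard max and min aggregates, could look like this (the output aliases are illustrative):

```sql
-- Longest and shortest recorded trip distance, in miles.
SELECT max(trip_distance) AS longest_trip,
       min(trip_distance) AS shortest_trip
FROM trip;
```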
The max and min functions quickly show that the longest trip in the current dataset is 485.9 miles. To go further and count the trips in each distance range (up to 10 miles, 10-50, 50-100, 100-200, and over 200 miles), a single SQL statement is enough:
=# SELECT distance_range, count(*) AS num_of_trips
FROM
(
SELECT
CASE
WHEN trip_distance <= 10 THEN 10
WHEN trip_distance > 10 AND trip_distance <= 50 THEN 50
WHEN trip_distance > 50 AND trip_distance <= 100 THEN 100
WHEN trip_distance > 100 AND trip_distance <= 200 THEN 200
WHEN trip_distance > 200 THEN 500
END AS distance_range
FROM trip
) AS temp
GROUP BY distance_range
ORDER BY distance_range;
The query returns the following result:
 distance_range | num_of_trips
----------------+--------------
             10 |     10308767
             50 |       586200
            100 |          379
            200 |           58
            500 |            9
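The static attribute tables created at the start of this tutorial can be joined with trip to make results readable. The following sketch counts trips and averages total_amount per payment method; it is one possible query rather than part of the original walkthrough, and the output aliases are illustrative.

```sql
-- Trips and average fare per payment method, most frequent first.
SELECT p.description       AS payment_method,
       count(*)            AS num_of_trips,
       avg(t.total_amount) AS avg_total_amount
FROM trip AS t
JOIN payment_types AS p ON t.payment_type = p.payment_type
GROUP BY p.description
ORDER BY num_of_trips DESC;
```

The same pattern applies to rate_codes by joining on t.rate_code_id = rate_codes.rate_code.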