MatrixDB is suited to IoT time-series workloads at any device scale. This tutorial uses a concrete Internet of Vehicles scenario to show how to load, process, and query time-series data in MatrixDB.
A city has a population of more than 8 million and 200,000 taxis. The municipal administration collects and publishes the record of every taxi trip, including pickup and drop-off times, pickup and drop-off locations, passenger count, fare, and payment method. What can be analyzed with this information? Taxi utilization, and even overall traffic conditions! Such analysis can improve city management and the experience of residents and visitors. This tutorial provides one month of data as a compressed package. Click [here](https://pan.baidu.com/s/1JJv6ADN5vHOlem7smTrEDw) to start your urban traffic management journey (extraction code 1x4u).
The collected data includes a payment method. Possible values are credit card, cash, no charge, disputed, unknown, and invalid trip. These are static attributes. Store them in a payment_types table so that later queries can join against this metadata. The payment method attributes are small in volume and may need updating, so the default HEAP storage engine is appropriate. HEAP is used by default when no storage engine is specified.
CREATE TABLE IF NOT EXISTS payment_types (
payment_type int,
description text
)
USING heap;
The IF NOT EXISTS clause avoids an error when a table with the same name already exists.
INSERT INTO payment_types VALUES
(1, 'Credit card'),
(2, 'Cash'),
(3, 'No charge'),
(4, 'Disputed'),
(5, 'Unknown'),
(6, 'Invalid trip');
Each trip also has a rate code that identifies the rate type: standard rate, Airport 1, Airport 2, special area, negotiated price, and group ride. As before, create a static attribute table, rate_codes, to record this information, again using the default HEAP storage engine:
CREATE TABLE IF NOT EXISTS rate_codes (
rate_code int,
description text
)
USING heap;
INSERT INTO rate_codes VALUES
(1, 'Standard Rate'),
(2, 'Airport 1'),
(3, 'Airport 2'),
(4, 'Special Area'),
(5, 'Negotiated Price'),
(6, 'Group');
Next, create a time-series table to store the trip data. Some of its fields are explained briefly here. pickup_datetime and dropoff_datetime are the pickup and drop-off times. pickup_longitude and pickup_latitude are the longitude and latitude of the pickup location; dropoff_longitude and dropoff_latitude are the longitude and latitude of the drop-off location. passenger_count is the number of passengers, trip_distance is the distance of the trip (in miles), and total_amount is the cost of the ride. The last field, trip_duration, is generated when the data is loaded and records the duration of the ride (in minutes).
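As a quick sanity check of the trip_duration definition, the expression used in the table definition that follows can be evaluated on two literal timestamps. The timestamps here are made-up values chosen only for illustration; a 30-minute gap should evaluate to 30.

```sql
-- Illustrative values only: pickup at 10:15, drop-off at 10:45 on the same day.
-- EXTRACT(EPOCH ...) yields the interval in seconds; dividing by 60 gives minutes.
SELECT EXTRACT(EPOCH FROM (timestamp '2016-01-01 10:45:00'
                         - timestamp '2016-01-01 10:15:00')) / 60
       AS trip_duration_minutes;
```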
In the taxi scenario, the records in the trip table are time-series data generated as the state of each vehicle changes over time. Because time-series data comes from many varied sources, the demands on writing and storing it are high, while updates and deletes are rare. This makes the MARS family of storage engines in MatrixDB a good fit. MARS2 is the recommended choice, with significantly optimized write, storage, and query performance for time-series data.
MARS2 tables depend on the matrixts extension, so before creating the table you must first create the extension in the database that will use the storage engine. If the extension already exists, skip this step:
CREATE EXTENSION matrixts;
When creating the table, use the USING mars2 clause to specify the storage engine and the WITH clause to set its parameters. compresstype is the compression algorithm; zstd, zlib, and lz4 are supported, and the default is lz4. compresslevel is the compression level: smaller values compress faster, larger values give a higher compression ratio, and moderate values balance the two. The valid range depends on the algorithm: zstd 1-19, zlib 1-9, lz4 1-20. The default is 1.
CREATE TABLE IF NOT EXISTS trip (
vendor_id text,
pickup_datetime timestamp without time zone,
dropoff_datetime timestamp without time zone,
passenger_count int,
trip_distance numeric,
pickup_longitude numeric,
pickup_latitude numeric,
rate_code_id int,
store_and_fwd_flag text,
dropoff_longitude numeric,
dropoff_latitude numeric,
payment_type int,
fare_amount numeric,
extra numeric,
mta_tax numeric,
tip_amount numeric,
tolls_amount numeric,
improvement_surcharge numeric,
total_amount numeric,
trip_duration numeric GENERATED ALWAYS AS (EXTRACT(EPOCH FROM (dropoff_datetime - pickup_datetime)::INTERVAL)/60) STORED
)
USING mars2
WITH (compresstype='lz4', compresslevel=1)
DISTRIBUTED BY (vendor_id)
PARTITION BY RANGE (pickup_datetime)
( START (date '2016-01-01') INCLUSIVE
END (date '2016-02-01') EXCLUSIVE
EVERY (INTERVAL '1 day') );
The DISTRIBUTED BY clause hash-distributes the rows of the trip table by the vendor_id column, so that rows with the same value are stored on the same node. The PARTITION BY clause partitions the data: from January 1, 2016 (inclusive) to February 1, 2016 (exclusive), 31 partitions are created at one-day intervals. Partitioning by day enables partition pruning for queries restricted to a time range, and it also makes it easy to remove expired data later.
Notes!
SQL syntax requires DISTRIBUTED BY to be specified before PARTITION BY. Execution follows the same order: DISTRIBUTED BY first distributes the data to the corresponding node, and PARTITION BY then inserts the data into the matching partition on that node.
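The partition arithmetic and the effect of pruning can be sketched with two statements. The first only counts the daily boundaries from the table definition and does not touch the trip table. The second asks for the query plan of a one-day range on the partition key; the date 2016-01-15 is an arbitrary example, and the exact plan text depends on the MatrixDB version, so no output is shown.

```sql
-- One partition per day from 2016-01-01 (inclusive) to 2016-02-01 (exclusive):
-- counting the daily start points gives the number of partitions, 31.
SELECT count(*) AS partition_count
FROM generate_series(timestamp '2016-01-01',
                     timestamp '2016-01-31',
                     interval '1 day');

-- A one-day range predicate on the partition key. With partition pruning,
-- the plan is expected to reference only the partition covering 2016-01-15.
EXPLAIN
SELECT count(*)
FROM trip
WHERE pickup_datetime >= '2016-01-15'
  AND pickup_datetime <  '2016-01-16';
```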
After the MARS2 table is created, an additional index of type mars2_btree must be created before data can be read and written normally:
CREATE INDEX idx_trip ON trip USING mars2_btree (vendor_id, pickup_datetime);
Locate the yellow_tripdata_2016-01.csv file from the package downloaded above, then load it with the mxgate command. Pass the actual file path to tail, and set the --db-master-host parameter to the actual master node host name or IP, for example:
tail -n +2 /home/mxadmin/workspace/nyc-taxi-data/yellow_tripdata_2016-01.csv | mxgate --source stdin --db-database postgres --db-master-host mdw --db-master-port 5432 --db-user mxadmin --time-format raw --target trip --parallel 256 --delimiter ',' --exclude-columns trip_duration
The main parameters of mxgate are as follows:
--db-database postgres      Database name
--db-master-host mdw        Master node host name or IP
--db-master-port 5432       Database port
--db-user mxadmin           Database user name
--time-format raw           Raw format, no conversion
--target trip               Name of the table to load into
--parallel 256              Degree of parallelism
--delimiter ','             Field delimiter
For more information about mxgate, see MatrixGate.
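Once mxgate finishes, a simple check of the loaded data can be run. This is a minimal sketch that uses only the trip table and columns defined above; the pickup times are expected to fall within January 2016, matching the partition range.

```sql
-- Row count and time range of the loaded data.
SELECT count(*)             AS total_trips,
       min(pickup_datetime) AS first_pickup,
       max(pickup_datetime) AS last_pickup
FROM trip;
```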
MatrixDB provides the time_bucket function, which groups data into buckets of any time interval for aggregation. It requires the MatrixTS extension, which initializes the time-series components in the database. If you already created the extension for the MARS2 table above, skip this step:
CREATE EXTENSION matrixts;
You can then use the following SQL statement to count the number of trips per day:
SELECT time_bucket('24 hours', pickup_datetime) AS day, count(*)
FROM trip
GROUP BY day
ORDER BY day;
To find out how many passengers traveled in each hour of January 2, 2016, use the following SQL:
SELECT time_bucket('1 hour', pickup_datetime) AS hour, sum(passenger_count)
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00' AND pickup_datetime < '2016-01-03 00:00:00'
GROUP BY hour
ORDER BY hour;
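The same pattern works for other bucket widths and aggregates. As an illustrative variation, the query below uses 15-minute buckets on the same day and adds averages of the trip_duration and trip_distance columns; the alias names are arbitrary.

```sql
-- Trips, average duration, and average distance per 15-minute bucket
-- on January 2, 2016.
SELECT time_bucket('15 minutes', pickup_datetime) AS quarter_hour,
       count(*)           AS num_of_trips,
       avg(trip_duration) AS avg_duration_minutes,
       avg(trip_distance) AS avg_distance_miles
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00'
  AND pickup_datetime <  '2016-01-03 00:00:00'
GROUP BY quarter_hour
ORDER BY quarter_hour;
```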
The max and min functions quickly show that the longest trip in the current dataset is 485.9 miles. To go further and count the trips in each distance range (up to 10, 10-50, 50-100, 100-200, and above 200 miles), a single SQL statement is enough:
SELECT distance_range, count(*) AS num_of_trips
FROM
(
SELECT
CASE
WHEN trip_distance <= 10 THEN 10
WHEN trip_distance > 10 AND trip_distance <= 50 THEN 50
WHEN trip_distance > 50 AND trip_distance <= 100 THEN 100
WHEN trip_distance > 100 AND trip_distance <= 200 THEN 200
WHEN trip_distance > 200 THEN 500
END AS distance_range
FROM trip
) AS temp
GROUP BY distance_range
ORDER BY distance_range;
The query returns the following result:
 distance_range | num_of_trips
----------------+--------------
             10 |     10308767
             50 |       586200
            100 |          379
            200 |           58
            500 |            9
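The static attribute tables created at the start of the tutorial can be joined with the trip table in the same way. The following sketch counts trips and sums total_amount per payment method; the alias names are arbitrary, and the same join pattern applies to rate_codes through rate_code_id.

```sql
-- Trips and total fare amount per payment method, using the payment_types
-- lookup table to translate the numeric code into a description.
SELECT p.description       AS payment_method,
       count(*)            AS num_of_trips,
       sum(t.total_amount) AS total_amount
FROM trip AS t
JOIN payment_types AS p
  ON t.payment_type = p.payment_type
GROUP BY p.description
ORDER BY num_of_trips DESC;
```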