This document answers frequently asked questions about PL/Python in MatrixDB.
Using Python directly with a client connection library operates on the database from a single, standalone process. PL/Python instead pushes the computation down into the database, so it can take advantage of MatrixDB's distributed computing power.
To use PL/Python in a database, simply create the following extension in MatrixDB:
=# CREATE EXTENSION plpython3u;
Install the required Python packages with pip3 on all MatrixDB servers.
Source code (accessing MatrixDB directly from Python with psycopg2):
# -*- coding: utf-8 -*-
import psycopg2
class MatrixDB(object):
    """Minimal psycopg2 client for the MatrixDB ``data`` demo table.

    Every public method opens its own connection, runs one statement,
    commits, and always closes the connection again. The work therefore
    happens in this single client process, not inside the database.
    """

    def __init__(self, host="69.230.237.118", user="shidb", database="test",
                 port="5432", password="sfaLxxXwkh"):
        # Connection settings. The defaults are the demo cluster's values.
        # NOTE(review): avoid hard-coding real credentials in shared code;
        # pass them in (e.g. read from environment variables) instead.
        self.host = host
        self.user = user
        self.database = database
        self.port = port
        self.password = password

    def get_conn(self):
        """Open and return a new psycopg2 connection using the stored settings."""
        return psycopg2.connect(database=self.database,
                                user=self.user,
                                password=self.password,
                                host=self.host,
                                port=self.port)

    def _run(self, action):
        """Call ``action(cursor)`` in a fresh transaction and return its result.

        The transaction is committed only if ``action`` succeeds. The
        connection is closed in every case, so a failing statement does not
        leak it; closing without a commit discards the pending transaction.
        """
        conn = self.get_conn()
        try:
            with conn.cursor() as cursor:
                result = action(cursor)
            conn.commit()
            return result
        finally:
            conn.close()

    def create_table(self):
        """Create the ``data`` table, distributed by ``tag_id``."""
        sql = ("CREATE TABLE data("
               "time timestamp,"
               "tag_id int,"
               "metrics1 float8,"
               "metrics2 float8,"
               "metrics3 float8"
               ")Distributed by(tag_id)")
        self._run(lambda cursor: cursor.execute(sql))

    def insert(self):
        """Insert one fixed sample row stamped with the server's ``now()``."""
        sql = "INSERT INTO data VALUES(now(), 1, 1.1, 1.2, 1.3)"
        self._run(lambda cursor: cursor.execute(sql))

    def select(self):
        """Return the first row of ``SELECT * FROM data`` (``None`` if the table is empty)."""
        def fetch_first(cursor):
            cursor.execute("SELECT * FROM data")
            return cursor.fetchone()
        return self._run(fetch_first)
if __name__ == '__main__':
    # Demo run: create the table, insert one row, then read it back.
    client = MatrixDB()
    for step in (client.create_table, client.insert):
        step()
    print(client.select())
Source code (batch loading with a multi-row INSERT or with COPY):
# -*- coding: utf-8 -*-
import psycopg2
import random
import time
from io import StringIO
class MatrixDB(object):
    """psycopg2 client comparing two batch-loading methods for the ``data`` table.

    ``insert`` loads rows with one multi-row INSERT statement, and
    ``copybatch`` streams them with COPY. Both print timing information.
    Every database call uses its own connection, which is always closed.
    """

    def __init__(self, host="69.230.237.118", user="shidb", database="test",
                 port="5432", password="sfaLxxXwkh"):
        # Connection settings. The defaults are the demo cluster's values.
        # NOTE(review): avoid hard-coding real credentials in shared code;
        # pass them in (e.g. read from environment variables) instead.
        self.host = host
        self.user = user
        self.database = database
        self.port = port
        self.password = password

    def get_conn(self):
        """Open and return a new psycopg2 connection using the stored settings."""
        return psycopg2.connect(database=self.database,
                                user=self.user,
                                password=self.password,
                                host=self.host,
                                port=self.port)

    def _run(self, action):
        """Call ``action(cursor)`` in a fresh transaction and return its result.

        The transaction is committed only if ``action`` succeeds. The
        connection is closed in every case, so a failing statement does not
        leak it; closing without a commit discards the pending transaction.
        """
        conn = self.get_conn()
        try:
            with conn.cursor() as cursor:
                result = action(cursor)
            conn.commit()
            return result
        finally:
            conn.close()

    @staticmethod
    def _random_row():
        """Return one random ``(tag_id, metrics1, metrics2, metrics3)`` tuple.

        ``tag_id`` is an int in [0, 100]; each metric is a float in [0, 1]
        rounded to 2 decimal places.
        """
        tag_id = round(random.random() * 100)
        metrics1 = round(random.random(), 2)
        metrics2 = round(random.random(), 2)
        metrics3 = round(random.random(), 2)
        return tag_id, metrics1, metrics2, metrics3

    @classmethod
    def _random_rows(cls, num):
        """Generate ``num`` random rows, printing each one as it is created."""
        rows = []
        for i in range(num):
            row = cls._random_row()
            print(i, "random:", *row)
            rows.append(row)
        return rows

    @staticmethod
    def _format_copy_rows(rows):
        """Render rows as COPY text input: tab-separated fields, one line per row."""
        return "".join("\t".join(str(value) for value in row) + "\n" for row in rows)

    def create_table(self):
        """Create the ``data`` table, distributed by ``tag_id``."""
        sql = ("CREATE TABLE data("
               "time timestamp,"
               "tag_id int,"
               "metrics1 float8,"
               "metrics2 float8,"
               "metrics3 float8"
               ")Distributed by(tag_id)")
        self._run(lambda cursor: cursor.execute(sql))

    def insert(self, num):
        """Insert ``num`` random rows with a single multi-row INSERT and print the timing."""
        start_time = time.time()
        print("start_time:", start_time)
        rows = self._random_rows(num)
        if rows:  # an empty VALUES list is not valid SQL, so skip the round trip
            # One placeholder group per row. now() stays a real SQL function
            # call, and the values are passed as query parameters instead of
            # being spliced into the SQL text via Python's repr().
            values_sql = ", ".join(["(now(), %s, %s, %s, %s)"] * len(rows))
            params = [value for row in rows for value in row]
            sql = "insert into data values " + values_sql + ";"
            print("batch insert data")
            self._run(lambda cursor: cursor.execute(sql, params))
        end_time = time.time()
        print("end_time:", end_time, end_time - start_time)

    def copybatch(self, num):
        """Load ``num`` random rows with COPY and print the timing.

        Only ``tag_id`` and the metrics are copied. ``time`` is not in the
        column list and the table defines no default for it, so it is
        stored as NULL.
        """
        start_time = time.time()
        print("start_time:", start_time)
        insert_data = self._format_copy_rows(self._random_rows(num))
        print("insert_data", insert_data)
        buffer = StringIO(insert_data)  # read position starts at 0
        # sep='\t' and null='\\N' are psycopg2's defaults, spelled out for clarity.
        self._run(lambda cursor: cursor.copy_from(
            buffer, "data",
            columns=("tag_id", "metrics1", "metrics2", "metrics3"),
            sep='\t', null='\\N', size=16384))
        end_time = time.time()
        print("end_time:", end_time, end_time - start_time)

    def select(self):
        """Return the ``(count,)`` row of ``SELECT count(*) FROM data``."""
        def fetch_count(cursor):
            cursor.execute("SELECT count(*) FROM data")
            return cursor.fetchone()
        return self._run(fetch_count)
if __name__ == '__main__':
    client = MatrixDB()
    # Alternative steps: create the table on the first run, or load the rows
    # through the multi-row INSERT path instead of COPY.
    # client.create_table()
    # client.insert(3)
    # print(client.select())
    client.copybatch(3)
    print(client.select())
PL/Python can use your own Python code the same way any Python program imports modules from its workspace: just add the module's directory to `sys.path` (with `sys.path.append`).
Example: write the following code to /home/mxadmin/python/demo.py:
# Custom function: echo the argument with a prefix, then hand it back.
def print_and_return(anything):
    """Print ``anything`` prefixed with "print_and_return: " and return it unchanged."""
    message = "print_and_return: {}".format(anything)
    print(message)
    return anything
# Quick manual check of print_and_return. It is guarded so that importing
# this module (e.g. `from demo import print_and_return` inside a PL/Python
# function) does not run these calls as a side effect of the import.
if __name__ == "__main__":
    print_and_return("hello_world")
    print_and_return(["hello", "world"])
Then import and call this function from PL/Python:
create or replace function call_my_function(my_name text, my_time timestamptz, my_bat_voltage numeric)
returns text
as $$
import sys
# Make demo.py importable by adding its directory to the module search path
# (sys.path). The PL/Python interpreter persists for the whole database
# session, so append the directory only once instead of on every call.
module_dir = '/home/mxadmin/python/'
if module_dir not in sys.path:
    sys.path.append(module_dir)
# Python caches imported modules, so edits to demo.py take effect only in a
# new session.
from demo import print_and_return
# Call the imported function. Its print() output goes to the server
# backend's stdout, not to the SQL client.
result = print_and_return("my name is: {} time: {} bat: {}".format(my_name, my_time, my_bat_voltage))
# Only the timestamp (as text) is returned to SQL; `result` is not returned.
return str(my_time)
$$ language plpython3u;
The output is as follows:
mydb=# SELECT call_my_function('a', now(), 1);
call_my_function
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
2022-11-17 17:14:29.618774+08
(1 row)
Source code (an exception that is not caught inside PL/Python):
-- Demonstrates an exception that is NOT caught inside PL/Python. The
-- IndexError propagates out of the function and is reported as an ERROR,
-- with the Python traceback in the CONTEXT field. The text printed before
-- the failure is captured in my_print but never returned, because the
-- return statement is not reached.
create or replace function call_my_exception()
returns text
as $$
import io
import sys
from contextlib import redirect_stdout
import traceback

def print_exception():
    print("exception will happen")
    empty_list = []
    empty_list[1] += 1  # raises IndexError: the list is empty

my_print = io.StringIO()
with redirect_stdout(my_print):  # capture print() output instead of the backend's stdout
    print_exception()
return "to sql: {}".format(my_print.getvalue())
$$ language plpython3u;
The output is as follows:
mydb=# SELECT call_my_exception();
ERROR: IndexError: list index out of range
CONTEXT: Traceback (most recent call last):
PL/Python function "call_my_exception", line 14, in <module>
print_exception()
PL/Python function "call_my_exception", line 10, in print_exception
empty_list[1] += 1
PL/Python function "call_my_exception"
Source code (catching the exception and returning the traceback to SQL):
-- Demonstrates catching the exception inside PL/Python. The traceback is
-- formatted with traceback.format_exc(), printed into a captured stdout
-- buffer, and returned to SQL as ordinary text instead of raising an ERROR.
CREATE OR REPLACE FUNCTION public.call_my_tb_exception()
 RETURNS text
 LANGUAGE plpython3u
AS $function$
import io
import sys
from contextlib import redirect_stdout
import traceback

def print_traceback_exception():
    print("exception will happen")
    empty_list = []
    try:
        empty_list[1] += 1  # raises IndexError: the list is empty
    except Exception:
        print(traceback.format_exc())  # goes into my_print via redirect_stdout

my_print = io.StringIO()
with redirect_stdout(my_print):
    print_traceback_exception()
# plpy.info(my_print.getvalue())  # alternative: send the captured text as an INFO message
return "to sql: {}".format(my_print.getvalue())
$function$;
The output is as follows:
mydb=# SELECT call_my_tb_exception();
call_my_tb_exception
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
to sql: exception will happen +
Traceback (most recent call last): +
File "<string>", line 12, in print_traceback_exception+
IndexError: list index out of range +
+
(1 row)