PLPython FAQ

This document answers frequently asked questions about PLPython.


1 What are the differences, advantages, and disadvantages of PLPython compared with operating on the database from Python through a connection library?


When Python operates on the database through a connection library, the computation runs stand-alone on the client; PLPython pushes the computation down into the database, so it can leverage the database's distributed computing power.


2 How to use PLPython in a database


Simply create the following extension in YMatrix to use PLPython in your database.

=# CREATE EXTENSION plpython3u;


3 How to install third-party libraries in PLPython?


Use pip3 to install the library on every YMatrix server.


4 Example: accessing a YMatrix database from Python


source code:

# -*- coding: utf-8 -*-
import psycopg2

class MatrixDB(object):
    """Minimal psycopg2 helper around the demo table ``data``.

    Every public method opens its own connection through :meth:`get_conn`
    and always closes it again, even when the statement raises.
    """

    def __init__(self, host="69.230.237.118", user="shidb", database="test",
                 port="5432", password="sfaLxxXwkh"):
        """Store the connection settings.

        The defaults are the sample values of this example, so ``MatrixDB()``
        keeps working unchanged. Pass your own values to connect elsewhere,
        and avoid keeping real passwords in source code.
        """
        self.host = host
        self.user = user
        self.database = database
        self.port = port
        self.password = password

    def get_conn(self):
        """Open and return a new psycopg2 connection; the caller closes it."""
        return psycopg2.connect(database=self.database,
                                user=self.user,
                                password=self.password,
                                host=self.host,
                                port=self.port)

    def _execute(self, sql, fetch_one=False):
        """Run ``sql`` on a fresh connection and commit.

        Returns the first result row when ``fetch_one`` is true, else None.
        The connection is closed in ``finally`` so it is released even if the
        statement fails; closing without a commit discards the pending work.
        """
        conn = self.get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            row = cursor.fetchone() if fetch_one else None
            conn.commit()
            return row
        finally:
            conn.close()

    def create_table(self):
        """Create the table ``data``, distributed by ``tag_id``."""
        sql = ("CREATE TABLE data("
               "time timestamp,"
               "tag_id int,"
               "metrics1 float8,"
               "metrics2 float8,"
               "metrics3 float8"
               ")Distributed by(tag_id)")
        self._execute(sql)

    def insert(self):
        """Insert one fixed sample row stamped with the current time."""
        self._execute("INSERT INTO data VALUES(now(), 1, 1.1, 1.2, 1.3)")

    def select(self):
        """Return the first row of ``data`` (None when the table is empty)."""
        return self._execute("SELECT * FROM data", fetch_one=True)

if __name__ == '__main__':
    # Demo run: create the table, add one row, then show the first row.
    db = MatrixDB()
    db.create_table()
    db.insert()
    first_row = db.select()
    print(first_row)


5 Example: batch-inserting data from Python


source code:

# -*- coding: utf-8 -*-
import psycopg2
import random
import time
from io import StringIO

class MatrixDB(object):
    """psycopg2 helper showing two ways to batch-load the demo table ``data``.

    ``insert`` sends one multi-row INSERT statement; ``copybatch`` streams the
    rows through COPY. Every method closes its connection, even on error.
    """

    def __init__(self, host="69.230.237.118", user="shidb", database="test",
                 port="5432", password="sfaLxxXwkh"):
        """Store the connection settings.

        The defaults are the sample values of this example, so ``MatrixDB()``
        keeps working unchanged. Pass your own values to connect elsewhere,
        and avoid keeping real passwords in source code.
        """
        self.host = host
        self.user = user
        self.database = database
        self.port = port
        self.password = password

    def get_conn(self):
        """Open and return a new psycopg2 connection; the caller closes it."""
        return psycopg2.connect(database=self.database,
                                user=self.user,
                                password=self.password,
                                host=self.host,
                                port=self.port)

    @staticmethod
    def _random_row():
        """Return one random ``(tag_id, metrics1, metrics2, metrics3)`` tuple."""
        tag_id = round(random.random() * 100)
        metrics1 = round(random.random(), 2)
        metrics2 = round(random.random(), 2)
        metrics3 = round(random.random(), 2)
        return tag_id, metrics1, metrics2, metrics3

    def create_table(self):
        """Create the table ``data``, distributed by ``tag_id``."""
        sql = ("CREATE TABLE data("
               "time timestamp,"
               "tag_id int,"
               "metrics1 float8,"
               "metrics2 float8,"
               "metrics3 float8"
               ")Distributed by(tag_id)")
        conn = self.get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            conn.commit()
        finally:
            conn.close()

    def insert(self, num):
        """Insert ``num`` random rows with a single multi-row INSERT.

        The values are passed as query parameters instead of being pasted into
        the SQL text. Nothing is sent to the database when ``num`` is 0,
        because an INSERT with an empty VALUES list is not valid SQL.
        """
        start_time = time.time()
        print("start_time:", start_time)

        rows = []
        for i in range(num):
            row = self._random_row()
            print(i, "random:", *row)
            rows.append(row)

        if rows:
            # One "(now(), %s, %s, %s, %s)" group per row, filled by the driver.
            placeholders = ", ".join(["(now(), %s, %s, %s, %s)"] * len(rows))
            sql = "insert into data values " + placeholders + ";"
            params = [value for row in rows for value in row]
            print("insert data str:", sql, params)
            print("batch insert data")

            conn = self.get_conn()
            try:
                cursor = conn.cursor()
                cursor.execute(sql, params)
                conn.commit()
            finally:
                conn.close()

        end_time = time.time()
        print("end_time:", end_time, end_time - start_time)

    def copybatch(self, num):
        """Load ``num`` random rows through COPY.

        ``time`` is not in the COPY column list, so it is left unset for these
        rows.
        """
        start_time = time.time()
        print("start_time:", start_time)

        lines = []
        for i in range(num):
            row = self._random_row()
            print(i, "random:", *row)
            lines.append("\t".join(str(value) for value in row) + "\n")
        insert_data = "".join(lines)
        print("insert_data", insert_data)

        # A StringIO created from a string starts at position 0, ready to read.
        f = StringIO(insert_data)

        conn = self.get_conn()
        try:
            cursor = conn.cursor()
            # sep and null are spelled out explicitly; they match the tab
            # separator and \N null marker used when building the lines above.
            cursor.copy_from(f, "data",
                             columns=("tag_id", "metrics1", "metrics2", "metrics3"),
                             sep='\t', null='\\N', size=16384)
            conn.commit()
        finally:
            conn.close()

        end_time = time.time()
        print("end_time:", end_time, end_time - start_time)

    def select(self):
        """Return the result row of ``SELECT count(*) FROM data``."""
        conn = self.get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT count(*) FROM data")
            data = cursor.fetchone()
            conn.commit()
            return data
        finally:
            conn.close()

if __name__ == '__main__':
    db = MatrixDB()
    # Optional steps, disabled by default: create the table on the first run
    # and try the multi-row INSERT variant.
    # db.create_table()
    # db.insert(3)
    # print(db.select())
    db.copybatch(3)
    row_count = db.select()
    print(row_count)


6 Python code in PLPython


Existing Python code can be used in PLPython in the same way one Python program imports another from your workspace: add the directory that contains the code to sys.path (for example with sys.path.append), then import it.

Example

Write the following code in /home/mxadmin/python/demo.py:

# Custom function
def print_and_return(anything):
    """Print ``anything`` behind a ``print_and_return: `` prefix and return it unchanged."""
    print("print_and_return: {}".format(anything))
    return anything


# Try the function out only when this file is run directly
# (python demo.py), not every time another program imports it.
if __name__ == "__main__":
    print_and_return("hello_world")
    print_and_return(["hello", "world"])

In PLPython import this function:

-- Calls print_and_return() from the user-written module demo.py and returns
-- my_time converted to text.
create or replace function call_my_function(my_name text, my_time timestamptz, my_bat_voltage numeric)
returns text
as $$

import sys
# Make the directory that holds demo.py importable by adding it to the module
# search path. Add it only when it is missing, so repeated calls do not keep
# appending duplicate entries to sys.path.
module_dir = '/home/mxadmin/python/'
if module_dir not in sys.path:
    sys.path.append(module_dir)
from demo import print_and_return
# Call the imported function with the SQL arguments formatted into one string
print_and_return("my name is: {} time: {} bat: {}".format(my_name, my_time, my_bat_voltage))
# The value returned here becomes the SQL result
return str(my_time)
$$ language plpython3u;

The output is as follows:

mydb=# SELECT call_my_function('a', now(), 1);
       call_my_function        
-------------------------------
 2022-11-17 17:14:29.618774+08
(1 row)


7 PLPython standard exception handling


source code:

-- Shows what happens when a Python exception is not caught inside a PLPython
-- function: print_exception() raises inside the redirect_stdout block, nothing
-- handles it, and the final return statement is never reached.
-- The sample traceback for this function cites line numbers of the function
-- body, so comments are kept on existing lines rather than added as new ones.
create or replace function call_my_exception()
returns text
as $$
import io
import sys  # not used in this example
from contextlib import redirect_stdout
import traceback  # not used in this example

def print_exception():
    print("exception will happen")
    empty_list = []
    empty_list[1] += 1  # index 1 of an empty list: raises IndexError

my_print = io.StringIO()
with redirect_stdout(my_print):  # print() output is collected in my_print
    print_exception()

return "to sql: {}".format(my_print.getvalue())
$$ language plpython3u;

Output as follows

mydb=# SELECT call_my_exception();
ERROR:  IndexError: list index out of range
CONTEXT:  Traceback (most recent call last):
  PL/Python function "call_my_exception", line 14, in <module>
    print_exception()
  PL/Python function "call_my_exception", line 10, in print_exception
    empty_list[1] += 1
PL/Python function "call_my_exception"


8 Use traceback to handle PLPython exceptions


source code:

-- Variant that catches the exception inside Python: the traceback text is
-- printed into the redirected stdout buffer and returned to SQL as part of
-- the function result.
-- The sample output for this function cites a line number of the function
-- body, so comments are kept on existing lines rather than added as new ones.
CREATE OR REPLACE FUNCTION public.call_my_tb_exception()
 RETURNS text
 LANGUAGE plpython3u
AS $function$
import io
import sys  # not used in this example
from contextlib import redirect_stdout
import traceback

def print_traceback_exception():
    print("exception will happen")
    empty_list = []
    try:
        empty_list[1] += 1  # index 1 of an empty list: raises IndexError
    except Exception:
        print(traceback.format_exc())  # formatted traceback goes to the redirected stdout

my_print = io.StringIO()
with redirect_stdout(my_print):  # print() output is collected in my_print
    print_traceback_exception()

# plpy.info(my_print.getvalue())  (disabled; presumably would also report the captured text as an INFO message)

return "to sql: {}".format(my_print.getvalue())
$function$;

Output as follows

mydb=# SELECT call_my_tb_exception();
                   call_my_tb_exception                   
----------------------------------------------------------
 to sql: exception will happen                           +
 Traceback (most recent call last):                      +
   File "<string>", line 12, in print_traceback_exception+
 IndexError: list index out of range                     +
                                                         +

(1 row)