Быстрый старт
Развертывание
Моделирование данных
Подключение
Запись данных
Миграция
Запросы
Операции и обслуживание
Типовое обслуживание
Секционирование
Резервное копирование и восстановление
Масштабирование
Зеркалирование
Управление ресурсами
Безопасность
Мониторинг
Настройка производительности
Устранение неполадок
Справочник
Руководство по инструментам
Типы данных
Хранилище данных
Выполняющая система
Потоковая передача
Восстановление после сбоев
Конфигурация
Индексы
Расширения
Справочник по SQL
Часто задаваемые вопросы
Этот документ содержит часто задаваемые вопросы, связанные с PLPython.
Прямое использование Python с библиотекой подключения для работы с базой данных предполагает автономное поведение; PLPython позволяет перенести вычисления непосредственно в базу данных, используя распределенные вычислительные ресурсы.
Для использования PLPython в вашей базе данных просто создайте следующее расширение в YMatrix:
=# CREATE EXTENSION plpython3u;
Просто установите их с помощью pip3 на всех серверах YMatrix.
Исходный код:
# -*- coding: utf-8 -*-
import psycopg2
class MatrixDB(object):
def __init__(self):
self.host = "69.230.237.118"
self.user = "shidb"
self.database = "test"
self.port = "5432"
self.password = "sfaLxxXwkh"
def get_conn(self):
conn = psycopg2.connect(database=self.database,
user=self.user,
password=self.password,
host=self.host,
port=self.port)
return conn
def create_table(self):
conn = self.get_conn()
cursor = conn.cursor()
sql = "CREATE TABLE data(" \
"time timestamp," \
"tag_id int," \
"metrics1 float8," \
"metrics2 float8," \
"metrics3 float8" \
")Distributed by(tag_id)"
cursor.execute(sql)
conn.commit()
conn.close()
def insert(self):
conn = self.get_conn()
cursor = conn.cursor()
sql = "INSERT INTO data VALUES(now(), 1, 1.1, 1.2, 1.3)"
cursor.execute(sql)
conn.commit()
conn.close()
def select(self):
conn = self.get_conn()
cursor = conn.cursor()
sql = "SELECT * FROM data"
cursor.execute(sql)
data = cursor.fetchone()
conn.commit()
conn.close()
return data
if __name__ == '__main__':
mxdb = MatrixDB()
mxdb.create_table()
mxdb.insert()
print (mxdb.select())
Исходный код:
# -*- coding: utf-8 -*-
import psycopg2
import random
import time
from io import StringIO
class MatrixDB(object):
def __init__(self):
self.host = "69.230.237.118"
self.user = "shidb"
self.database = "test"
self.port = "5432"
self.password = "sfaLxxXwkh"
def get_conn(self):
conn = psycopg2.connect(database=self.database,
user=self.user,
password=self.password,
host=self.host,
port=self.port)
return conn
def create_table(self):
conn = self.get_conn()
cursor = conn.cursor()
sql = "CREATE TABLE data(" \
"time timestamp," \
"tag_id int," \
"metrics1 float8," \
"metrics2 float8," \
"metrics3 float8" \
")Distributed by(tag_id)"
cursor.execute(sql)
conn.commit()
conn.close()
def insert(self,num):
start_time = time.time()
print("start_time:", start_time)
conn = self.get_conn()
cursor = conn.cursor()
insert_head = "insert into data values "
insert_data = "(now(),'{}','{}','{}','{}')"
a = []
for i in range(num):
tag_id = round(random.random() * 100)
metrics1 = round(random.random(), 2)
metrics2 = round(random.random(), 2)
metrics3 = round(random.random(), 2)
print(i,"random:", tag_id, metrics1, metrics2, metrics3)
insert_data = ('now()',tag_id, metrics1, metrics2, metrics3)
a.append(insert_data)
data_str=str(a).strip('[').strip(']') + ';'
print("insert data str:","insert into data values "+data_str)
print("batch insert data")
sql="insert into data values "+str(a).strip('[').strip(']')+';'
cursor.execute(sql)
conn.commit()
conn.close()
end_time = time.time()
print("end_time:", end_time,end_time-start_time)
def copybatch(self,num):
start_time = time.time()
print("start_time:", start_time)
conn = self.get_conn()
cursor = conn.cursor()
f = StringIO()
insert_data=""
for i in range(num):
tag_id = round(random.random() * 100)
metrics1 = round(random.random(), 2)
metrics2 = round(random.random(), 2)
metrics3 = round(random.random(), 2)
print(i, "random:", tag_id, metrics1, metrics2, metrics3)
insert_data += str(tag_id)+"\t"+str(metrics1)+"\t"+str(metrics2)+"\t"+str(metrics3)+"\n"
print("insert_data",insert_data)
f.write(insert_data)
f.seek(0)
cursor.copy_from(f, "data",
columns=("tag_id", "metrics1", "metrics2","metrics3"),
sep='\t', null='\\N', size=16384) # 默认sep和null 都是none
conn.commit()
end_time = time.time()
print("end_time:", end_time, end_time - start_time)
def select(self):
conn = self.get_conn()
cursor = conn.cursor()
sql = "SELECT count(*) FROM data"
cursor.execute(sql)
data = cursor.fetchone()
conn.commit()
conn.close()
return data
if __name__ == '__main__':
mxdb = MatrixDB()
#mxdb.create_table()
#mxdb.insert(3)
#print (mxdb.select())
mxdb.copybatch(3)
print(mxdb.select())
Код Python в PLPython используется так же, как и вызов других Python-программ в рабочей среде Python. Просто добавьте соответствующий путь в sys.path.
Пример
Запишите следующий код в /home/mxadmin/python/demo.py:
#Custom functions
def print_and_return(anything):
print("print_and_return: {}".format(anything))
return anything
#Test the effect
print_and_return("hello_world")
print_and_return(["hello", "world"])
В PLPython импортируйте эту функцию:
create or replace function call_my_function(my_name text, my_time timestamptz, my_bat_voltage numeric)
returns text
as $$
import sys
# Put the Python code we have written into the environment variables currently executing Python
sys.path.append('/home/mxadmin/python/')
from demo import print_and_return
# Call the function we just created
result = print_and_return("my name is: {} time: {} bat: {}".format(my_name, my_time, my_bat_voltage))
# The result is returned to SQL
return str(my_time)
$$ language plpython3u;
Результат будет следующим:
mydb=# SELECT call_my_function('a', now(), 1);
call_my_function
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
2022-11-17 17:14:29.618774+08
(1 row)
Исходный код:
create or replace function call_my_exception()
returns text
as $$
import io
import sys
from contextlib import redirect_stdout
import traceback
def print_exception():
print("exception will happen")
empty_list = []
empty_list[1] += 1
my_print = io.StringIO()
with redirect_stdout(my_print):
print_exception()
return "to sql: {}".format(my_print.getvalue())
$$ language plpython3u;
Результат следующий
mydb=# SELECT call_my_exception();
ERROR: IndexError: list index out of range
CONTEXT: Traceback (most recent call last):
PL/Python function "call_my_exception", line 14, in <module>
print_exception()
PL/Python function "call_my_exception", line 10, in print_exception
empty_list[1] += 1
PL/Python function "call_my_exception"
Исходный код:
CREATE OR REPLACE FUNCTION public.call_my_tb_exception()
RETURNS text
LANGUAGE plpython3u
AS $function$
import io
import sys
from contextlib import redirect_stdout
import traceback
def print_traceback_exception():
print("exception will happen")
empty_list = []
try:
empty_list[1] += 1
except Exception:
print(traceback.format_exc())
my_print = io.StringIO()
with redirect_stdout(my_print):
print_traceback_exception()
# pplpy.info(my_print.getvalue())
return "to sql: {}".format(my_print.getvalue())
$function$;
Результат следующий
mydb=# SELECT call_my_tb_exception();
call_my_tb_exception
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
to sql: exception will happen +
Traceback (most recent call last): +
File "<string>", line 12, in print_traceback_exception+
IndexError: list index out of range +
+
(1 row)