pgvector

В этом документе описывается расширение pgvector, предназначенное для хранения и запросов векторных данных.

1 Описание

  • Возможности
    • Хранение векторных данных;
    • Выполнение точного или приближённого поиска ближайших соседей;
    • Поддержка вычислений расстояния L2, скалярного произведения и косинусного расстояния;
    • Поддержка индексов IVFFlat и HNSW;
    • Доступность из любого языка программирования через стандартные клиенты Postgres.

2 Быстрый старт

Установите расширение. Это нужно сделать один раз для каждой базы данных.

=# CREATE EXTENSION vector;

Создайте таблицу с векторным столбцом размерности 3.

=# CREATE TABLE items (id bigserial PRIMARY KEY, embedding vector(3));

Вставьте векторные данные.

=# INSERT INTO items (embedding) VALUES ('[1,2,3]'), ('[4,5,6]');

Найдите ближайших соседей с использованием расстояния L2.

=# SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;

Также поддерживаются скалярное произведение (<#>) и косинусное расстояние (<=>).

Примечание!
<#> возвращает отрицательное значение скалярного произведения, поскольку YMatrix поддерживает упорядоченные сканирования индекса только для этого оператора.

3 Полные возможности

3.1 Хранение

В этом разделе демонстрируется использование DDL и DML.

Создайте новую таблицу с векторным столбцом.

=# CREATE TABLE items (id bigserial PRIMARY KEY, embedding vector(3));

Или добавьте векторный столбец в существующую таблицу.

=# ALTER TABLE items ADD COLUMN embedding vector(3);

Вставьте векторные данные.

=# INSERT INTO items (embedding) VALUES ('[1,2,3]'), ('[4,5,6]');

Выполните операцию upsert (вставка или обновление) векторных данных.

=# INSERT INTO items (id, embedding) VALUES (1, '[1,2,3]'), (2, '[4,5,6]')
   ON CONFLICT (id) DO UPDATE SET embedding = EXCLUDED.embedding;

Обновите векторные данные.

=# UPDATE items SET embedding = '[1,2,3]' WHERE id = 1;

Удалите векторные данные.

=# DELETE FROM items WHERE id = 1;

3.2 Запросы

В этом разделе показано, как писать SQL-запросы.

Найдите ближайших соседей заданного вектора.

=# SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;

Найдите ближайших соседей конкретной строки.

=# SELECT * FROM items WHERE id != 1 ORDER BY embedding <-> (SELECT embedding FROM items WHERE id = 1) LIMIT 5;

Найдите строки, находящиеся на определённом расстоянии.

=# SELECT * FROM items WHERE embedding <-> '[3,1,2]' < 5;

Примечание!
Запросы должны включать оба условия ORDER BY и LIMIT, чтобы использовать индексы.

3.2.1 Метрики расстояния

Вычислите расстояния.

=# SELECT embedding <-> '[3,1,2]' AS distance FROM items;

Для скалярного произведения умножьте результат на -1 (так как <#> возвращает отрицательное скалярное произведение).

=# SELECT (embedding <#> '[3,1,2]') * -1 AS inner_product FROM items;

Для косинусной близости вычтите косинусное расстояние из 1.

=# SELECT 1 - (embedding <=> '[3,1,2]') AS cosine_similarity FROM items;

3.2.2 Агрегация

Вычислите средний вектор.

=# SELECT AVG(embedding) FROM items;

Вычислите групповые средние векторы.

=# SELECT id, AVG(embedding) FROM items GROUP BY id;

3.3 Индексация

По умолчанию pgvector выполняет точный поиск ближайших соседей с полным охватом (perfect recall).

Вы можете создать индекс для включения приближённого поиска ближайших соседей (ANN), пожертвовав частью охвата ради скорости. В отличие от традиционных индексов, ANN-индексы могут возвращать разные результаты при повторных запросах.

Поддерживаемые типы индексов:

  • IVFFlat
  • HNSW

3.3.1 IVFFlat

IVFFlat — это кластеризованный индекс, который разделяет многомерное пространство на кластеры и выполняет поиск только в тех кластерах, которые наиболее близки к вектору запроса. Он имеет более быстрое время построения и меньшее потребление памяти по сравнению с HNSW, но относительно более низкую производительность запросов с точки зрения соотношения скорость–охват.

Три ключевых фактора для хорошего охвата:

  • Создавайте индекс после того, как таблица уже содержит некоторые данные.
  • Выберите подходящее количество списков — хорошая отправная точка: rows / 1000 для до 1 млн строк или sqrt(rows) при превышении 1 млн строк.
  • Установите подходящее количество probes во время запроса — хорошая отправная точка: sqrt(lists).

Создавайте по одному индексу IVFFlat для каждой используемой функции расстояния.

Расстояние L2:

=# CREATE INDEX ON items USING ivfflat (embedding vector_l2_ops) WITH (lists = 100);

Скалярное произведение:

=# CREATE INDEX ON items USING ivfflat (embedding vector_ip_ops) WITH (lists = 100);

Косинусное расстояние:

=# CREATE INDEX ON items USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);

Можно индексировать векторы размерностью до 2000.

3.3.1.1 Параметры запроса

Установите количество probes (по умолчанию 1).

=# SET ivfflat.probes = 10;

Более высокие значения улучшают охват за счёт скорости. Вы можете установить его равным количеству списков для точного поиска ближайших соседей (в этом случае оптимизатор не будет использовать индекс).

Используйте SET LOCAL внутри транзакции, чтобы применить настройку к одному запросу.

=# BEGIN;
SET LOCAL ivfflat.probes = 10;
SELECT ...
COMMIT;

3.3.2 HNSW

HNSW — это графовый индекс, строящий многоуровневый граф. Нижний уровень содержит все векторы, а верхние уровни — меньшее количество узлов. По сравнению с IVFFlat, HNSW имеет более медленное время построения и более высокое потребление памяти, но лучшую производительность запросов с точки зрения соотношения скорость–охват. Не требует обучения, поэтому индекс можно создавать даже при пустой таблице.

Создавайте по одному индексу HNSW для каждой функции расстояния.

Расстояние L2:

=# CREATE INDEX ON items USING hnsw (embedding vector_l2_ops);

Скалярное произведение:

=# CREATE INDEX ON items USING hnsw (embedding vector_ip_ops);

Косинусное расстояние:

=# CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops);

Можно индексировать векторы размерностью до 2000.

3.3.2.1 Параметры индекса

Укажите параметры HNSW:

  • m — максимальное количество соединений на уровне (по умолчанию: 16)
  • ef_construction — размер динамического списка кандидатов при построении графа (по умолчанию: 64)
=# CREATE INDEX ON items USING hnsw (embedding vector_l2_ops) WITH (m = 16, ef_construction = 64);
3.3.2.2 Параметры запроса

Установите размер динамического списка кандидатов во время поиска (по умолчанию: 40).

=# SET hnsw.ef_search = 100;

Более высокие значения улучшают охват за счёт скорости.

Используйте SET LOCAL внутри транзакции для настройки на один запрос.

=# BEGIN;
SET LOCAL hnsw.ef_search = 100;
SELECT ...
COMMIT;

3.3.3 Прогресс построения индекса

Отслеживайте прогресс создания индекса.

=# SELECT phase, tuples_done, tuples_total FROM pg_stat_progress_create_index;

Фазы:

  1. initializing
  2. loading tuples — только для IVFFlat
  3. clustering — только для IVFFlat
  4. indexing

Примечание!
Столбцы tuples_done и tuples_total заполняются только во время фазы loading tuples.

3.4 Фильтрация

Существует несколько способов фильтрации запросов ближайших соседей с использованием условий WHERE.

=# SELECT * FROM items WHERE id = 123 ORDER BY embedding <-> '[3,1,2]' LIMIT 5;

Создайте индекс по одному или нескольким столбцам, используемым для точной фильтрации.

=# CREATE INDEX ON items (id);

Или создайте частичный индекс по векторному столбцу для приближённого поиска с фильтрацией.

=# CREATE INDEX ON items USING ivfflat (embedding vector_l2_ops) WITH (lists = 100)
    WHERE (id = 123);

Используйте секционирование для эффективной фильтрации по множеству различных значений столбца.

=# CREATE TABLE items (embedding vector(3), id int) PARTITION BY LIST(id);

4 Примеры

4.1 Python

import psycopg2


def connect_to_database():
    # Connect to the database
    try:
        conn = psycopg2.connect(
            database="your_database_name",
            user="your_database_user",
            password="your_database_password",
            host="your_database_host",
            port="your_database_port"
        )
        return conn
    except psycopg2.Error as e:
        print(f"Error connecting to the database: {e}")
        return None


def close_database_connection(conn):
    # Close the database connection
    if conn:
        conn.close()


def create_extension(conn, extension_name):
    try:
        cursor = conn.cursor()
        # Execute insert operation
        cursor.execute('create extension if not exists %s ;' % extension_name)
        conn.commit()
        cursor.close()
        print("extension %s has been created successfully." % extension_name)
    except psycopg2.Error as e:
        print(f"Error create extension: {e}")


def create_table(conn, tablename, sql_tabledef):
    try:
        cursor = conn.cursor()
        # Create vector table
        cursor.execute(sql_tabledef)
        conn.commit()
        cursor.close
        print('table %s has been created successfully.' % tablename)
    except psycopg2.Error as e:
        print(f"Error inserting data: {e}")


def insert_data(conn, tablename, data):
    try:
        cursor = conn.cursor()
        # Execute insert operation
        cursor.execute("INSERT INTO %s VALUES ('%s',%s, %s,'%s')" % (
            tablename, data['value1'], data['value2'], data['value3'], data['value4']))
        conn.commit()
        cursor.close()
        print("Data inserted successfully.")
    except psycopg2.Error as e:
        print(f"Error inserting data: {e}")


def select_data(conn, tablename):
    try:
        cursor = conn.cursor()
        # Execute query operation
        cursor.execute("SELECT * FROM %s" % tablename)
        result = cursor.fetchall()
        cursor.close()
        return result
    except psycopg2.Error as e:
        print(f"Error selecting data: {e}")
        return []


def vector_search(conn, tablename, vector_type, nearest_values):
    try:
        cursor = conn.cursor()
        # Execute query operation
        cursor.execute(
            "SELECT * FROM %s order by embedding %s '%s' limit 5;" % (tablename, vector_type, nearest_values))
        result = cursor.fetchall()
        cursor.close()
        return result
    except psycopg2.Error as e:
        print(f"Error selecting data: {e}")
        return []


def update_data(conn, tablename, data):
    try:
        cursor = conn.cursor()
        # Execute update operation
        cursor.execute(
            "UPDATE %s SET embedding = '%s' WHERE id = %s" % (tablename, data['new_value'], data['condition']))
        conn.commit()
        cursor.close()
        print("Data updated successfully.")
    except psycopg2.Error as e:
        print(f"Error updating data: {e}")


def delete_data(conn, tablename, condition):
    try:
        cursor = conn.cursor()
        # Execute delete operation
        cursor.execute("DELETE FROM %s WHERE id = %s" % (tablename, condition))
        conn.commit()
        cursor.close()
        print("Data deleted successfully.")
    except psycopg2.Error as e:
        print(f"Error deleting data: {e}")


if __name__ == "__main__":
    conn = connect_to_database()
    if conn:
        # Perform database operations here
        data_to_insert = {
            'value1': '2023-10-12 00:00:00',
            'value2': 1,
            'value3': 1,
            'value4': [1, 2, 3]

        }

        # Create vector extension
        create_extension(conn, 'vector')

        # Create vector table
        sql_tabledef = 'drop table if exists documents_l2;' \
                       'CREATE TABLE documents_l2(' \
                       'created_at timestamptz,' \
                       'id integer,' \
                       'document_type int,' \
                       'embedding vector(3)' \
                       ')' \
                       'distributed by (id);';
        create_table(conn, 'documents_l2', sql_tabledef)

        # Insert data into table
        insert_data(conn, 'documents_l2', data_to_insert)

        # Query data
        data_to_select = select_data(conn, 'documents_l2')
        print("Selected Data:", data_to_select)

        # Update data
        data_to_update = {
            'new_value': '[4,5,6]',
            'condition': 1
        }
        update_data(conn, 'documents_l2', data_to_update)
        # Query data
        data_to_select = select_data(conn, 'documents_l2')
        print("Selected Data:", data_to_select)

        # Vector search
        print('Vector search: L2 distance')
        data_vector_search = vector_search(conn, 'documents_l2', '<->', '[4,5,6]')
        print("vector search (L2):", data_vector_search)

        print('Vector search: inner product')
        data_vector_search = vector_search(conn, 'documents_l2', '<#>', '[4,5,6]')
        print("vector search (IP):", data_vector_search)

        print('Vector search: cosine distance')
        data_vector_search = vector_search(conn, 'documents_l2', '<=>', '[4,5,6]')
        print("vector search (cos):", data_vector_search)

        # Delete data
        data_to_delete = 1
        delete_data(conn, 'documents_l2', data_to_delete)
        # Query data
        data_to_select = select_data(conn, 'documents_l2')
        print("Selected Data:", data_to_select)

        close_database_connection(conn)

4.2 Golang

package main

import (
    "database/sql"
    "fmt"

    _ "github.com/lib/pq"
)

type Data struct {
    Id int32
    Embedding string
}

func getConn() (*sql.DB, error) {
    psqlInfo := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", "192.168.100.30", 6432, "mxadmin", "", "mxadmin")

    db, err := sql.Open("postgres", psqlInfo)
    if err != nil {
        return nil, err
    }

    return db, nil
}

func insertData() error {
    db, _ := getConn()
    defer db.Close()

    _, err := db.Exec(`INSERT INTO items (embedding) VALUES ('[1,2,3]'), ('[4,5,6]')`)
    return err
}

func selectData() ([]Data, error) {
    db, _ := getConn()
    defer db.Close()

    datas := make([]Data, 0)
    rows, err := db.Query(`SELECT id, embedding FROM items`)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    for rows.Next() {
        data := Data{}
        err = rows.Scan(&data.Id, &data.Embedding)
        if err != nil {
            return nil, err
        }
        datas = append(datas, data)
    }

    return datas, err
}

func main() {
    insertData()
    datas, _ := selectData()
    fmt.Println(datas)
}