Python Olap API

本文档主要介绍 OlapBase OlapOnDB 和 OlapOnDisk 在Python中的API用法

1. 概述

本手册将介绍使用TuGraph图计算系统Python接口需要的简单配置,同时结合代码对TuGraph Python API进行解释。关于ParallelBitset、OlapBase各类的作用,详见olap-base-api.md,olap-on-db-api.md和olap-on-disk-api.md

2. 配置要求

如果要使用TuGraph图计算编写以及编译自己的应用程序,需要的配置要求为:

  • linux操作系统,目前在Ubuntu16.04, Ubuntu18.04, Ubuntu20.04和Centos7, Centos8系统上可成功运行。

  • 支持C++17的编译器,要求GCC版本为8.4.0或更新的版本。

  • Cython,版本要求3.0.0以上,已测试可运行版本为3.0.0a11

3. Cython

Cython是一种高效的编程语言,是Python的超集。Cython能将py文件翻译为C/C++代码后编译为Python拓展类,在Python中通过import调用。在TuGraph中,所有的Python plugin都由Cython编译为Python拓展类后使用。

Cython的Pure Python模式在保证Python语法的同时具有C/C++的性能,TuGraph Python接口均使用Cython实现。

Cython 文档

4. Olap API

见plugins/cython/olap_base.pxd文件,用法与功能基本与C++接口相同,olap_base.pxd中声明的接口都由C++实现,在py文件中必须通过from cython.cimports.olap_base import *的方式导入,由Cython编译py文件后才能运行。

原子操作

  • cas[T](ptr: cython.pointer(T), oldv: T, newv: T)-> cython.bint:如果ptr指向的值等于oldv,则将ptr指向的值赋为newv并返回true,否则返回false

  • write_min[T](a: cython.pointer(T), b: T)-> cython.bint:如果b比a指向的值更小,那么将a指向的值赋为b并返回true,否则返回false。

  • write_max[T](a: cython.pointer(T), b: T)-> cython.bint:如果b比a指向的值更大,那么将a指向的值赋为b并返回true,否则返回false。

  • write_add[T](a: cython.pointer(T), b: T)-> cython.bint:将b的值加到a指向的值上。

  • write_sub[T](a: cython.pointer(T), b: T)-> cython.bint:将a指向的值减去b的值。

点集合类ParallelBitset

  • Size()-> size_t:表示Bitmap中的点个数。

  • ParallelBitset(size: size_t):初始化size和data,data长度为(size >> 6)+1

  • Clear()-> cython.void:清空集合

  • Fill()-> cython.void:将所有点加入集合

  • Has(size_t i)-> cython.bint:检查点i是否在集合中

  • Add(size_t i)-> cython.bint:将点i加入集合中

  • Swap(ParallelBitset &other)-> cython.void:和另一组ParallelBitset集合交换元素

点数组类ParallelVector

  • ParallelVector[T](size_t capacity) 构建ParallelVector,capacity为点数组的初始容量大小

  • operator[](i: size_t)-> T:下标为i的数据

  • begin()-> cython.pointer(T):ParallelVector的起始指针

  • end()-> cython.pointer(T):ParallelVector的结束指针。begin和end的用法类似于vector容器的begin和end指针,可以使用这两个指针对数组进行顺序访问

  • Back()-> T:ParallelVector最后一个数据

  • Data()-> cython.pointer(T):表示数组本身数据

  • Destroy()-> cython.void:清空ParallelVector数组内数据并删除数组

  • Size()-> size_t:表示ParallelVector中的数据个数

  • Resize(size: size_t)-> cython.void:更改ParallelVector为size大小,该size应大于等于更改前的大小且小于capacity

  • Clear()-> cython.void:清空ParallelVector内数据

  • ReAlloc(capacity: size_t)-> cython.void:给ParallelVector分配新的容量大小,若数组有数据则将数据迁移至新内存

  • Fill(elem: T)-> cython.void:为ParallelVector的全部数据赋值为elem

  • Append(elem: T, atomic: cython.bint = true)-> cython.void:向ParallelVector结尾添加一个数据

  • Swap(other: ParallelVector[T])-> cython.void:和其他的ParallelVector交换数据

  • Copy()-> ParallelVector[T]:复制当前的ParallelVector数据存至Copy数组中

自定义数据结构

  • Empty:内容为空的特殊数据类型。

  • EdgeUnit[EdgeData]:表示权值类型为EdgeData的边,用于解析输入文件,包含三个成员变量:

    • src: size_t:边的起始点

    • dst: size_t:边的终点

    • edge_data: EdgeData:边的权值

  • AdjUnit[EdgeData]:表示权值类型为EdgeData的边,用于批处理计算过程中,包含两个成员变量:

    • neighbour: size_t:边的邻居点

    • edge_data: EdgeData:边的权值

  • AdjList[EdgeData]:权值类型为EdgeData的点的邻接表,常用于表示点的入边和出边集合,包含两个成员变量:

    • begin()-> cython.pointer(AdjUnit[T]):列表的起始指针

    • end()-> cython.pointer(AdjUnit[T]):列表的结束指针。

    • operator[](i: size_t)-> AdjUnit[EdgeData]: 下标为i的数据

图类OlapBase

  • NumVertices()-> size_t:获取点数

  • NumEdges()-> size_t:获取边数

  • OutDegree(size_t vid)-> size_t:点vid的出度

  • InDegree(size_t vid)-> size_t:点vid的入度

  • AllocVertexArray[VertexData]() ->ParallelVector[VertexData]:分配一个类型为VertexData的数组,大小为点个数

  • AllocVertexSubset()-> ParallelBitset:分配一个ParallelBitset集合,用于表示所有点的状态是否激活

  • OutEdges(vid: size_t)-> AdjList[EdgeData]:获取点v的所有出边集合

  • InEdges(vid: size_t)-> AdjList[EdgeData]:获取点v的所有入边集合

  • Transpose()-> cython.void:对有向图进行图反转

  • LoadFromArray(edge_array: cython.p_char, input_vertices: size_t, input_edges: size_t, edge_direction_policy: EdgeDirectionPolicy):从数组中加载图数据,包含四个参数,其含义分别表示:

    • edge_array:将该数组中的数据读入图,一般情况下该数组包含多条边。

    • input_vertices:指定数组读入图的点个数。

    • input_edges:指定数组读入图的边的条数。

    • edge_direction_policy:指定图为有向或无向,包含三种模式,分别为DUAL_DIRECTION、MAKE_SYMMETRIC以及INPUT_SYMMETRIC。对应的详细介绍见include/lgraph/olap_base.h文件的enum EdgeDirectionPolicy

  • AcquireVertexLock(vid: size_t)-> cython.void:对点vid加锁,禁止其它线程对该锁对应的点数据进行访存

  • void ReleaseVertexLock(vid: size_t)-> cython.void:对点vid解锁,所有线程均可访存该锁对应的点数据

TuGraph提供了两个批处理操作来并行地进行以点为中心的批处理过程,在Python中与C++使用方法稍有不同。

# 函数名称:ProcessVertexInRange[ReducedSum, Algorithm](
#           work: (algo: Algorithm, vi: size_t)-> ReducedSum,
#           lower: size_t, upper: size_t,
#           algo: Algorithm,
#           zero: ReducedSum = 0,
#           reduce: (a: ReducedSum, b: ReducedSum)-> ReducedSum = reduce_plus[ReducedSum])
#
#     函数用途:对Graph中节点编号介于lower和upper之间的节点执行work函数。第四个参数表示累加的基数,默认为0;
#     第五个参数表示对每个work处理后的节点返回值进行迭代reduce函数操作,默认为累加操作。
#     具体实现请参考include/lgraph/olap_base.h中具体代码
#
#     使用示例:统计数组parent数组中有出边的点个数

import cython
from cython.cimports.olap_base import *


@cython.cclass
class CountCore:
    graph: cython. pointer(OlapBase[Empty])
    parent: ParallelVector[size_t]

    @cython.cfunc
    @cython.nogil
    def Work(self, vi: size_t) -> size_t:
        if self.graph.OutDegree(self.parent[vi]) > 0:
            return 1
        return 0

    def run(self, pointer_g: cython. pointer(OlapBase[Empty])):
        self.graph = pointer_g
        self.parent = self.graph.AllocVertexArray[size_t]()
        vertex_num: size_t
        vertex_num = self.graph.ProcessVertexInRange[size_t, CountCore](self.Work, 0, self.parent.Size(), self)
        print("the number is", vertex_num)

if __name__ == "__main__":
    count_core = CountCore()
    count_core.run(cython.address(g))

其中g为图类OlapBase的实例化对象

# 函数名称:ProcessVertexActive[ReducedSum, Algorithm](
#           work: (algo: Algorithm, vi: size_t)-> ReducedSum,
#           active: ParallelBitset,
#           algo: Algorithm,
#           zero: ReducedSum = 0,
#           reduce: (a: ReducedSum, b: ReducedSum)-> ReducedSum = reduce_plus[ReducedSum])
#
#   函数用途:对active_vertices中对应为1的节点执行work函数,第三个参数表示累加的基数,默认为0;
#   第四个参数表示对每个work处理后的节点返回值进行迭代reduce函数操作,默认为累加操作。
#   具体实现请参考/include/lgraph/olap_base.h中具体代码
#
# 使用示例:输出Graph中节点1,2,3的所有出度邻居,并统计这三个节点的总出度

import cython
from cython.cimports.olap_base import *
from cython.cimports.libc.stdio import printf


@cython.cclass
class NeighborCore:
    graph: cython.pointer(OlapBase[Empty])
    active_in: ParallelBitset

    @cython.cfunc
    @cython.nogil
    def Work(self, vi: size_t) -> size_t:
        degree = self.graph.OutDegree(vi)
        dst: size_t
        edges = self.graph.OutEdges(vi)
        local_out_degree: size_t
        for i in range(degree):
            dst = edges[i].neighbour
            printf("node %lu has neighbour %lu\n", vi, dst)
            local_out_degree += 1
        return local_out_degree

    def run(self, pointer_g: cython.pointer(OlapBase[Empty])):
        self.graph = pointer_g
        self.active_in = self.graph.AllocVertexSubset()
        self. active_in. Add(1)
        self. active_in. Add(2)
        self. active_in. Add(3)
        total_outdegree = cython.declare(
            size_t,
            self.graph.ProcessVertexActive[size_t, CountCore](self.Work, self.active_in, self))
        printf("total outdegree of node1,2,3 is %lu\n",total_outdegree)

if __name__ == "__main__":
    neighbor_core = NeighborCore()
    neighbor_core.run(cython.address(g))

如上面两个例子所展示,在Python中ProcessVertexActive与ProcessVertexInRange比在C++中额外需要一个算法类指针参数,Work函数一般也作为该算法类的成员函数,满足Work函数访问成员变量的需求(如图graph,点数组parent),在调用批处理函数时将Work函数和算法类的self指针传入批处理函数。

其中Work函数会在多线程中调用,因此加上修饰器@cython.nogil释放Python全局解释锁,在多线程执行的代码中(例如批处理函数中的Work函数,cython.parallel.prange中),不能包含Python对象,最好通过dst: type或者dst = cython.declare(type)的方式声明变量为C/C++类型。

图类OlapOnDB:

并行化创建有向图:

olapondb = OlapOnDB[Empty](db, txn, SNAPSHOT_PARALLEL)

并行化创建无向图

olapondb = OlapOnDB[Empty](db, txn, SNAPSHOT_PARALLEL | SNAPSHOT_UNDIRECTED)

ID_MAPPING创建有向图

olapondb = OlapOnDB[Empty](db, txn, SNAPSHOT_PARALLEL | SNAPSHOT_IDMAPPING)

图类OlapOnDisk

ConfigBase:

  • ConfigBase(): 构造函数

  • std::string input_dir: 图边表数据路径

  • std::string output_dir: 输出结果路径

  • Load(config: ConfigBase[EdgeData], edge_direction_policy: EdgeDirectionPolicy)-> void: 读入图数据

5. lgraph_db API

见plugins/cython/lgraph_db.pxd与lgraph_db_python.py文件。

lgraph_db.pxd中接口用法与功能基本与C++接口相同,lgraph_db.pxd中声明的接口都由C++实现,在py文件中必须通过from cython.cimports.olap_base import *的方式导入,由Cython编译py文件后才能运行。

VertexIndexIterator

  • GetVid()-> int64_t: 获取点的vid

Galaxy

  • Galaxy(dir_path: std::string): 构造函数,dir_path为db路径

  • SetCurrentUser(user: std::string, password: std::string)-> cython.void: 设置用户

  • SetUser(user: std::string)-> cython.void: 设置用户

  • OpenGraph(graph: std::string, read_only: bint)-> GraphDB: 创建GraphDB

GraphDB:

  • CreateReadTxn()-> Transaction: 创建只读事务

  • CreateWriteTxn()-> Transaction: 创建写事务

  • ForkTxn(txn: Transaction)-> Transaction: 复制事务,只能复制读事务

Transaction:

GetVertexIndexIterator(
                label: std::string,
                field: std::string,
                key_start: std::string,
                key_end: std::string)-> VertexIndexIterator

获取索引迭代器。迭代器的field值为 [key_start, key_end]。所以在key_start=key_end=v时,返回指向field值为v的点的迭代器

lgraph_db_python.py是lgraph_db.pxd中C++类 Galaxy与GraphDB的包装,将C++类包装为Python类,将lgraph_db_python.py编译为Python拓展后,可以直接在Python文件或Python命令行中import lgraph_db_python访问lgraph_db_python.PyGraphDB与PyGraphDB.PyGalaxy。

PyGalaxy:

  • PyGalaxy(self, dir_path: str): 构造函数,dir_path为db路径

  • SetCurrentUser(self, user: str password: str)-> void: 设置用户

  • SetUser(self, user: std::string)-> void: 设置用户

  • OpenGraph(self, graph: str, read_only: bool)-> PyGraphDB: 创建PyGraphDB

PyGraphDB:

  • get_pointer(self)-> cython.Py_ssize_t: C++ 类GraphDB的地址

6. 算法插件示例

下面为Python实现的BFS算法的代码示例:

# cython: language_level=3, cpp_locals=True, boundscheck=False, wraparound=False, initializedcheck=False
# distutils: language = c++

# 注释作用如下:
# language_level=3: 使用Python3
# cpp_locals=True: 需要c++17,使用std::optional管理Python代码中的C++对象,可以避免C++对象的拷贝构造
# boundscheck=False: 关闭索引的边界检查
# wraparound=False: 关闭负数下标的处理(类似Python List)
# initializedcheck=False: 关闭检查内存是否初始化,关闭检查后运行性能更快
# language = c++: 将此py文件翻译为C++而不是C文件,TuGraph使用大量模板函数,所以都应该使用C++

import json

import cython
from cython.cimports.olap_base import *
from cython.cimports.lgraph_db import *
# 从plugins/cython/ 中cimportolap_base.pxd与lgraph_db.pxd, 类似C++中#include "xxx.h"

from cython.cimports.libc.stdio import printf
# 类似C++中#include <stdio.h>
# 其他常见的还有cython.cimports.libcpp.unordered_map等

import time
import lgraph_db_python


@cython.cclass
# cython.cclass 表示BFSCore为C类型的Class
class BFSCore:
    graph: cython.pointer(OlapBase[Empty])
    # cython.pointer(OlapBase[Empty])表示OlapBase[Empty]的指针,类似C++中OlapBase[Empty]*
    # cython提供了常见类型的指针,如cython.p_int, cython.p_char等,表示int*, char*, ...
    parent: ParallelVector[size_t]
    active_in: ParallelBitset
    active_out: ParallelBitset
    root: size_t
    # root: size_t 声明root为C++ size_t类型变量,等效于root = cython.declare(size_t)
    # 不声明类型的变量为Python object类型
    # 声明变量类型会大幅提高性能,同时在多线程部分,只有C/C++类型的变量可以访问

    @cython.cfunc
    # cython.cfunc 表示Work为C类型的函数,参数与返回值应声明
    # cfunc性能好,能接受C/C++对象为参数、返回值,但是不能在其他python文件中调用
    # 类似的有cython.ccall,如Standalone函数,可以在其他python文件中调用
    @cython.nogil
    # cython.nogil 表示释放Python全局解释锁,在nogil修饰的部分,不能访问Python对象
    # 在多线程部分,都应有nogil修饰器
    @cython.exceptval(check=False)
    # cython.exceptval(check=False) 表示禁用异常传播,将忽略函数内部引发的Python异常
    def Work(self, vi: size_t) -> size_t:
        degree = cython.declare(size_t, self.graph.OutDegree(vi))
        out_edges = cython.declare(AdjList[Empty], self.graph.OutEdges(vi))
        i = cython.declare(size_t, 0)
        local_num_activations = cython.declare(size_t, 0)
        dst: size_t
        for i in range(degree):
            dst = out_edges[i].neighbour
            if self.parent[dst] == cython.cast(size_t, -1):
                # parent[dst] == -1 表示dst没有被bfs访问过
                if self.active_out.Add(dst):
                    # 将dst设置为为活跃节点;ParallelBitmap.Add为原子操作,防止重复计算
                    self.parent[dst] = vi
                    local_num_activations += 1
        return local_num_activations

    @cython.cfunc
    @cython.nogil
    @cython.exceptval(check=False)
    def run(self, g: cython.pointer(OlapBase[Empty]), r: size_t) -> cython.size_t:
        self.graph = g
        self.root = r
        self.active_in = g.AllocVertexSubset()
        self.active_out = g.AllocVertexSubset()
        self.parent = g.AllocVertexArray[size_t]()
        self.parent.Fill(-1)
        num_vertices = cython.declare(size_t, self.graph.NumVertices())
        printf("num_vertices = %lu\n", num_vertices)
        self.parent[self.root] = self.root
        num_activations = cython.declare(size_t, 1)
        discovered_vertices = cython.declare(size_t, num_activations)
        self.active_in.Add(self.root)
        while num_activations > 0:
            self.active_out.Clear()
            num_activations = g.ProcessVertexActive[size_t, BFSCore](self.Work, self.active_in, self)
            discovered_vertices += num_activations
            self.active_out.Swap(self.active_in)
            printf("num_activations = %lu\n", num_activations)
        return discovered_vertices


@cython.cfunc
def procedure_process(db: cython.pointer(GraphDB), request: dict, response: dict) -> cython.bint:
    cost = time.time()
    root_id = "0"
    label = "node"
    field = "id"
    if "root" in request:
        root_id = request["root"]
    if "label" in request:
        label = request["label"]
    if "field" in request:
        field = request["field"]

    txn = db.CreateReadTxn()
    olapondb = OlapOnDB[Empty](db[0], txn, SNAPSHOT_PARALLEL)
    # 并行创建OlapOnDB
    # Cython不支持如 *db 的解引用操作,通过db[0]来解引用
    root_vid = txn.GetVertexIndexIterator(
        label.encode('utf-8'), field.encode('utf-8'),
        root_id.encode('utf-8'), root_id.encode('utf-8')
    ).GetVid()
    # 通过 GetVertexIndexIterator 根据root节点label名和filed名与filed值(root_id)
    # 获取root节点的迭代器,通过迭代器获取vid,在无ID_MAPPING时,该vid与OlapOnDB中的id相同
    cost = time.time() - cost
    printf("prepare_cost = %lf s\n", cython.cast(cython.double, cost))
    a = BFSCore()
    cost = time.time()
    count = a.run(cython.address(olapondb), root_vid)
    cost = time.time() - cost
    printf("core_cost = %lf s\n", cython.cast(cython.double, cost))
    response["found_vertices"] = count
    response["num_vertices"] = olapondb.NumVertices()
    response["num_edges"] = olapondb.NumEdges()
    return True


@cython.ccall
def Standalone(input_dir: str, root: size_t):
    # Standalone为Standalone模式下插件入口,用cython.ccall修饰
    # 可以任意设置参数,相应修改plugins/cython/standalone_main.py即可
    cost = time.time()
    graph = OlapOnDisk[Empty]()
    config = ConfigBase[Empty]()
    config.input_dir = input_dir.encode("utf-8")
    # config为C++类,config.input_dir为std::string,Python str需要encode才能传给std::string
    graph.Load(config, DUAL_DIRECTION)
    # 读入图
    cost = time.time() - cost
    printf("load_cost = %lf s\n", cython.cast(cython.double, cost))

    cost = time.time()
    a = BFSCore()
    count = a.run(cython.address(graph), root)
    # cython.address(graph),取址,类似C++中&graph
    cost = time.time() - cost
    printf("core_cost = %lf s\n", cython.cast(cython.double, cost))
    print("find {} vertices".format(count))


@cython.ccall
def Process(db: lgraph_db_python.PyGraphDB, inp: bytes):
    # Process为embed模式和procedure模式下插件入口,用cython.ccall修饰
    # Process函数必须名为Process,参数为lgraph_db_python.PyGraphDB与bytes
    # 返回值必须为(bool, str)
    _inp = inp.decode("utf-8")
    request = json.loads(_inp)
    response = {}
    addr = cython.declare(cython.Py_ssize_t, db.get_pointer())
    # 获取PyGraphDB中GraphDB对象的地址,转换为指针后传递
    procedure_process(cython.cast(cython.pointer(GraphDB), addr),
                      request, response)
    return (True, json.dumps(response))