# Python Olap API > 本文档主要介绍 OlapBase OlapOnDB 和 OlapOnDisk 在Python中的API用法 ## 1. 概述 本手册将介绍使用TuGraph图计算系统Python接口需要的简单配置,同时结合代码对TuGraph Python API进行解释。关于ParallelBitset、OlapBase各类的作用,详见olap-base-api.md,olap-on-db-api.md和olap-on-disk-api.md ## 2. 配置要求 如果要使用TuGraph图计算编写以及编译自己的应用程序,需要的配置要求为: - linux操作系统,目前在Ubuntu16.04, Ubuntu18.04, Ubuntu20.04和Centos7, Centos8系统上可成功运行。 - 支持C++17的编译器,要求GCC版本为8.4.0或更新的版本。 - Cython,版本要求3.0.0以上,已测试可运行版本为3.0.0a11 ## 3. Cython Cython是一种高效的编程语言,是Python的超集。Cython能将py文件翻译为C/C++代码后编译为Python拓展类,在Python中通过import调用。在TuGraph中,所有的Python plugin都由Cython编译为Python拓展类后使用。 Cython的Pure Python模式在保证Python语法的同时具有C/C++的性能,TuGraph Python接口均使用Cython实现。 [Cython 文档](https://cython.readthedocs.io/en/latest/index.html) ## 4. Olap API 见procedures/algo_cython/olap_base.pxd文件,用法与功能基本与C++接口相同,olap_base.pxd中声明的接口都由C++实现,在py文件中必须通过`from cython.cimports.olap_base import *`的方式导入,由Cython编译py文件后才能运行。 ### 原子操作 - `cas[T](ptr: cython.pointer(T), oldv: T, newv: T)-> cython.bint`:如果ptr指向的值等于oldv,则将ptr指向的值赋为newv并返回true,否则返回false - `write_min[T](a: cython.pointer(T), b: T)-> cython.bint`:如果b比a指向的值更小,那么将a指向的值赋为b并返回true,否则返回false。 - `write_max[T](a: cython.pointer(T), b: T)-> cython.bint`:如果b比a指向的值更大,那么将a指向的值赋为b并返回true,否则返回false。 - `write_add[T](a: cython.pointer(T), b: T)-> cython.bint`:将b的值加到a指向的值上。 - `write_sub[T](a: cython.pointer(T), b: T)-> cython.bint`:将a指向的值减去b的值。 ### 点集合类ParallelBitset - `Size()-> size_t`:表示Bitmap中的点个数。 - `ParallelBitset(size: size_t)`:初始化size和data,data长度为(size >> 6)+1 - `Clear()-> cython.void`:清空集合 - `Fill()-> cython.void`:将所有点加入集合 - `Has(size_t i)-> cython.bint`:检查点i是否在集合中 - `Add(size_t i)-> cython.bint`:将点i加入集合中 - `Swap(ParallelBitset &other)-> cython.void`:和另一组ParallelBitset集合交换元素 ### 点数组类ParallelVector - `ParallelVector[T](size_t capacity)` 构建ParallelVector,capacity为点数组的初始容量大小 - `operator[](i: size_t)-> T`:下标为i的数据 - `begin()-> cython.pointer(T)`:ParallelVector的起始指针 - `end()-> cython.pointer(T)`:ParallelVector的结束指针。begin和end的用法类似于vector容器的begin和end指针,可以使用这两个指针对数组进行顺序访问 - `Back()-> T`:ParallelVector最后一个数据 - `Data()-> cython.pointer(T)`:表示数组本身数据 - `Destroy()-> cython.void`:清空ParallelVector数组内数据并删除数组 - `Size()-> size_t`:表示ParallelVector中的数据个数 - `Resize(size: size_t)-> cython.void`:更改ParallelVector为size大小,该size应大于等于更改前的大小且小于capacity - `Clear()-> cython.void`:清空ParallelVector内数据 - `ReAlloc(capacity: size_t)-> cython.void`:给ParallelVector分配新的容量大小,若数组有数据则将数据迁移至新内存 - `Fill(elem: T)-> cython.void`:为ParallelVector的全部数据赋值为elem - `Append(elem: T, atomic: cython.bint = true)-> cython.void`:向ParallelVector结尾添加一个数据 - `Swap(other: ParallelVector[T])-> cython.void`:和其他的ParallelVector交换数据 - `Copy()-> ParallelVector[T]`:复制当前的ParallelVector数据存至Copy数组中 ### 自定义数据结构 - `Empty`:内容为空的特殊数据类型。 - `EdgeUnit[EdgeData]`:表示权值类型为EdgeData的边,用于解析输入文件,包含三个成员变量: - `src: size_t`:边的起始点 - `dst: size_t`:边的终点 - `edge_data: EdgeData`:边的权值 - `AdjUnit[EdgeData]`:表示权值类型为EdgeData的边,用于批处理计算过程中,包含两个成员变量: - `neighbour: size_t`:边的邻居点 - `edge_data: EdgeData`:边的权值 - `AdjList[EdgeData]`:权值类型为EdgeData的点的邻接表,常用于表示点的入边和出边集合,包含两个成员变量: - `begin()-> cython.pointer(AdjUnit[T])`:列表的起始指针 - `end()-> cython.pointer(AdjUnit[T])`:列表的结束指针。 - `operator[](i: size_t)-> AdjUnit[EdgeData]`: 下标为i的数据 ### 图类OlapBase - `NumVertices()-> size_t`:获取点数 - `NumEdges()-> size_t`:获取边数 - `OutDegree(size_t vid)-> size_t`:点vid的出度 - `InDegree(size_t vid)-> size_t`:点vid的入度 - `AllocVertexArray[VertexData]() ->ParallelVector[VertexData]`:分配一个类型为VertexData的数组,大小为点个数 - `AllocVertexSubset()-> ParallelBitset`:分配一个ParallelBitset集合,用于表示所有点的状态是否激活 - `OutEdges(vid: size_t)-> AdjList[EdgeData]`:获取点v的所有出边集合 - `InEdges(vid: size_t)-> AdjList[EdgeData]`:获取点v的所有入边集合 - `Transpose()-> cython.void`:对有向图进行图反转 - `LoadFromArray(edge_array: cython.p_char, input_vertices: size_t, input_edges: size_t, edge_direction_policy: EdgeDirectionPolicy)`:从数组中加载图数据,包含四个参数,其含义分别表示: - `edge_array`:将该数组中的数据读入图,一般情况下该数组包含多条边。 - `input_vertices`:指定数组读入图的点个数。 - `input_edges`:指定数组读入图的边的条数。 - `edge_direction_policy`:指定图为有向或无向,包含三种模式,分别为DUAL_DIRECTION、MAKE_SYMMETRIC以及INPUT_SYMMETRIC。对应的详细介绍见include/lgraph/olap_base.h文件的`enum EdgeDirectionPolicy`。 - `AcquireVertexLock(vid: size_t)-> cython.void`:对点vid加锁,禁止其它线程对该锁对应的点数据进行访存 - `void ReleaseVertexLock(vid: size_t)-> cython.void`:对点vid解锁,所有线程均可访存该锁对应的点数据 TuGraph提供了两个批处理操作来并行地进行以点为中心的批处理过程,在Python中与C++使用方法稍有不同。 ```python # 函数名称:ProcessVertexInRange[ReducedSum, Algorithm]( # work: (algo: Algorithm, vi: size_t)-> ReducedSum, # lower: size_t, upper: size_t, # algo: Algorithm, # zero: ReducedSum = 0, # reduce: (a: ReducedSum, b: ReducedSum)-> ReducedSum = reduce_plus[ReducedSum]) # # 函数用途:对Graph中节点编号介于lower和upper之间的节点执行work函数。第四个参数表示累加的基数,默认为0; # 第五个参数表示对每个work处理后的节点返回值进行迭代reduce函数操作,默认为累加操作。 # 具体实现请参考include/lgraph/olap_base.h中具体代码 # # 使用示例:统计数组parent数组中有出边的点个数 import cython from cython.cimports.olap_base import * @cython.cclass class CountCore: graph: cython. pointer(OlapBase[Empty]) parent: ParallelVector[size_t] @cython.cfunc @cython.nogil def Work(self, vi: size_t) -> size_t: if self.graph.OutDegree(self.parent[vi]) > 0: return 1 return 0 def run(self, pointer_g: cython. pointer(OlapBase[Empty])): self.graph = pointer_g self.parent = self.graph.AllocVertexArray[size_t]() vertex_num: size_t vertex_num = self.graph.ProcessVertexInRange[size_t, CountCore](self.Work, 0, self.parent.Size(), self) print("the number is", vertex_num) if __name__ == "__main__": count_core = CountCore() count_core.run(cython.address(g)) ``` 其中g为图类OlapBase的实例化对象 ```python # 函数名称:ProcessVertexActive[ReducedSum, Algorithm]( # work: (algo: Algorithm, vi: size_t)-> ReducedSum, # active: ParallelBitset, # algo: Algorithm, # zero: ReducedSum = 0, # reduce: (a: ReducedSum, b: ReducedSum)-> ReducedSum = reduce_plus[ReducedSum]) # # 函数用途:对active_vertices中对应为1的节点执行work函数,第三个参数表示累加的基数,默认为0; # 第四个参数表示对每个work处理后的节点返回值进行迭代reduce函数操作,默认为累加操作。 # 具体实现请参考/include/lgraph/olap_base.h中具体代码 # # 使用示例:输出Graph中节点1,2,3的所有出度邻居,并统计这三个节点的总出度 import cython from cython.cimports.olap_base import * from cython.cimports.libc.stdio import printf @cython.cclass class NeighborCore: graph: cython.pointer(OlapBase[Empty]) active_in: ParallelBitset @cython.cfunc @cython.nogil def Work(self, vi: size_t) -> size_t: degree = self.graph.OutDegree(vi) dst: size_t edges = self.graph.OutEdges(vi) local_out_degree: size_t for i in range(degree): dst = edges[i].neighbour printf("node %lu has neighbour %lu\n", vi, dst) local_out_degree += 1 return local_out_degree def run(self, pointer_g: cython.pointer(OlapBase[Empty])): self.graph = pointer_g self.active_in = self.graph.AllocVertexSubset() self. active_in. Add(1) self. active_in. Add(2) self. active_in. Add(3) total_outdegree = cython.declare( size_t, self.graph.ProcessVertexActive[size_t, CountCore](self.Work, self.active_in, self)) printf("total outdegree of node1,2,3 is %lu\n",total_outdegree) if __name__ == "__main__": neighbor_core = NeighborCore() neighbor_core.run(cython.address(g)) ``` 如上面两个例子所展示,在Python中ProcessVertexActive与ProcessVertexInRange比在C++中额外需要一个算法类指针参数,Work函数一般也作为该算法类的成员函数,满足Work函数访问成员变量的需求(如图graph,点数组parent),在调用批处理函数时将Work函数和算法类的self指针传入批处理函数。 其中Work函数会在多线程中调用,因此加上修饰器`@cython.nogil`释放Python全局解释锁,在多线程执行的代码中(例如批处理函数中的Work函数,`cython.parallel.prange`中),不能包含Python对象,最好通过`dst: type`或者`dst = cython.declare(type)`的方式声明变量为C/C++类型。 ### 图类OlapOnDB: 并行化创建有向图: ```python olapondb = OlapOnDB[Empty](db, txn, SNAPSHOT_PARALLEL) ``` 并行化创建无向图 ```python olapondb = OlapOnDB[Empty](db, txn, SNAPSHOT_PARALLEL | SNAPSHOT_UNDIRECTED) ``` ID_MAPPING创建有向图 ```python olapondb = OlapOnDB[Empty](db, txn, SNAPSHOT_PARALLEL | SNAPSHOT_IDMAPPING) ``` ### 图类OlapOnDisk #### ConfigBase: - `ConfigBase()`: 构造函数 - `std::string input_dir`: 图边表数据路径 - `std::string output_dir`: 输出结果路径 - `Load(config: ConfigBase[EdgeData], edge_direction_policy: EdgeDirectionPolicy)-> void`: 读入图数据 ## 5. lgraph_db API 见procedures/algo_cython/lgraph_db.pxd与lgraph_db_python.py文件。 lgraph_db.pxd中接口用法与功能基本与C++接口相同,lgraph_db.pxd中声明的接口都由C++实现,在py文件中必须通过`from cython.cimports.olap_base import *`的方式导入,由Cython编译py文件后才能运行。 ### VertexIndexIterator - `GetVid()-> int64_t`: 获取点的vid ### Galaxy - `Galaxy(dir_path: std::string)`: 构造函数,dir_path为db路径 - `SetCurrentUser(user: std::string, password: std::string)-> cython.void`: 设置用户 - `SetUser(user: std::string)-> cython.void`: 设置用户 - `OpenGraph(graph: std::string, read_only: bint)-> GraphDB`: 创建GraphDB ### GraphDB: - `CreateReadTxn()-> Transaction`: 创建只读事务 - `CreateWriteTxn()-> Transaction`: 创建写事务 - `ForkTxn(txn: Transaction)-> Transaction`: 复制事务,只能复制读事务 ### Transaction: ``` GetVertexIndexIterator( label: std::string, field: std::string, key_start: std::string, key_end: std::string)-> VertexIndexIterator ``` 获取索引迭代器。迭代器的field值为 [key_start, key_end]。所以在key_start=key_end=v时,返回指向field值为v的点的迭代器 lgraph_db_python.py是lgraph_db.pxd中C++类 Galaxy与GraphDB的包装,将C++类包装为Python类,将lgraph_db_python.py编译为Python拓展后,可以直接在Python文件或Python命令行中`import lgraph_db_python`访问lgraph_db_python.PyGraphDB与PyGraphDB.PyGalaxy。 ### PyGalaxy: - `PyGalaxy(self, dir_path: str)`: 构造函数,dir_path为db路径 - `SetCurrentUser(self, user: str password: str)-> void`: 设置用户 - `SetUser(self, user: std::string)-> void`: 设置用户 - `OpenGraph(self, graph: str, read_only: bool)-> PyGraphDB`: 创建PyGraphDB ### PyGraphDB: - `get_pointer(self)-> cython.Py_ssize_t`: C++ 类GraphDB的地址 ## 6. 算法插件示例 下面为Python实现的BFS算法的代码示例: ```python # cython: language_level=3, cpp_locals=True, boundscheck=False, wraparound=False, initializedcheck=False # distutils: language = c++ # 注释作用如下: # language_level=3: 使用Python3 # cpp_locals=True: 需要c++17,使用std::optional管理Python代码中的C++对象,可以避免C++对象的拷贝构造 # boundscheck=False: 关闭索引的边界检查 # wraparound=False: 关闭负数下标的处理(类似Python List) # initializedcheck=False: 关闭检查内存是否初始化,关闭检查后运行性能更快 # language = c++: 将此py文件翻译为C++而不是C文件,TuGraph使用大量模板函数,所以都应该使用C++ import json import cython from cython.cimports.olap_base import * from cython.cimports.lgraph_db import * # 从procedures/algo_cython/ 中cimportolap_base.pxd与lgraph_db.pxd, 类似C++中#include "xxx.h" from cython.cimports.libc.stdio import printf # 类似C++中#include # 其他常见的还有cython.cimports.libcpp.unordered_map等 import time @cython.cclass # cython.cclass 表示BFSCore为C类型的Class class BFSCore: graph: cython.pointer(OlapBase[Empty]) # cython.pointer(OlapBase[Empty])表示OlapBase[Empty]的指针,类似C++中OlapBase[Empty]* # cython提供了常见类型的指针,如cython.p_int, cython.p_char等,表示int*, char*, ... parent: ParallelVector[size_t] active_in: ParallelBitset active_out: ParallelBitset root: size_t # root: size_t 声明root为C++ size_t类型变量,等效于root = cython.declare(size_t) # 不声明类型的变量为Python object类型 # 声明变量类型会大幅提高性能,同时在多线程部分,只有C/C++类型的变量可以访问 @cython.cfunc # cython.cfunc 表示Work为C类型的函数,参数与返回值应声明 # cfunc性能好,能接受C/C++对象为参数、返回值,但是不能在其他python文件中调用 # 类似的有cython.ccall,如Standalone函数,可以在其他python文件中调用 @cython.nogil # cython.nogil 表示释放Python全局解释锁,在nogil修饰的部分,不能访问Python对象 # 在多线程部分,都应有nogil修饰器 @cython.exceptval(check=False) # cython.exceptval(check=False) 表示禁用异常传播,将忽略函数内部引发的Python异常 def Work(self, vi: size_t) -> size_t: degree = cython.declare(size_t, self.graph.OutDegree(vi)) out_edges = cython.declare(AdjList[Empty], self.graph.OutEdges(vi)) i = cython.declare(size_t, 0) local_num_activations = cython.declare(size_t, 0) dst: size_t for i in range(degree): dst = out_edges[i].neighbour if self.parent[dst] == cython.cast(size_t, -1): # parent[dst] == -1 表示dst没有被bfs访问过 if self.active_out.Add(dst): # 将dst设置为为活跃节点;ParallelBitmap.Add为原子操作,防止重复计算 self.parent[dst] = vi local_num_activations += 1 return local_num_activations @cython.cfunc @cython.nogil @cython.exceptval(check=False) def run(self, g: cython.pointer(OlapBase[Empty]), r: size_t) -> cython.size_t: self.graph = g self.root = r self.active_in = g.AllocVertexSubset() self.active_out = g.AllocVertexSubset() self.parent = g.AllocVertexArray[size_t]() self.parent.Fill(-1) num_vertices = cython.declare(size_t, self.graph.NumVertices()) printf("num_vertices = %lu\n", num_vertices) self.parent[self.root] = self.root num_activations = cython.declare(size_t, 1) discovered_vertices = cython.declare(size_t, num_activations) self.active_in.Add(self.root) while num_activations > 0: self.active_out.Clear() num_activations = g.ProcessVertexActive[size_t, BFSCore](self.Work, self.active_in, self) discovered_vertices += num_activations self.active_out.Swap(self.active_in) printf("num_activations = %lu\n", num_activations) return discovered_vertices @cython.cfunc def procedure_process(db: cython.pointer(GraphDB), request: dict, response: dict) -> cython.bint: cost = time.time() root_id = "0" label = "node" field = "id" if "root" in request: root_id = request["root"] if "label" in request: label = request["label"] if "field" in request: field = request["field"] txn = db.CreateReadTxn() olapondb = OlapOnDB[Empty](db[0], txn, SNAPSHOT_PARALLEL) # 并行创建OlapOnDB # Cython不支持如 *db 的解引用操作,通过db[0]来解引用 root_vid = txn.GetVertexIndexIterator( label.encode('utf-8'), field.encode('utf-8'), root_id.encode('utf-8'), root_id.encode('utf-8') ).GetVid() # 通过 GetVertexIndexIterator 根据root节点label名和filed名与filed值(root_id) # 获取root节点的迭代器,通过迭代器获取vid,在无ID_MAPPING时,该vid与OlapOnDB中的id相同 cost = time.time() - cost printf("prepare_cost = %lf s\n", cython.cast(cython.double, cost)) a = BFSCore() cost = time.time() count = a.run(cython.address(olapondb), root_vid) cost = time.time() - cost printf("core_cost = %lf s\n", cython.cast(cython.double, cost)) response["found_vertices"] = count response["num_vertices"] = olapondb.NumVertices() response["num_edges"] = olapondb.NumEdges() return True @cython.ccall def Process(db: lgraph_db_python.PyGraphDB, inp: bytes): # Process为embed模式和procedure模式下插件入口,用cython.ccall修饰 # Process函数必须名为Process,参数为lgraph_db_python.PyGraphDB与bytes # 返回值必须为(bool, str) _inp = inp.decode("utf-8") request = json.loads(_inp) response = {} addr = cython.declare(cython.Py_ssize_t, db.get_pointer()) # 获取PyGraphDB中GraphDB对象的地址,转换为指针后传递 procedure_process(cython.cast(cython.pointer(GraphDB), addr), request, response) return (True, json.dumps(response)) ```