Python Olap API
This document introduces the Python API of OlapBase, OlapOnDB, and OlapOnDisk.
1. Overview
This manual describes the basic configuration required to use the Python interface of the TuGraph graph computing system and explains the TuGraph Python API with code examples. For details about the functions of ParallelBitset, OlapBase, and related classes, see olap-base-api.md, olap-on-db-api.md and olap-on-disk-api.md.
2. Configuration requirements
To write and compile your own applications with TuGraph, the following are required:
Linux operating system. TuGraph has been run successfully on Ubuntu 16.04.2 and CentOS 7.
A compiler that supports C++17. GCC version 5.4.1 or later is required.
Cython version 3.0a1 or later. The tested version is 3.0.0a11.
3. Cython
Cython is a programming language that is a superset of Python. Cython translates .py files into C/C++ code and compiles them into Python extension modules, which can then be loaded in Python with import. In TuGraph, all Python procedures are compiled into Python extension modules by Cython before they are used.
The main advantage of Cython is that it combines the simplicity and ease of use of Python with the performance of C/C++. The TuGraph Python interface is implemented using Cython.
4. Olap API
The Olap API is used for graph computing and is implemented in C++. Its usage and functions are essentially the same as those of the C++ interface. To use the API in a Python file, import the interfaces declared in procedures/algo_cython/olap_base.pxd with from cython.cimports.olap_base import *. The Python file can only be run after it has been compiled by Cython.
Atomic operations
cas[T](ptr: cython.pointer(T), oldv: T, newv: T) -> cython.bint: If the value pointed to by ptr equals oldv, stores newv in it and returns true; otherwise returns false.
write_min[T](a: cython.pointer(T), b: T) -> cython.bint: If b is smaller than the value pointed to by a, stores b in it and returns true; otherwise returns false.
write_max[T](a: cython.pointer(T), b: T) -> cython.bint: If b is greater than the value pointed to by a, stores b in it and returns true; otherwise returns false.
write_add[T](a: cython.pointer(T), b: T) -> cython.bint: Adds b to the value pointed to by a.
write_sub[T](a: cython.pointer(T), b: T) -> cython.bint: Subtracts b from the value pointed to by a.
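The return-value contract of cas, write_min and write_max can be illustrated with a plain-Python model. This is only a sketch of the semantics listed above: a one-element list stands in for the C pointer, the functions are not atomic, and none of this is the TuGraph implementation.

```python
# Plain-Python model of the semantics only; NOT atomic and NOT the TuGraph code.
# A one-element list plays the role of a C pointer (cell[0] is the pointee).


def cas(cell, oldv, newv):
    """Store newv if the current value equals oldv; report whether it was stored."""
    if cell[0] == oldv:
        cell[0] = newv
        return True
    return False


def write_min(cell, b):
    """Store b if it is smaller than the current value; report whether it was stored."""
    if b < cell[0]:
        cell[0] = b
        return True
    return False


def write_max(cell, b):
    """Store b if it is greater than the current value; report whether it was stored."""
    if b > cell[0]:
        cell[0] = b
        return True
    return False


dist = [10]
updated_first = write_min(dist, 7)    # 7 < 10, so the cell becomes 7
updated_second = write_min(dist, 9)   # 9 > 7, so the cell is left unchanged
swapped = cas(dist, 7, 3)             # the cell holds 7, so it becomes 3
```

The boolean result is what lets a caller decide whether its update won, which is the typical reason to use these helpers in a parallel work function.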
Vertex collection class ParallelBitset
Size() -> size_t: Returns the number of vertices in the bitset.
ParallelBitset(size: size_t): Initializes size and data, where the length of data is (size >> 6) + 1.
Clear() -> cython.void: Empties the set.
Fill() -> cython.void: Adds all vertices to the set.
Has(i: size_t) -> cython.bint: Checks whether vertex i is in the set.
Add(i: size_t) -> cython.bint: Adds vertex i to the set.
Swap(other: ParallelBitset) -> cython.void: Exchanges elements with another ParallelBitset.
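The storage formula (size >> 6) + 1 suggests one 64-bit word per 64 vertices. The following plain-Python model sketches that layout. It is not the TuGraph ParallelBitset: the class name BitsetModel is hypothetical, and the assumption that Add returns true only when the bit was newly set is inferred from how the BFS example in this document uses the return value.

```python
# Plain-Python model of a word-based bitset; not the TuGraph ParallelBitset.
# Assumption: 64-bit words, matching the (size >> 6) + 1 formula.


class BitsetModel:
    def __init__(self, size):
        self.size = size
        self.data = [0] * ((size >> 6) + 1)

    def add(self, i):
        """Set bit i; return True only if the bit was not set before (assumed)."""
        word, mask = i >> 6, 1 << (i & 63)
        was_set = bool(self.data[word] & mask)
        self.data[word] |= mask
        return not was_set

    def has(self, i):
        """Report whether bit i is set."""
        return bool(self.data[i >> 6] & (1 << (i & 63)))

    def clear(self):
        """Reset every bit to 0."""
        self.data = [0] * len(self.data)

    def swap(self, other):
        """Exchange contents with another bitset."""
        self.data, other.data = other.data, self.data
        self.size, other.size = other.size, self.size


bits = BitsetModel(130)
first = bits.add(129)    # bit 129 lives in word 2
second = bits.add(129)   # already set
```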
Vertex array class ParallelVector
ParallelVector[T](capacity: size_t): Constructs a ParallelVector, where capacity is the initial capacity of the vertex array.
operator[](i: size_t) -> T: Returns the data at index i.
begin() -> cython.pointer(T): Returns the start pointer of ParallelVector.
end() -> cython.pointer(T): Returns the end pointer of ParallelVector. begin() and end() are used like the begin and end pointers of the vector container, and the two pointers can be used to access the array sequentially.
Back() -> T: Returns the last element of ParallelVector.
Data() -> cython.pointer(T): Returns the underlying data of the array.
Destroy() -> cython.void: Clears the data in the ParallelVector and deletes the array.
Size() -> size_t: Returns the number of elements in ParallelVector.
Resize(size: size_t) -> cython.void: Changes the size of ParallelVector to size, which should be greater than or equal to the size before the change and less than capacity.
Clear() -> cython.void: Clears the data in ParallelVector.
ReAlloc(capacity: size_t) -> cython.void: Allocates a new capacity for ParallelVector. If the array has data, the data is migrated to the new memory.
Fill(elem: T) -> cython.void: Assigns elem to every element of ParallelVector.
Append(elem: T, atomic: cython.bint = true) -> cython.void: Appends an element to the end of ParallelVector.
Swap(other: ParallelVector[T]) -> cython.void: Exchanges data with another ParallelVector.
Copy() -> ParallelVector[T]: Copies the current ParallelVector data and returns the copy.
Custom Data Structure
Empty: A special data type whose content is empty.
EdgeUnit[EdgeData]: An edge whose weight type is EdgeData, used to parse the input file. It has three member variables:
    src: size_t: source vertex of the edge
    dst: size_t: destination vertex of the edge
    edge_data: EdgeData: edge weight
AdjUnit[EdgeData]: An edge whose weight type is EdgeData, used during batch computation. It has two member variables:
    neighbor: size_t: neighbor vertex of the edge
    edge_data: EdgeData: edge weight
AdjList[EdgeData]: The adjacency list of a vertex whose edge weight type is EdgeData, often used to represent the set of incoming or outgoing edges of a vertex. It has three member functions:
    begin() -> cython.pointer(AdjUnit[T]): the start pointer of the list
    end() -> cython.pointer(AdjUnit[T]): the end pointer of the list
    operator[](i: size_t) -> AdjUnit[EdgeData]: the element at index i
Graph class OlapBase
NumVertices() -> size_t: Gets the number of vertices.
NumEdges() -> size_t: Gets the number of edges.
OutDegree(vid: size_t) -> size_t: Gets the out-degree of vertex vid.
InDegree(vid: size_t) -> size_t: Gets the in-degree of vertex vid.
AllocVertexArray[VertexData]() -> ParallelVector[VertexData]: Allocates an array of type VertexData whose size is the number of vertices.
AllocVertexSubset() -> ParallelBitset: Allocates a ParallelBitset that records whether each vertex is active.
OutEdges(vid: size_t) -> AdjList[EdgeData]: Gets the list of all outgoing edges of vertex vid.
InEdges(vid: size_t) -> AdjList[EdgeData]: Gets the list of all incoming edges of vertex vid.
Transpose() -> cython.void: Transposes a directed graph.
LoadFromArray(edge_array: cython.p_char, input_vertices: size_t, input_edges: size_t, edge_direction_policy: EdgeDirectionPolicy): Loads the graph data from an array. The four parameters are:
    edge_array: the array whose data is read into the graph. Normally, the array contains multiple edges.
    input_vertices: the number of vertices that the array reads into the graph.
    input_edges: the number of edges that the array reads into the graph.
    edge_direction_policy: whether the graph is directed or undirected. There are three modes: DUAL_DIRECTION, MAKE_SYMMETRIC, and INPUT_SYMMETRIC. For details, see 'enum EdgeDirectionPolicy' in the config.h file in the core folder.
AcquireVertexLock(vid: size_t) -> cython.void: Locks vertex vid and prevents other threads from accessing the vertex data protected by this lock.
ReleaseVertexLock(vid: size_t) -> cython.void: Unlocks vertex vid so that all threads can access the vertex data protected by this lock.
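How the degree and edge accessors relate to each other can be sketched with a plain-Python adjacency model. This is an illustration only, not the OlapBase class: GraphModel and its method names are hypothetical, and each edge is a (src, dst, edge_data) tuple mirroring the fields of EdgeUnit.

```python
from collections import defaultdict

# Plain-Python model of out/in adjacency lists; not the TuGraph OlapBase class.
# Each edge is a (src, dst, edge_data) tuple, mirroring the fields of EdgeUnit.


class GraphModel:
    def __init__(self, num_vertices, edges):
        self.num_vertices = num_vertices
        self.out_edges = defaultdict(list)  # vid -> [(neighbor, edge_data), ...]
        self.in_edges = defaultdict(list)
        for src, dst, data in edges:
            self.out_edges[src].append((dst, data))
            self.in_edges[dst].append((src, data))

    def out_degree(self, vid):
        return len(self.out_edges[vid])

    def in_degree(self, vid):
        return len(self.in_edges[vid])

    def transpose(self):
        """Reverse every edge by exchanging the two adjacency maps."""
        self.out_edges, self.in_edges = self.in_edges, self.out_edges


g = GraphModel(3, [(0, 1, 1.0), (0, 2, 2.5), (1, 2, 0.5)])
```

In this model, transposing simply swaps the roles of incoming and outgoing edges, so the out-degree of a vertex after the call equals its in-degree before it.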
TuGraph provides two batch operations for vertex-centric parallel processing. Their usage in Python differs slightly from C++.
# Function name: ProcessVertexInRange[ReducedSum, Algorithm](
# work: (algo: Algorithm, vi: size_t)-> ReducedSum,
# lower: size_t, upper: size_t,
# algo: Algorithm,
# zero: ReducedSum = 0,
# reduce: (a: ReducedSum, b: ReducedSum)-> ReducedSum = reduce_plus[ReducedSum])
#
# Function purpose: Executes the work function on the vertices of the graph whose IDs lie between lower and upper. The zero parameter is the initial value of the accumulation, which is 0 by default.
# The reduce parameter is the function that combines the value returned by each call to work with the accumulated result; the default is addition.
# For the implementation, refer to include/lgraph/olap_base.h
#
# Example usage: Count the number of vertices with edges in the parent array.
import cython
from cython.cimports.olap_base import *


@cython.cclass
class CountCore:
    graph: cython.pointer(OlapBase[Empty])
    parent: ParallelVector[size_t]

    @cython.cfunc
    @cython.nogil
    def Work(self, vi: size_t) -> size_t:
        if self.graph.OutDegree(self.parent[vi]) > 0:
            return 1
        return 0

    def run(self, pointer_g: cython.pointer(OlapBase[Empty])):
        self.graph = pointer_g
        self.parent = self.graph.AllocVertexArray[size_t]()
        # A real algorithm would fill parent with vertex IDs before this call
        vertex_num: size_t
        vertex_num = self.graph.ProcessVertexInRange[size_t, CountCore](self.Work, 0, self.parent.Size(), self)
        print("the number is", vertex_num)


if __name__ == "__main__":
    count_core = CountCore()
    count_core.run(cython.address(g))

Here g is an instantiated object of the graph class OlapBase.
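The roles of zero and reduce in ProcessVertexInRange can be made concrete with a sequential plain-Python model. It is a sketch under stated assumptions: the real function runs work in parallel, the range is assumed to be half-open [lower, upper) because the example passes the array size as upper, and CountModel is a hypothetical stand-in for an algorithm class such as CountCore.

```python
import operator

# Sequential plain-Python model of the reduction; the real function runs in parallel.


def process_vertex_in_range(work, lower, upper, algo, zero=0, reduce=operator.add):
    """Fold work(algo, vi) over vi in [lower, upper), starting from zero."""
    total = zero
    for vi in range(lower, upper):
        total = reduce(total, work(algo, vi))
    return total


class CountModel:
    """Hypothetical stand-in for an algorithm class such as CountCore."""

    def __init__(self, out_degree, parent):
        self.out_degree = out_degree  # out_degree[v] = number of outgoing edges of v
        self.parent = parent          # parent[v] = a vertex ID associated with v

    def work(self, vi):
        return 1 if self.out_degree[self.parent[vi]] > 0 else 0


algo = CountModel(out_degree=[2, 0, 1, 0], parent=[0, 0, 1, 2])
# The work function and the algorithm instance are passed separately,
# mirroring the (self.Work, ..., self) call in the Cython example.
count = process_vertex_in_range(CountModel.work, 0, 4, algo)
```

Passing a different reduce, for example max, turns the same traversal into a different aggregate without changing work.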
# Function name: ProcessVertexActive[ReducedSum, Algorithm](
# work: (algo: Algorithm, vi: size_t)-> ReducedSum,
# active: ParallelBitset,
# algo: Algorithm,
# zero: ReducedSum = 0,
# reduce: (a: ReducedSum, b: ReducedSum)-> ReducedSum = reduce_plus[ReducedSum])
#
# Function purpose: Executes the work function on the vertices whose bits are set to 1 in the active bitset. The zero parameter is the initial value of the accumulation, which is 0 by default.
# The reduce parameter is the function that combines the value returned by each call to work with the accumulated result; the default is addition.
# For the implementation, refer to include/lgraph/olap_base.h
#
# Usage example: Output all outgoing neighbors of vertices 1, 2, and 3 in the graph, and count the total out-degree of these three vertices.
import cython
from cython.cimports.olap_base import *
from cython.cimports.libc.stdio import printf


@cython.cclass
class NeighborCore:
    graph: cython.pointer(OlapBase[Empty])
    active_in: ParallelBitset

    @cython.cfunc
    @cython.nogil
    def Work(self, vi: size_t) -> size_t:
        degree = self.graph.OutDegree(vi)
        dst: size_t
        edges = self.graph.OutEdges(vi)
        # Initialize the counter explicitly before incrementing it
        local_out_degree = cython.declare(size_t, 0)
        for i in range(degree):
            dst = edges[i].neighbor
            printf("node %lu has neighbor %lu\n", vi, dst)
            local_out_degree += 1
        return local_out_degree

    def run(self, pointer_g: cython.pointer(OlapBase[Empty])):
        self.graph = pointer_g
        self.active_in = self.graph.AllocVertexSubset()
        self.active_in.Add(1)
        self.active_in.Add(2)
        self.active_in.Add(3)
        # The second template argument is the algorithm class itself
        total_outdegree = cython.declare(
            size_t,
            self.graph.ProcessVertexActive[size_t, NeighborCore](self.Work, self.active_in, self))
        printf("total outdegree of node1,2,3 is %lu\n", total_outdegree)


if __name__ == "__main__":
    neighbor_core = NeighborCore()
    neighbor_core.run(cython.address(g))
As the two examples above show, ProcessVertexActive and ProcessVertexInRange take one more parameter in Python than their C++ counterparts: a pointer to the algorithm class. The Work function is usually a member function of the algorithm class so that it can access member variables, such as graph and the ParallelVector parent in the examples above. When calling a batch function, pass both the Work function and the self pointer of the algorithm class.
The Work function is called from multiple threads, so the @cython.nogil decorator is added to release the Python global interpreter lock (GIL). In code executed by multiple threads, such as the Work function, declare variables as C/C++ types using dst: type or dst = cython.declare(type), because Python objects cannot be used in multi-threaded code.
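The calling convention described above, where the work function and the algorithm instance are passed as separate arguments, can be modeled in plain Python for ProcessVertexActive. This is a sequential sketch, not the TuGraph implementation: a set of vertex IDs stands in for the ParallelBitset, and NeighborModel is a hypothetical stand-in for NeighborCore.

```python
import operator

# Sequential plain-Python model; a set of vertex IDs stands in for the ParallelBitset.


def process_vertex_active(work, active, algo, zero=0, reduce=operator.add):
    """Fold work(algo, vi) over the active vertices, starting from zero."""
    total = zero
    for vi in sorted(active):
        total = reduce(total, work(algo, vi))
    return total


class NeighborModel:
    """Hypothetical stand-in for an algorithm class such as NeighborCore."""

    def __init__(self, out_edges):
        self.out_edges = out_edges  # out_edges[v] = list of neighbor IDs of v

    def work(self, vi):
        # State is reached through the algorithm instance, as Work does via self
        return len(self.out_edges[vi])


algo = NeighborModel(out_edges=[[1], [2, 3], [3], [0, 1, 2]])
total_out_degree = process_vertex_active(NeighborModel.work, {1, 2, 3}, algo)
```

Vertex 0 is skipped because it is not in the active set, so only the out-degrees of vertices 1, 2 and 3 contribute to the result.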
Graph class OlapOnDB
Create a directed graph in parallel:
olapondb = OlapOnDB[Empty](db, txn, SNAPSHOT_PARALLEL)
Create an undirected graph in parallel:
olapondb = OlapOnDB[Empty](db, txn, SNAPSHOT_PARALLEL | SNAPSHOT_UNDIRECTED)
Create a directed graph with ID_MAPPING:
olapondb = OlapOnDB[Empty](db, txn, SNAPSHOT_PARALLEL | SNAPSHOT_IDMAPPING)
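The snapshot options are combined with the bitwise OR operator, so each option can be tested independently afterwards. The following sketch shows the general pattern with Python's enum.IntFlag. The class SnapshotFlag and its numeric values are hypothetical and chosen only for illustration; the real constants are defined by TuGraph.

```python
from enum import IntFlag

# Hypothetical flag values for illustration; the real constants come from TuGraph.


class SnapshotFlag(IntFlag):
    PARALLEL = 1
    UNDIRECTED = 2
    IDMAPPING = 4


# Combine options with |, then test each one with &
flags = SnapshotFlag.PARALLEL | SnapshotFlag.UNDIRECTED
is_undirected = bool(flags & SnapshotFlag.UNDIRECTED)
uses_id_mapping = bool(flags & SnapshotFlag.IDMAPPING)
```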
Graph class OlapOnDisk
ConfigBase:
ConfigBase(): Constructor.
std::string input_dir: path of the graph edge table data.
std::string output_dir: output path.
Load(config: ConfigBase[EdgeData], edge_direction_policy: EdgeDirectionPolicy) -> void: Reads in the graph data.
5. lgraph_db API
For the lgraph_db API, refer to the files procedures/algo_cython/lgraph_db.pxd and lgraph_db_python.py.
The usage and functions of the interfaces in lgraph_db.pxd are essentially the same as those of the C++ interface. The interfaces declared in lgraph_db.pxd are implemented in C++. In a .py file, they must be imported with from cython.cimports.lgraph_db import *, and the file must be compiled by Cython before it can run.
VertexIndexIterator
GetVid()-> int64_t: Get the vid of the vertex
Galaxy
Galaxy(dir_path: std::string): Constructor; dir_path is the db path.
SetCurrentUser(user: std::string, password: std::string) -> cython.void: Sets the user.
SetUser(user: std::string) -> cython.void: Sets the user.
OpenGraph(graph: std::string, read_only: bint) -> GraphDB: Creates a GraphDB.
GraphDB:
CreateReadTxn() -> Transaction: Creates a read-only transaction.
CreateWriteTxn() -> Transaction: Creates a write transaction.
ForkTxn(txn: Transaction) -> Transaction: Copies a transaction. Only read transactions can be copied.
Transaction:
GetVertexIndexIterator(
    label: std::string,
    field: std::string,
    key_start: std::string,
    key_end: std::string) -> VertexIndexIterator
Gets an index iterator over the vertices whose field value lies in the range [key_start, key_end]. Setting key_start = key_end = v therefore returns an iterator over all vertices whose field value is v.
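The inclusive range semantics of [key_start, key_end] can be sketched with a sorted list and the standard bisect module. This is a plain-Python model, not the TuGraph index: the function vids_in_range and the sample index are hypothetical, and keys are compared as strings because the interface takes string keys.

```python
import bisect

# Plain-Python model of an inclusive key-range lookup; not the TuGraph index code.
# The index is a sorted list of (field_value, vid) pairs.


def vids_in_range(index, key_start, key_end):
    """Return the vids whose field value v satisfies key_start <= v <= key_end."""
    keys = [key for key, _ in index]
    lo = bisect.bisect_left(keys, key_start)   # first position with key >= key_start
    hi = bisect.bisect_right(keys, key_end)    # first position with key > key_end
    return [vid for _, vid in index[lo:hi]]


index = [("0", 10), ("1", 11), ("1", 12), ("2", 13)]
exact = vids_in_range(index, "1", "1")    # key_start == key_end selects one value
ranged = vids_in_range(index, "0", "1")
```

With key_start equal to key_end, the lookup returns every vertex that shares that single field value, which is the case the BFS example relies on to find its root vertex.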
lgraph_db_python.py packages the C++ classes Galaxy and GraphDB from lgraph_db.pxd as Python classes. After compiling lgraph_db_python.py into a Python extension, you can directly access it in a Python file or from the Python command line by importing it with import lgraph_db_python.
PyGalaxy:
PyGalaxy(self, dir_path: str): Constructor; dir_path is the db path.
SetCurrentUser(self, user: str, password: str) -> void: Sets the user.
SetUser(self, user: str) -> void: Sets the user.
OpenGraph(self, graph: str, read_only: bool) -> PyGraphDB: Creates a PyGraphDB.
PyGraphDB:
get_pointer(self)->cython.Py_ssize_t: address of C++ class GraphDB
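Handing an object across as an integer address and casting it back to a typed pointer is a general technique. The sketch below demonstrates the round trip with the standard ctypes module. It is only an analogy: a ctypes.c_int64 plays the role of the C++ object, and neither GraphDB nor get_pointer is involved.

```python
import ctypes

# ctypes stand-in for the address round trip; GraphDB itself is not involved.
value = ctypes.c_int64(42)       # plays the role of the C++ object
addr = ctypes.addressof(value)   # plain integer address, analogous to get_pointer()

# Cast the integer back to a typed pointer and write through it
ptr = ctypes.cast(addr, ctypes.POINTER(ctypes.c_int64))
ptr.contents.value = 7
```

The write through ptr changes value itself, which shows that the integer carries no copy of the object. The original object must therefore stay alive for as long as the address is in use.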
6. Algorithm plug-in example
The following is a code example of the BFS algorithm implemented in Python:
# cython: language_level=3, cpp_locals=True, boundscheck=False, wraparound=False, initializedcheck=False
# distutils: language = c++

# The directives above work as follows:
# language_level=3: use Python 3
# cpp_locals=True: C++17 is required, and std::optional is used to manage C++ objects in Python code, which avoids copy construction of C++ objects
# boundscheck=False: Turn off bounds checking for indexes
# wraparound=False: Turn off the handling of negative subscripts (as in a Python list)
# initializedcheck=False: Turn off checking whether memory is initialized; the code runs faster with the check disabled
# language = c++: translate this .py file to C++ instead of C. TuGraph uses many template functions, so C++ should be used

import json

import cython
from cython.cimports.olap_base import *
from cython.cimports.lgraph_db import *
# cimport olap_base.pxd and lgraph_db.pxd from procedures/algo_cython/, similar to #include "xxx.h" in C++

from cython.cimports.libc.stdio import printf
# Similar to #include <stdio.h> in C++
# Other common ones include cython.cimports.libcpp.unordered_map, etc.

import time
import lgraph_db_python
# Python extension that provides PyGraphDB, which is used in the signature of Process
@cython.cclass
# cython.cclass indicates that BFSCore is a C-type class
class BFSCore:
    graph: cython.pointer(OlapBase[Empty])
    # cython.pointer(OlapBase[Empty]) is a pointer to OlapBase[Empty], similar to OlapBase[Empty]* in C++
    # Cython provides common pointer types, such as cython.p_int, cython.p_char, etc.
    parent: ParallelVector[size_t]
    active_in: ParallelBitset
    active_out: ParallelBitset
    root: size_t
    # root: size_t declares root as a C++ size_t variable, equivalent to root = cython.declare(size_t)
    # Variables without a declared type are Python objects
    # Declaring variable types greatly improves performance, and only C/C++ type variables can be accessed in the multi-threaded part

    @cython.cfunc
    # cython.cfunc indicates that Work is a C-type function; its parameters and return value should be declared
    # cfunc has good performance and can accept C/C++ objects as parameters and return values, but it cannot be called from other Python files
    # cython.ccall is similar, but functions decorated with it, such as the Standalone function, can be called from other Python files
    @cython.nogil
    # cython.nogil releases the Python global interpreter lock. Python objects cannot be accessed in code marked nogil
    # The multi-threaded part should have the nogil decorator
    @cython.exceptval(check=False)
    # cython.exceptval(check=False) disables exception propagation; Python exceptions raised inside the function are ignored
    def Work(self, vi: size_t) -> size_t:
        degree = cython.declare(size_t, self.graph.OutDegree(vi))
        out_edges = cython.declare(AdjList[Empty], self.graph.OutEdges(vi))
        i = cython.declare(size_t, 0)
        local_num_activations = cython.declare(size_t, 0)
        dst: size_t
        for i in range(degree):
            dst = out_edges[i].neighbor
            if self.parent[dst] == cython.cast(size_t, -1):
                # parent[dst] == -1 means that dst has not been visited by BFS
                if self.active_out.Add(dst):
                    # Set dst as an active vertex; ParallelBitset.Add is an atomic operation, which prevents double counting
                    self.parent[dst] = vi
                    local_num_activations += 1
        return local_num_activations

    @cython.cfunc
    @cython.nogil
    @cython.exceptval(check=False)
    def run(self, g: cython.pointer(OlapBase[Empty]), r: size_t) -> cython.size_t:
        self.graph = g
        self.root = r
        self.active_in = g.AllocVertexSubset()
        self.active_out = g.AllocVertexSubset()
        self.parent = g.AllocVertexArray[size_t]()
        self.parent.Fill(-1)
        num_vertices = cython.declare(size_t, self.graph.NumVertices())
        printf("num_vertices = %lu\n", num_vertices)
        self.parent[self.root] = self.root
        num_activations = cython.declare(size_t, 1)
        discovered_vertices = cython.declare(size_t, num_activations)
        self.active_in.Add(self.root)
        while num_activations > 0:
            self.active_out.Clear()
            num_activations = g.ProcessVertexActive[size_t, BFSCore](self.Work, self.active_in, self)
            discovered_vertices += num_activations
            self.active_out.Swap(self.active_in)
            printf("num_activations = %lu\n", num_activations)
        return discovered_vertices
@cython.cfunc
def procedure_process(db: cython.pointer(GraphDB), request: dict, response: dict) -> cython.bint:
    cost = time.time()
    root_id = "0"
    label = "node"
    field = "id"
    if "root" in request:
        root_id = request["root"]
    if "label" in request:
        label = request["label"]
    if "field" in request:
        field = request["field"]

    txn = db.CreateReadTxn()
    olapondb = OlapOnDB[Empty](db[0], txn, SNAPSHOT_PARALLEL)
    # Create OlapOnDB in parallel
    # Cython does not support dereference operations such as *db; use db[0] to dereference
    root_vid = txn.GetVertexIndexIterator(
        label.encode('utf-8'), field.encode('utf-8'),
        root_id.encode('utf-8'), root_id.encode('utf-8')
    ).GetVid()
    # Get the iterator of the root vertex through GetVertexIndexIterator, based on the label name, field name and field value (root_id),
    # and get the vid through the iterator. When there is no ID_MAPPING, the vid is the same as the id in OlapOnDB
    cost = time.time() - cost
    printf("prepare_cost = %lf s\n", cython.cast(cython.double, cost))

    a = BFSCore()
    cost = time.time()
    count = a.run(cython.address(olapondb), root_vid)
    cost = time.time() - cost
    printf("core_cost = %lf s\n", cython.cast(cython.double, cost))

    response["found_vertices"] = count
    response["num_vertices"] = olapondb.NumVertices()
    response["num_edges"] = olapondb.NumEdges()
    return True
@cython.ccall
def Process(db: lgraph_db_python.PyGraphDB, inp: bytes):
    # Process is the plug-in entry point in embed mode and procedure mode, and is decorated with cython.ccall
    # The function must be named Process, and its parameters are lgraph_db_python.PyGraphDB and bytes
    # The return value must be (bool, str)
    _inp = inp.decode("utf-8")
    request = json.loads(_inp)
    response = {}
    addr = cython.declare(cython.Py_ssize_t, db.get_pointer())
    # Get the address of the GraphDB object in the PyGraphDB, convert it to a pointer and pass it on
    procedure_process(cython.cast(cython.pointer(GraphDB), addr),
                      request, response)
    return (True, json.dumps(response))
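The frontier logic of BFSCore and the bytes-in, (bool, str)-out contract of Process can be tried without TuGraph using a plain-Python model. This is a sequential sketch under stated assumptions, not the plug-in itself: Python sets stand in for the two ParallelBitset frontiers, the graph is a list of neighbor lists, and the functions bfs and process (which takes the graph directly instead of a PyGraphDB) are hypothetical.

```python
import json

# Plain-Python BFS that mirrors the frontier logic of BFSCore; not the Cython plug-in.
UNVISITED = -1


def bfs(out_edges, root):
    """Return the number of vertices reachable from root, including root."""
    parent = [UNVISITED] * len(out_edges)
    parent[root] = root
    active_in = {root}
    discovered = 1
    while active_in:
        active_out = set()
        for vi in active_in:
            for dst in out_edges[vi]:
                if parent[dst] == UNVISITED:
                    parent[dst] = vi
                    active_out.add(dst)
        discovered += len(active_out)
        active_in = active_out  # plays the role of active_out.Swap(active_in)
    return discovered


def process(out_edges, inp):
    """Model of the entry point contract: JSON bytes in, (bool, str) out."""
    request = json.loads(inp.decode("utf-8"))
    root = int(request.get("root", "0"))  # same default root as the plug-in
    response = {"found_vertices": bfs(out_edges, root),
                "num_vertices": len(out_edges)}
    return (True, json.dumps(response))


graph = [[1, 2], [3], [3], [], [0]]  # vertex 4 is unreachable from vertex 0
ok, body = process(graph, b'{"root": "0"}')
```

Vertex 3 is reachable from both vertex 1 and vertex 2 but is counted once, because the parent check admits it to the next frontier only on first discovery. In the parallel version, the atomic ParallelBitset.Add provides that guarantee.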