olap_base

This is an implementation of the TuGraph graph analytics engine. The graph analytics engine is a general-purpose processing engine for implementing various graph analytics algorithms such as PageRank and ShortestPath.

Defines

THREAD_WORKING
THREAD_STEALING
VERTEX_BATCH_SIZE
WORD_OFFSET(i)
BIT_OFFSET(i)

Functions

union @4 __attribute__ ((packed))

Variables

size_t neighbour
EdgeData edge_data
size_t src
size_t dst
namespace lgraph_api
namespace olap

Enums

enum EdgeDirectionPolicy

Defines the edge direction policy of the graph. The policy determines whether the graph is symmetric (undirected) and how its incoming edges are derived from the input.

Values:

enumerator DUAL_DIRECTION

The graph is asymmetric. The edges from input files are outgoing edges. The reversed edges form incoming edges.

enumerator MAKE_SYMMETRIC

The graph is symmetric but the input files are asymmetric. The outgoing and incoming edges are identical.

enumerator INPUT_SYMMETRIC

Both the graph and the input files are symmetric. The outgoing and incoming edges are identical.

Functions

template<typename EdgeData> struct lgraph_api::olap::AdjUnit __attribute__ ((packed))
template<> struct lgraph_api::olap::AdjUnit< Empty > __attribute__ ((packed))
template<typename ReducedSum>
static ReducedSum reduce_plus(ReducedSum a, ReducedSum b)

The default reduce function which uses the plus operator.

template<typename T>
T ForEachVertex(GraphDB &db, Transaction &txn, std::vector<Worker> &workers, const std::vector<int64_t> &vertices, std::function<void(Transaction&, VertexIterator&, T&)> work, std::function<void(const T&, T&)> reduce, size_t parallel_factor = 8)
template<typename T>
std::vector<T> ForEachVertex(GraphDB &db, Transaction &txn, std::vector<Worker> &workers, const std::vector<int64_t> &vertices, std::function<T(Transaction&, VertexIterator&, size_t)> work, size_t parallel_factor = 8)

Variables

struct lgraph_api::olap::EdgeStringUnit __attribute__
static constexpr size_t MAX_NUM_EDGES = 1ul << 36

The maximum number of edges. Change this value if needed.
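To put the limit in perspective, the arithmetic is sketched below as compile-time checks. This assumes a 64-bit unsigned long (as on LP64 Linux), and the 8-bytes-per-edge footprint is a hypothetical figure chosen for illustration, not a statement about how much memory the engine actually reserves.

```cpp
#include <cstddef>

// Mirrors the declaration above; assumes a 64-bit unsigned long (e.g. LP64 Linux).
static constexpr size_t MAX_NUM_EDGES = 1ul << 36;

// 2^36 = 68,719,476,736, i.e. about 68.7 billion edges.
static_assert(MAX_NUM_EDGES == 68719476736ull, "2^36 edges");

// Hypothetical footprint at 8 bytes per edge (for example, one packed,
// unweighted adjacency unit per edge): 2^36 * 8 = 2^39 bytes = 512 GiB.
static_assert(MAX_NUM_EDGES * 8 == 512ull * 1024 * 1024 * 1024, "512 GiB");
```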

template<typename EdgeData>
class AdjList
#include <olap_base.h>

Public Functions

inline AdjList()
inline AdjUnit<EdgeData> *begin()
inline AdjUnit<EdgeData> *end()
inline AdjUnit<EdgeData> &operator[](size_t i)

Private Functions

inline AdjList(AdjUnit<EdgeData> *begin, AdjUnit<EdgeData> *end)

Private Members

AdjUnit<EdgeData> *begin_
AdjUnit<EdgeData> *end_

Friends

friend class OlapBase< EdgeData >
template<typename EdgeData>
struct AdjUnit
#include <olap_base.h>

AdjUnit<EdgeData> represents an adjacent edge with EdgeData as the weight type.

Template Parameters

EdgeData – Type of the edge data.

Public Members

size_t neighbour
EdgeData edge_data
template<>
struct AdjUnit<Empty>
#include <olap_base.h>

Public Functions

template<> union lgraph_api::olap::AdjUnit< Empty >::@3 __attribute__ ((packed))

Public Members

size_t neighbour
Empty edge_data
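The packed layouts of AdjUnit and its Empty specialization can be illustrated with stand-in definitions. This is a sketch under assumptions. The struct bodies are reconstructed from the member lists shown here, in particular the assumption that the anonymous union in AdjUnit<Empty> overlays neighbour and edge_data. NaiveEmptyUnit is a hypothetical comparison type. Also, __attribute__((packed)) is a GCC/Clang extension. The checks show why the specialization helps: even a packed struct spends one byte on an empty member, while the union does not.

```cpp
#include <cstddef>

// Stand-in for the Empty tag type used for unweighted graphs.
struct Empty {};

// Packed: no padding is inserted between neighbour and edge_data.
template <typename EdgeData>
struct __attribute__((packed)) AdjUnit {
    size_t neighbour;
    EdgeData edge_data;
};

// Assumed shape of the Empty specialization: edge_data shares storage with
// neighbour through an anonymous union, so it adds no bytes.
template <>
struct AdjUnit<Empty> {
    union {
        size_t neighbour;
        Empty edge_data;
    };
};

// Hypothetical comparison: without the union, the empty member still costs a byte.
struct __attribute__((packed)) NaiveEmptyUnit {
    size_t neighbour;
    Empty edge_data;
};

static_assert(sizeof(AdjUnit<int>) == sizeof(size_t) + sizeof(int), "no padding");
static_assert(sizeof(AdjUnit<double>) == sizeof(size_t) + sizeof(double), "no padding");
static_assert(sizeof(AdjUnit<Empty>) == sizeof(size_t), "Empty adds nothing");
static_assert(sizeof(NaiveEmptyUnit) == sizeof(size_t) + 1, "one wasted byte");
```

For a graph with billions of edges, saving padding (or the extra byte) in every adjacency unit adds up directly in the size of the edge arrays.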
template<typename EdgeData>
struct EdgeStringUnit
#include <olap_base.h>

EdgeStringUnit<EdgeData> represents an edge with EdgeData as the weight type and string-typed vertices.

Template Parameters

EdgeData – Type of the edge data.

Public Members

std::string src
std::string dst
EdgeData edge_data
template<typename EdgeData>
struct EdgeUnit
#include <olap_base.h>

EdgeUnit<EdgeData> represents an edge with EdgeData as the weight type.

Template Parameters

EdgeData – Type of the edge data.

Public Members

size_t src
size_t dst
EdgeData edge_data
template<>
struct EdgeUnit<Empty>
#include <olap_base.h>

Public Functions

template<> union lgraph_api::olap::EdgeUnit< Empty >::@5 __attribute__ ((packed))

Public Members

size_t src
size_t dst
Empty edge_data
struct Empty
#include <olap_base.h>

Empty is used for representing unweighted graphs.

template<typename EdgeData>
class OlapBase
#include <olap_base.h>

AdjList<EdgeData>, returned by OutEdges and InEdges, allows range-based for-loops over AdjUnit<EdgeData>.

Graph

Graph instances represent static (sub)graphs loaded from text files. The internal organization uses compressed sparse matrix formats, which are optimized for read-only access. EdgeData is used for representing edge weights (the default type is Empty, which is used for unweighted graphs).

Template Parameters

EdgeData – Type of the edge data.

Subclassed by OlapOnDB< EdgeData >

Public Functions

inline OlapBase()

Constructor of Graph.

inline virtual bool CheckKillThisTask()
inline size_t OutDegree(size_t vid)

Access the out-degree of some vertex.

Parameters

vid – The vertex id (in the Graph) to access.

Returns

The out-degree of the specified vertex in the Graph.

inline size_t InDegree(size_t vid)

Access the in-degree of some vertex.

Parameters

vid – The vertex id (in the Graph) to access.

Returns

The in-degree of the specified vertex in the Graph.

inline AdjList<EdgeData> OutEdges(size_t vid)

Access the outgoing edges of some vertex.

Parameters

vid – The vertex id (in the Graph) to access.

Returns

The outgoing edges of the specified vertex in the Graph.

inline AdjList<EdgeData> InEdges(size_t vid)

Access the incoming edges of some vertex.

Parameters

vid – The vertex id (in the Graph) to access.

Returns

The incoming edges of the specified vertex in the Graph.

inline void Transpose()

Transpose the graph.

inline size_t NumVertices()

Get number of vertices of the Graph.

Returns

Number of vertices of the Graph.

inline size_t NumEdges()

Get number of edges of the Graph.

Returns

Number of edges of the Graph.

template<typename VertexData>
inline ParallelVector<VertexData> AllocVertexArray()

Allocate a vertex array with type VertexData.

Template Parameters

VertexData – Type of the vertex data.

Returns

A ParallelVector with type VertexData.

inline ParallelBitset AllocVertexSubset()

Allocate a vertex subset represented with ParallelBitset.

Returns

A ParallelBitset with |V| bits, where |V| is the number of vertices in the Graph.

inline void AcquireVertexLock(size_t vid)

Lock some vertex to ensure correct concurrent updates.

Parameters

vid – The vertex id (in the Graph) to lock.

inline void ReleaseVertexLock(size_t vid)

Unlock some vertex to ensure correct concurrent updates.

Parameters

vid – The vertex id (in the Graph) to unlock.

inline VertexLockGuard GuardVertexLock(size_t vid)

Get a VertexLockGuard of some vertex.

Parameters

vid – The vertex id (in the Graph) to lock/unlock.

Returns

A VertexLockGuard corresponding to the specified vertex.

inline bool IfSparse(ParallelBitset &active_vertices)

Determine whether to use sparse mode or dense mode according to the number of active vertices.

Parameters

active_vertices – The ParallelBitset of active_vertices.
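A density check like IfSparse is commonly implemented by counting the active bits and comparing that count against a fraction of |V|. The sketch below assumes a word-packed bitset with 64 vertices per word. IfSparseSketch, CountActive and the ratio kSparseRatio = 20 are hypothetical choices for illustration, not the threshold TuGraph uses.

```cpp
#include <bitset>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Assumed ratio for illustration only; the real threshold may differ.
constexpr size_t kSparseRatio = 20;

// Counts the set bits of a word-packed bitset (64 vertices per word).
size_t CountActive(const std::vector<uint64_t>& words) {
    size_t count = 0;
    for (uint64_t w : words) count += std::bitset<64>(w).count();
    return count;
}

// Returns true (sparse mode) when fewer than 1/kSparseRatio of the vertices are active.
bool IfSparseSketch(const std::vector<uint64_t>& active_words, size_t num_vertices) {
    assert(active_words.size() * 64 >= num_vertices);  // one bit per vertex
    return CountActive(active_words) * kSparseRatio < num_vertices;
}
```

With 1000 vertices, 10 active vertices give sparse mode (10 × 20 < 1000), while 128 active vertices give dense mode. The usual motivation is that a sparse frontier is cheaper to process by iterating its members, and a dense one by scanning all vertices.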

inline void set_num_vertices(size_t vertices)

Set the number of vertices of the first loaded graph.

Parameters

vertices – The number of vertices.

inline void LoadFromArray(char *edge_array, size_t input_vertices, size_t input_edges, EdgeDirectionPolicy edge_direction_policy)

Load graph data from edge_array.

Parameters
  • edge_array – [in] The data in this array is read into the graph.

  • input_vertices – The number of vertices in the input graph data.

  • input_edges – The number of edges in the input graph data.

  • edge_direction_policy – The EdgeDirectionPolicy that determines how outgoing and incoming edges are built from the input.
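How an edge list becomes compressed sparse row (CSR) arrays under each EdgeDirectionPolicy can be sketched sequentially as follows. The names Csr, BuildCsr and BuildGraph are hypothetical, the input is simplified to (src, dst) pairs instead of the raw char* edge array, and MAKE_SYMMETRIC is assumed to add the reverse of every input edge. LoadFromArray itself may be organized differently.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <utility>
#include <vector>

// Simplified mirror of the enum documented above.
enum EdgeDirectionPolicy { DUAL_DIRECTION, MAKE_SYMMETRIC, INPUT_SYMMETRIC };

struct Csr {
    std::vector<size_t> index;       // neighbours of v are in [index[v], index[v + 1])
    std::vector<size_t> neighbours;
};

// Counting pass, prefix sum, then scatter pass.
Csr BuildCsr(size_t num_vertices, const std::vector<std::pair<size_t, size_t>>& edges) {
    Csr csr;
    csr.index.assign(num_vertices + 1, 0);
    for (const auto& e : edges) {
        if (e.first >= num_vertices || e.second >= num_vertices)
            throw std::out_of_range("vertex id out of range");
        ++csr.index[e.first + 1];
    }
    for (size_t v = 0; v < num_vertices; ++v) csr.index[v + 1] += csr.index[v];
    assert(csr.index[num_vertices] == edges.size());
    csr.neighbours.resize(edges.size());
    std::vector<size_t> cursor(csr.index.begin(), csr.index.end() - 1);
    for (const auto& e : edges) csr.neighbours[cursor[e.first]++] = e.second;
    return csr;
}

// Returns {outgoing CSR, incoming CSR} according to the policy.
std::pair<Csr, Csr> BuildGraph(size_t num_vertices, std::vector<std::pair<size_t, size_t>> edges,
                               EdgeDirectionPolicy policy) {
    if (policy == MAKE_SYMMETRIC) {
        const size_t n = edges.size();
        edges.reserve(2 * n);  // no reallocation below, so edges[i] stays valid
        for (size_t i = 0; i < n; ++i) edges.emplace_back(edges[i].second, edges[i].first);
    }
    Csr out = BuildCsr(num_vertices, edges);
    if (policy != DUAL_DIRECTION) return {out, out};  // symmetric: incoming == outgoing
    std::vector<std::pair<size_t, size_t>> reversed;
    reversed.reserve(edges.size());
    for (const auto& e : edges) reversed.emplace_back(e.second, e.first);
    return {out, BuildCsr(num_vertices, reversed)};
}
```

With DUAL_DIRECTION and edges 0→1, 0→2, 1→2, the outgoing index is {0, 2, 3, 3} and the incoming index is {0, 0, 1, 3}. This is the same information OutDegree/InDegree and OutEdges/InEdges expose.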

template<typename ReducedSum>
inline ReducedSum ProcessVertexInRange(std::function<ReducedSum(size_t)> work, size_t lower, size_t upper, ReducedSum zero = 0, std::function<ReducedSum(ReducedSum, ReducedSum)> reduce = reduce_plus<ReducedSum>)

Execute a parallel-for loop in the range [lower, upper).

Throws

std::runtime_error – Raised when a runtime error condition occurs.

Template Parameters

ReducedSum – Type of the reduced sum.

Parameters
  • work – The function describing the work.

  • lower – The lower bound of the range (inclusive).

  • upper – The upper bound of the range (exclusive).

  • zero – (Optional) The initial value for reduction.

  • reduce – (Optional) The function describing the reduction logic.

Returns

A reduction value.
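The semantics of a parallel-for with reduction over [lower, upper) can be sketched with plain std::thread. This is an illustrative stand-in, not TuGraph's implementation, which dispatches through Worker and OpenMP. ParallelRangeReduce, ReducePlus (modeled on reduce_plus) and the num_threads parameter are hypothetical. Because every per-thread partial starts from zero, zero should be an identity of reduce.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Default reduction, modeled on reduce_plus.
template <typename ReducedSum>
ReducedSum ReducePlus(ReducedSum a, ReducedSum b) { return a + b; }

// Splits [lower, upper) into contiguous chunks (one per thread), reduces each
// chunk locally, then combines the partial results on the calling thread.
// Avoid ReducedSum = bool here: std::vector<bool> packs slots into shared words.
template <typename ReducedSum>
ReducedSum ParallelRangeReduce(std::function<ReducedSum(size_t)> work, size_t lower, size_t upper,
                               ReducedSum zero = 0,
                               std::function<ReducedSum(ReducedSum, ReducedSum)> reduce =
                                   ReducePlus<ReducedSum>,
                               size_t num_threads = 4) {
    assert(lower <= upper);
    if (lower == upper) return zero;
    num_threads = std::max<size_t>(1, std::min(num_threads, upper - lower));
    const size_t chunk = (upper - lower + num_threads - 1) / num_threads;  // ceiling division
    std::vector<ReducedSum> partial(num_threads, zero);  // one slot per thread, no sharing
    std::vector<std::thread> threads;
    for (size_t t = 0; t < num_threads; ++t) {
        threads.emplace_back([&, t] {
            const size_t begin = lower + t * chunk;
            const size_t end = std::min(upper, begin + chunk);
            for (size_t v = begin; v < end; ++v) partial[t] = reduce(partial[t], work(v));
        });
    }
    for (auto& th : threads) th.join();
    ReducedSum result = zero;
    for (const auto& p : partial) result = reduce(result, p);
    return result;
}
```

For example, `ParallelRangeReduce<size_t>([](size_t v) { return v; }, 0, 100)` sums 0..99 to 4950, and passing a max lambda as reduce turns the same loop into a parallel maximum.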

template<typename ReducedSum, typename Algorithm>
inline ReducedSum ProcessVertexInRange(std::function<ReducedSum(Algorithm, size_t)> work, size_t lower, size_t upper, Algorithm algorithm, ReducedSum zero = 0, std::function<ReducedSum(ReducedSum, ReducedSum)> reduce = reduce_plus<ReducedSum>)
template<typename ReducedSum>
inline ReducedSum ProcessVertexActive(std::function<ReducedSum(size_t)> work, ParallelBitset &active_vertices, ReducedSum zero = 0, std::function<ReducedSum(ReducedSum, ReducedSum)> reduce = reduce_plus<ReducedSum>)

Process a set of active vertices in parallel.

Throws

std::runtime_error – Raised when a runtime error condition occurs.

Template Parameters

ReducedSum – Type of the reduced sum.

Parameters
  • work – The function describing each vertex’s work.

  • active_vertices – [inout] The active vertex set.

  • zero – (Optional) The initial value for reduction.

  • reduce – (Optional) The function describing the reduction logic.

Returns

A reduction value.
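Processing only the active vertices amounts to visiting the set bits of the bitset. The sequential sketch below skips all-zero words and peels off one set bit at a time. ReduceActive is hypothetical, supports only the plus reduction, and relies on the GCC/Clang builtin __builtin_ctzll. A parallel version would split the word range across threads and combine partial sums, as in the range-based case.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Vertex v is active when bit (v % 64) of word (v / 64) is set.
template <typename ReducedSum>
ReducedSum ReduceActive(std::function<ReducedSum(size_t)> work,
                        const std::vector<uint64_t>& active_words, ReducedSum zero = 0) {
    ReducedSum sum = zero;
    for (size_t w = 0; w < active_words.size(); ++w) {
        uint64_t word = active_words[w];  // an all-zero word exits the loop immediately
        while (word != 0) {
            // Index of the lowest set bit (GCC/Clang builtin; word is non-zero here).
            const size_t bit = static_cast<size_t>(__builtin_ctzll(word));
            sum += work(w * 64 + bit);
            word &= word - 1;  // clear the lowest set bit
        }
    }
    return sum;
}
```

For the words {0xB, 0x2}, the active vertices are 0, 1, 3 and 65, so summing vertex ids yields 69 and counting yields 4.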

template<typename ReducedSum, typename Algorithm>
inline ReducedSum ProcessVertexActive(std::function<ReducedSum(Algorithm, size_t)> work, ParallelBitset &active_vertices, Algorithm algorithm, ReducedSum zero = 0, std::function<ReducedSum(ReducedSum, ReducedSum)> reduce = reduce_plus<ReducedSum>)

Protected Functions

inline virtual void Construct()

Protected Attributes

size_t num_vertices_
size_t num_edges_
size_t edge_data_size_
size_t adj_unit_size_
size_t edge_unit_size_
EdgeDirectionPolicy edge_direction_policy_
EdgeUnit<EdgeData> *edge_list_
ParallelVector<size_t> out_degree_
ParallelVector<size_t> in_degree_
ParallelVector<size_t> out_index_
ParallelVector<size_t> in_index_
ParallelVector<AdjUnit<EdgeData>> out_edges_
ParallelVector<AdjUnit<EdgeData>> in_edges_
ParallelVector<bool> lock_array_
class ParallelBitset
#include <olap_base.h>

ParallelBitset implements the concurrent bitset data structure, which is usually used to represent active vertex sets.

Public Functions

explicit ParallelBitset(size_t size)

Construct a ParallelBitset.

Parameters

size – The size of the bitset (i.e. the number of bits).

ParallelBitset(const ParallelBitset &rhs) = delete
inline ParallelBitset &operator=(ParallelBitset &&rhs)
inline ParallelBitset(ParallelBitset &&rhs)
inline ParallelBitset()
~ParallelBitset()
void Clear()

Clear the bitset (reset all bits to 0).

void Fill()

Fill the bitset (set all bits to 1).

bool Has(size_t i)

Test a specified bit.

Parameters

i – The bit to test.

Returns

Whether the bit is set or not.

bool Add(size_t i)

Set a specified bit.

Parameters

i – The bit to set.

Returns

Whether this call actually set the bit (false if the bit was already set).
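The "true addition" result of Add is what lets concurrent threads agree on which one activated a vertex first. A minimal sketch with an atomic fetch_or is shown below. AtomicBitset, WordOffset and BitOffset are hypothetical; the two helpers assume WORD_OFFSET(i) and BIT_OFFSET(i) map bit i to a 64-bit word and a position within it. This is not TuGraph's ParallelBitset.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Assumed equivalents of WORD_OFFSET(i) and BIT_OFFSET(i) for 64-bit words.
constexpr size_t WordOffset(size_t i) { return i >> 6; }  // i / 64
constexpr size_t BitOffset(size_t i) { return i & 63; }   // i % 64

// Minimal concurrent bitset sketch.
class AtomicBitset {
 public:
    explicit AtomicBitset(size_t size) : words_((size + 63) / 64) {
        for (auto& w : words_) w.store(0, std::memory_order_relaxed);
    }

    bool Has(size_t i) const {
        assert(WordOffset(i) < words_.size());
        return (words_[WordOffset(i)].load(std::memory_order_relaxed) >> BitOffset(i)) & 1;
    }

    // fetch_or returns the word's previous value, so exactly one of several
    // concurrent Add(i) calls sees the bit as 0 and reports a true addition.
    bool Add(size_t i) {
        assert(WordOffset(i) < words_.size());
        const uint64_t mask = uint64_t(1) << BitOffset(i);
        const uint64_t old = words_[WordOffset(i)].fetch_or(mask, std::memory_order_relaxed);
        return (old & mask) == 0;
    }

 private:
    std::vector<std::atomic<uint64_t>> words_;
};
```

In a BFS-style algorithm, only the thread whose Add(dst) returns true would record dst's parent or push it to the next frontier.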

void Swap(ParallelBitset &other)

Swap the current bitset with another one.

Parameters

other – [inout] The other bitset to swap with.

inline uint64_t *Data()
inline size_t Size()

Private Members

uint64_t *data_
size_t size_
template<typename T>
class ParallelVector
#include <olap_base.h>

ParallelVector<T> aims to mimic std::vector<T>. Note that deletions other than clearing are not supported.

Public Functions

inline explicit ParallelVector(size_t capacity)

Construct a ParallelVector<T>.

Throws

std::runtime_error – Raised when a runtime error condition occurs.

Parameters

capacity – The capacity of the vector.

inline explicit ParallelVector(size_t capacity, size_t size)

Construct a ParallelVector<T>.

Throws

std::runtime_error – Raised when a runtime error condition occurs.

Parameters
  • capacity – The capacity of the vector.

  • size – The initial size of the vector.

inline ParallelVector(T *data, size_t size)

Construct a ParallelVector<T>.

Parameters
  • data – The initial data of the vector.

  • size – The initial size of the vector. The initial capacity equals the initial size.

ParallelVector(const ParallelVector<T> &rhs) = delete
inline ParallelVector(ParallelVector<T> &&rhs)
inline ParallelVector<T> &operator=(ParallelVector<T> &&rhs)
inline ParallelVector()

Default constructor of ParallelVector<T>.

inline void Destroy()

Destroy ParallelVector<T>.

inline ~ParallelVector()
inline T &operator[](size_t i)
inline T *begin()
inline T *end()
inline T &Back()
inline T *Data()
inline size_t Size()
inline void Resize(size_t size)

Change the size of the ParallelVector. Note that the new size should be greater than or equal to the current size.

Parameters

size – Value of new size.

inline void Resize(size_t size, const T &elem)

Change the size of the ParallelVector and initialize the newly added elements with elem. Note that the new size should be greater than or equal to the current size.

Parameters
  • size – Value of new size.

  • elem – Initial value of the newly added elements.

inline void Clear()

Clear all data and change size to 0.

inline void ReAlloc(size_t capacity)

Destroy the existing data and reallocate with the new capacity.

Parameters

capacity – New capacity value.

inline void Fill(T elem)

Assign the vector’s elements with a common value.

Parameters

elem – The common value.

     This action is performed in parallel, so you should not call
     it inside another parallel region (via Worker::Delegate).

inline void Append(const T &elem, bool atomic = true)

Append an element to the vector.

Throws

std::runtime_error – Raised when a runtime error condition occurs.

Parameters
  • elem – The element.

  • atomic – (Optional) Whether atomic instructions should be used or not.
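Lock-free appends into a preallocated buffer are typically built on an atomic fetch-and-add that hands each caller a distinct slot. The sketch below illustrates the idea and the capacity check that raises std::runtime_error. FixedAppendVector is hypothetical. It assumes T is default-constructible, and it assumes that the non-atomic path is only used when no other thread is appending.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <stdexcept>

// Fixed-capacity vector whose Append reserves a slot with fetch_add.
template <typename T>
class FixedAppendVector {
 public:
    explicit FixedAppendVector(size_t capacity)
        : capacity_(capacity), data_(new T[capacity]), size_(0) {}

    void Append(const T& elem, bool atomic = true) {
        size_t pos;
        if (atomic) {
            pos = size_.fetch_add(1);  // each concurrent caller receives a distinct slot
        } else {
            pos = size_.load(std::memory_order_relaxed);  // caller guarantees no concurrency
            size_.store(pos + 1, std::memory_order_relaxed);
        }
        if (pos >= capacity_) throw std::runtime_error("FixedAppendVector: capacity exceeded");
        data_[pos] = elem;
    }

    // Clamped because failed appends still bumped the counter.
    // Read only after all appending threads have finished (e.g. joined).
    size_t Size() const { return std::min(size_.load(), capacity_); }

    T& operator[](size_t i) {
        assert(i < Size());
        return data_[i];
    }

 private:
    size_t capacity_;
    std::unique_ptr<T[]> data_;
    std::atomic<size_t> size_;
};
```

The non-atomic path avoids the cost of an atomic read-modify-write, which is why a flag like atomic can be worth exposing when the caller already knows it is the only writer.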

inline void Append(T *buf, size_t count, bool atomic = true)

Append an array of elements to the vector.

Throws

std::runtime_error – Raised when a runtime error condition occurs.

Parameters
  • buf – [inout] The array pointer.

  • count – The array length.

  • atomic – (Optional) Whether atomic instructions should be used or not.

inline void Append(ParallelVector<T> &other, bool atomic = true)

Append another vector of elements to this.

Throws

std::runtime_error – Raised when a runtime error condition occurs.

Parameters
  • other – [inout] The other vector.

  • atomic – (Optional) Whether atomic instructions should be used or not.

inline void Swap(ParallelVector<T> &other)

Swap the current vector with another one.

Parameters

other – [inout] The other vector to swap with.

inline ParallelVector<T> Copy()

Copy the current vector.

Returns

A new vector with the same copied content.

Private Members

bool destroyed_
size_t capacity_
T *data_
size_t size_
struct ThreadState
#include <olap_base.h>

Public Members

size_t curr
size_t end
int state
class VertexLockGuard
#include <olap_base.h>

The VertexLockGuard automatically acquires the lock on construction and releases the lock on destruction.

Public Functions

explicit VertexLockGuard(volatile bool *lock)
VertexLockGuard(const VertexLockGuard &rhs) = delete
VertexLockGuard(VertexLockGuard &&rhs) = default
~VertexLockGuard()

Private Members

bool *lock_
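The acquire-on-construction, release-on-destruction behavior of VertexLockGuard is the RAII pattern applied to a per-vertex spin lock. The sketch below is a hypothetical stand-in. It uses std::atomic<bool> instead of the volatile bool* shown above, and it nulls the moved-from pointer so that only one guard releases the lock. SpinLockGuard and ConcurrentIncrements are illustrative names, not TuGraph APIs.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// RAII spin-lock guard over a single per-vertex flag (false = free, true = held).
class SpinLockGuard {
 public:
    explicit SpinLockGuard(std::atomic<bool>* lock) : lock_(lock) {
        assert(lock_ != nullptr);
        bool expected = false;
        // Spin until this thread flips the flag from free to held.
        while (!lock_->compare_exchange_weak(expected, true, std::memory_order_acquire)) {
            expected = false;  // a failed CAS overwrote expected with the current value
        }
    }
    SpinLockGuard(const SpinLockGuard&) = delete;
    SpinLockGuard& operator=(const SpinLockGuard&) = delete;
    SpinLockGuard(SpinLockGuard&& rhs) noexcept : lock_(rhs.lock_) { rhs.lock_ = nullptr; }
    ~SpinLockGuard() {
        if (lock_ != nullptr) lock_->store(false, std::memory_order_release);
    }

 private:
    std::atomic<bool>* lock_;
};

// Usage sketch: several threads bump a shared counter under one lock flag.
int ConcurrentIncrements(int num_threads, int per_thread) {
    std::atomic<bool> lock(false);
    int counter = 0;  // protected by `lock`
    std::vector<std::thread> threads;
    for (int t = 0; t < num_threads; ++t) {
        threads.emplace_back([&] {
            for (int i = 0; i < per_thread; ++i) {
                SpinLockGuard guard(&lock);  // released at the end of each iteration
                ++counter;
            }
        });
    }
    for (auto& th : threads) th.join();
    return counter;
}
```

A spin lock fits here because per-vertex critical sections (such as updating one vertex's value) are very short, so busy-waiting is usually cheaper than putting a thread to sleep.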
class Worker
#include <olap_base.h>

All parallel tasks should be delegated through a Worker to prevent OpenMP from spawning a huge number of threads.

Public Functions

Worker()
~Worker()
void Delegate(const std::function<void()> &work)

Send some work to the Worker instance.

Parameters

work – The function describing the work.

             Exceptions can be thrown in the work function if necessary.
             Note that Delegate cannot be nested.

template<typename Compute>
inline void DelegateCompute(const std::function<void(Compute&)> &work, Compute &compute)

Public Static Functions

static std::shared_ptr<Worker> &SharedWorker()

Get the global (shared) worker.

Returns

A shared pointer to the global Worker instance.

Private Members

bool stopping_
bool has_work_
std::mutex mutex_
std::condition_variable cv_
std::shared_ptr<std::packaged_task<void()>> task_
std::thread worker_
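The Delegate contract (the caller blocks, exceptions propagate, nesting is not allowed) can be sketched as a single background thread that runs one std::packaged_task at a time. MiniWorker is hypothetical. It reuses the private member names listed above, but its logic is an assumption rather than TuGraph's code, and it does not model any OpenMP parallel regions that the delegated work itself may open.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical single-thread delegate executor.
class MiniWorker {
 public:
    MiniWorker() : stopping_(false), has_work_(false), worker_([this] { Loop(); }) {}

    ~MiniWorker() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_all();
        worker_.join();
    }

    // Runs work on the background thread and blocks until it finishes;
    // an exception thrown by work is rethrown here through the future.
    void Delegate(const std::function<void()>& work) {
        // Nesting would make the worker thread wait on its own task forever.
        assert(std::this_thread::get_id() != worker_.get_id());
        auto task = std::make_shared<std::packaged_task<void()>>(work);
        std::future<void> done = task->get_future();
        {
            std::unique_lock<std::mutex> lock(mutex_);
            cv_.wait(lock, [this] { return !has_work_; });  // a single pending slot
            task_ = std::move(task);
            has_work_ = true;
        }
        cv_.notify_all();
        done.get();
    }

    static std::shared_ptr<MiniWorker>& SharedWorker() {
        static std::shared_ptr<MiniWorker> shared = std::make_shared<MiniWorker>();
        return shared;
    }

 private:
    void Loop() {
        while (true) {
            std::shared_ptr<std::packaged_task<void()>> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stopping_ || has_work_; });
                if (!has_work_) return;  // stopping and nothing left to run
                task = std::move(task_);
                has_work_ = false;
            }
            cv_.notify_all();  // wake a Delegate call waiting for the free slot
            (*task)();         // the packaged_task stores any exception in its future
        }
    }

    // Declaration order matters: worker_ must start after the other members exist.
    bool stopping_;
    bool has_work_;
    std::mutex mutex_;
    std::condition_variable cv_;
    std::shared_ptr<std::packaged_task<void()>> task_;
    std::thread worker_;
};
```

Routing all work through one shared instance means only one thread ever opens a parallel region at a time. This is one plausible way a design like Worker keeps the total thread count bounded.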