RPC API

此文档主要介绍 TuGraph 的 RPC API 的调用详情。

1.简介

TuGraph 提供丰富的 RPC API,以供开发者通过 RPC 请求远程调用 TuGraph 提供的服务。

RPC(远程过程调用)是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。 相比REST,RPC 面向方法,主要用于函数方法的调用,可以适合更复杂通信需求的场景,且性能更高。 brpc是用c++语言编写的工业级RPC框架,基于brpc,TuGraph 提供了丰富的RPC API,本文档描述 TuGraph 的 RPC API 使用方式。

2.请求

2.1.建立连接

开发者向TuGraph服务发送RPC请求,首先要建立连接。以C++语言为例,开发者创建指定url的通道(channel), 由通道创建指定的服务存根(LGraphRPCService_Stub),后续即可通过存根像调用本地方法一样向远程 服务器发送请求。

    std::shared_ptr<lgraph_rpc::m_channel_options> options = std::make_shared<lgraph_rpc::m_channel_options>();
    options->protocol = "baidu_std";
    options->connection_type = "";
    options->timeout_ms = 60 * 60 * 1000 /*milliseconds*/;
    options->max_retry = 3;
    std::string load_balancer = "";
    std::shared_ptr<lgraph_rpc::m_channel> channel = std::make_shared<lgraph_rpc::m_channel>();
    if (channel->Init(url.c_str(), load_balancer, options.get()) != 0)
        throw RpcException("Fail to initialize channel");
    LGraphRPCService_Stub stub(channel.get());

2.2.请求类型

TuGraph支持10种RPC请求,其中每种请求的功能如下表所示:

请求

功能

GraphApiRequest

点边索引操作请求

CypherRequest

cypher请求

PluginRequest

存储过程请求

HARequest

高可用模式请求

ImportRequest

数据导入请求

GraphRequest

子图操作请求

AclRequest

权限管理请求

ConfigRequest

配置管理请求

RestoreRequest

备份请求

SchemaRequest

schema管理请求

用户发送请求时,需要传入以下参数:

  • client_version: 可选参数,HA模式下可通过对比client_versionserver_version防止响应过时的请求

  • token: 必要参数,客户端登陆之后获得token,每次请求传入token以校验用户身份

  • is_write_op: 可选参数,标志请求是否是写请求

  • user: 可选参数,HA模式下主从之间同步请求时设置user,不需验证token

服务处理完RPC请求之后发回响应,响应消息中除了包含每个请求的单独响应信息之外,还包含以下参数:

  • error_code: 必要参数,标志请求处理状态

  • redirect: 可选参数,HA模式下向follower发送写请求时处理失败,设置redirect为请求转发地址,即leader地址

  • error: 可选参数,请求错误信息

  • server_version: 可选参数,HA模式的请求响应中设置server_version以避免client读取数据时发生反向时间旅行问题

:warning: 除CypherRequest、PluginRequest、HARequest和AclRequest外,其余RPC接口将逐步废弃,其功能统一至CypherRequest接口。

3.登录

登录请求信息包含以下参数:

  • user: 必要参数,用户名

  • pass: 必要参数,密码 以C++为例,用户使用构建好的服务存根发送登录请求:

    auto* req = request.mutable_acl_request();
    auto* auth = req->mutable_auth_request()->mutable_login();
    auth->set_user(user);
    auth->set_password(pass);
    // send data
    cntl->Reset();
    cntl->request_attachment().append(FLAGS_attachment);
    req->set_client_version(server_version);
    req->set_token(token);
    LGraphRPCService_Stub stub(channel.get());
    LGraphResponse res;
    stub.HandleRequest(cntl.get(), req, &resp, nullptr);
    if (cntl->Failed()) throw RpcConnectionException(cntl->ErrorText());
    server_version = std::max(server_version, res.server_version());
    if (res.error_code() != LGraphResponse::SUCCESS) throw RpcStatusException(res.error());
    token = res.acl_response().auth_response().token();

登录响应信息包含以下参数:

  • token: 必要参数,登录成功会收到带有签名的令牌,即 Json Web Token,客户端储存该令牌,并且用于以后的每次发送请求。 如果登录失败会收到“Authentication failed”错误。

4.查询

用户可以通过Cypher查询和TuGraph进行绝大多数的交互,Cypher请求信息包含以下参数:

  • query: 必要参数,Cypher查询语句

  • param_names: 可选参数,参数名

  • param_values: 可选参数,参数值

  • result_in_json_format: 必要参数,查询结果是否以JSON格式返回

  • graph: 可选参数,Cypher语句执行的子图名称

  • timeout: 可选参数,Cypher语句执行的超时时间

以C++为例,用户发送Cypher请求的方式如下所示:

    LGraphResponse res;
    cntl->Reset();
    cntl->request_attachment().append(FLAGS_attachment);
    LGraphRequest req;
    req.set_client_version(server_version);
    req.set_token(token);
    lgraph::CypherRequest* cypher_req = req.mutable_cypher_request();
    cypher_req->set_graph(graph);
    cypher_req->set_query(query);
    cypher_req->set_timeout(timeout);
    cypher_req->set_result_in_json_format(true);
    LGraphRPCService_Stub stub(channel.get());
    stub.HandleRequest(cntl.get(), &req, &res, nullptr);
    if (cntl->Failed()) throw RpcConnectionException(cntl->ErrorText());
    if (res.error_code() != LGraphResponse::SUCCESS) throw RpcStatusException(res.error());
    server_version = std::max(server_version, res.server_version());
    CypherResponse cypher_res = res.cypher_response();

Cypher请求响应为以下两个参数之一:

  • json_result: JSON格式的cypher查询结果

  • binary_result: CypherResult格式的cypher查询结果

5.存储过程

为满足用户较为复杂的查询/更新逻辑,TuGraph支持 C 语言和 Python 语言编写的存储过程。 用户可以使用RPC请求对存储过程进行增删改查操作。

5.1.加载存储过程

加载存储过程的请求包含以下参数:

  • name: 必要参数,存储过程名称

  • read_only: 必要参数,是否只读

  • code: 必要参数,存储过程文件读入生成的ByteString

  • desc: 可选参数,存储过程描述

  • code_type: 可选参数,存储过程代码类型,PY、SO、CPP、ZIP四者之一

以C++为例,用户加载存储过程的方式如下所示:

    std::string content;
    if (!FieldSpecSerializer::FileReader(source_file, content)) {
        std::swap(content, result);
        return false;
    }
    LGraphRequest req;
    req.set_is_write_op(true);
    lgraph::PluginRequest* pluginRequest = req.mutable_plugin_request();
    pluginRequest->set_graph(graph);
    pluginRequest->set_type(procedure_type == "CPP" ? lgraph::PluginRequest::CPP
                                                    : lgraph::PluginRequest::PYTHON);
    pluginRequest->set_version(version);
    lgraph::LoadPluginRequest* loadPluginRequest = pluginRequest->mutable_load_plugin_request();
    loadPluginRequest->set_code_type([](const std::string& type) {
        std::unordered_map<std::string, lgraph::LoadPluginRequest_CodeType> um{
            {"SO", lgraph::LoadPluginRequest::SO},
            {"PY", lgraph::LoadPluginRequest::PY},
            {"ZIP", lgraph::LoadPluginRequest::ZIP},
            {"CPP", lgraph::LoadPluginRequest::CPP}};
        return um[type];
    }(code_type));
    loadPluginRequest->set_name(procedure_name);
    loadPluginRequest->set_desc(procedure_description);
    loadPluginRequest->set_read_only(read_only);
    loadPluginRequest->set_code(content);
    cntl->Reset();
    cntl->request_attachment().append(FLAGS_attachment);
    req.set_client_version(server_version);
    req.set_token(token);
    LGraphRPCService_Stub stub(channel.get());
    LGraphResponse res;
    stub.HandleRequest(cntl.get(), &req, &res, nullptr);
    if (cntl->Failed()) throw RpcConnectionException(cntl->ErrorText());
    server_version = std::max(server_version, res.server_version());
    if (res.error_code() != LGraphResponse::SUCCESS) throw RpcStatusException(res.error());

加载存储过程的响应不包含参数,如果加载失败则抛出BadInput异常

5.2.调用存储过程

调用存储过程的请求包含以下参数:

  • name: 必要参数,存储过程名称

  • param: 必要参数,存储过程参数

  • result_in_json_format: 可选参数,调用结果是否以JSON格式返回

  • in_process: 可选参数,未来支持

  • timeout: 可选参数,调用存储过程的超时时间

以C++为例,用户调用存储过程的方式如下所示:

    LGraphRequest req;
    lgraph::PluginRequest* pluginRequest = req.mutable_plugin_request();
    pluginRequest->set_graph(graph);
    pluginRequest->set_type(procedure_type == "CPP" ? lgraph::PluginRequest::CPP
                                                    : lgraph::PluginRequest::PYTHON);
    lgraph::CallPluginRequest *cpRequest = pluginRequest->mutable_call_plugin_request();
    cpRequest->set_name(procedure_name);
    cpRequest->set_in_process(in_process);
    cpRequest->set_param(param);
    cpRequest->set_timeout(procedure_time_out);
    cpRequest->set_result_in_json_format(json_format);
    LGraphResponse res;
    cntl->Reset();
    cntl->request_attachment().append(FLAGS_attachment);
    req.set_client_version(server_version);
    req.set_token(token);
    LGraphRPCService_Stub stub(channel.get());
    stub.HandleRequest(cntl.get(), &req, &res, nullptr);
    if (cntl->Failed()) throw RpcConnectionException(cntl->ErrorText());
    server_version = std::max(server_version, res.server_version());
    if (res.error_code() != LGraphResponse::SUCCESS) throw RpcStatusException(res.error());
    if (json_format) {
        result = res.mutable_plugin_response()->mutable_call_plugin_response()->json_result();
    } else {
        result = res.mutable_plugin_response()->mutable_call_plugin_response()->reply();
    }

调用存储过程的响应为以下两个参数之一:

  • reply: ByteString格式的存储过程调用结果

  • json_result: JSON格式的存储过程调用结果

5.3.删除存储过程

删除存储过程的请求包含以下参数:

  • name: 必要参数,存储过程名称

以C++为例,用户删除存储过程的方式如下所示:

    LGraphRequest req;
    req.set_is_write_op(true);
    lgraph::PluginRequest* pluginRequest = req.mutable_plugin_request();
    pluginRequest->set_graph(graph);
    pluginRequest->set_type(procedure_type == "CPP" ? lgraph::PluginRequest::CPP
                                                    : lgraph::PluginRequest::PYTHON);
    lgraph::DelPluginRequest* dpRequest = pluginRequest->mutable_del_plugin_request();
    dpRequest->set_name(procedure_name);
    cntl->Reset();
    cntl->request_attachment().append(FLAGS_attachment);
    req.set_client_version(server_version);
    req.set_token(token);
    LGraphRPCService_Stub stub(channel.get());
    LGraphResponse res;
    stub.HandleRequest(cntl.get(), &req, &res, nullptr);
    if (cntl->Failed()) throw RpcConnectionException(cntl->ErrorText());
    server_version = std::max(server_version, res.server_version());
    if (res.error_code() != LGraphResponse::SUCCESS) throw RpcStatusException(res.error());

删除存储过程的响应不包含参数,如果删除失败则抛出BadInput异常

5.4.列举存储过程

列举存储过程请求不需要参数,以C++为例,用户列举存储过程的方式如下所示:

    LGraphRequest req;
    req.set_is_write_op(false);
    lgraph::PluginRequest* pluginRequest = req.mutable_plugin_request();
    pluginRequest->set_graph(graph);
    pluginRequest->set_type(procedure_type == "CPP" ? lgraph::PluginRequest::CPP
                                                    : lgraph::PluginRequest::PYTHON);
    pluginRequest->mutable_list_plugin_request();
    cntl->Reset();
    cntl->request_attachment().append(FLAGS_attachment);
    req.set_client_version(server_version);
    req.set_token(token);
    LGraphRPCService_Stub stub(channel.get());
    LGraphResponse res;
    stub.HandleRequest(cntl.get(), &req, &res, nullptr);
    if (cntl->Failed()) throw RpcConnectionException(cntl->ErrorText());
    server_version = std::max(server_version, res.server_version());
    if (res.error_code() != LGraphResponse::SUCCESS) throw RpcStatusException(res.error());
    result = res.mutable_plugin_response()->mutable_list_plugin_response()->reply();

列举存储过程的响应的参数如下所示:

  • reply: JSON格式的procedure列表