## rpc实现
```cpp
#ifndef RPC_H
#define RPC_H
#include <iostream>
#include <map>
#include <functional>
#include <tuple>
#include <memory>
#include <zmq.hpp>
#include "serializer.h"
template<typename T>
struct type_xx { typedef T type; };
template<>
struct type_xx<void> { typedef int8_t type; };
// 打包帮助模板
template<typename Tuple, std::size_t... Index>
void package_Args_impl(Serializer& sr, const Tuple& t, std::index_sequence<Index...>)
{
((sr << std::get<Index>(t)), ...);
}
template<typename... Args>
void package_Args(Serializer& sr, const std::tuple<Args...>& t)
{
package_Args_impl(sr, t, std::index_sequence_for<Args...>{});
}
// 用tuple做参数调用函数模板类
template<typename Function, typename Tuple, std::size_t... Index>
decltype(auto) invoke_impl(Function&& func, Tuple&& t, std::index_sequence<Index...>)
{
return func(std::get<Index>(std::forward<Tuple>(t))...);
}
template<typename Function, typename Tuple>
decltype(auto) invoke(Function&& func, Tuple&& t)
{
constexpr auto size = std::tuple_size<typename std::decay<Tuple>::type>::value;
return invoke_impl(std::forward<Function>(func), std::forward<Tuple>(t), std::make_index_sequence<size>{});
}
template<typename R, typename F, typename ArgsTuple>
typename std::enable_if<!std::is_same<R, void>::value, typename type_xx<R>::type >::type
call_helper(F f, ArgsTuple args)
{
return invoke(f, args);
}
class rpc {
public:
enum rpc_role {
RPC_CLIENT,
RPC_SERVER
};
enum rpc_errcode {
RPC_ERR_SUCCESS = 0,
RPC_ERR_FUNCTION_NOT_REGUIST,
RPC_ERR_RECV_TIMEOUT
};
template <typename T>
class response_t {
public:
response_t() : code_(0) { msg_.clear(); }
int error_code() const { return code_; }
std::string error_msg() const { return msg_; }
T val() const { return val_; }
void set_val(const T& val) { val_ = val; }
void set_code(int code) { code_ = code; }
void set_msg(const std::string& msg) { msg_ = msg; }
T& val() { return val_; }
int code() { return code_; }
std::string msg() { return msg_; }
friend Serializer& operator >> (Serializer& in, response_t<T>& d)
{
in >> d.code_ >> d.msg_;
if (d.code_ == 0)
in >> d.val_;
return in;
}
friend Serializer& operator << (Serializer& out, response_t<T>& d)
{
out << d.code_ << d.msg_ << d.val_;
return out;
}
private:
int code_;
std::string msg_;
T val_;
};
public:
rpc() : context_(1) {}
~rpc() { context_.close(); }
void as_client(const std::string& ip, int port)
{
role_ = RPC_CLIENT;
socket_ = std::unique_ptr<zmq::socket_t,
std::function<void(zmq::socket_t*)>>(new zmq::socket_t(context_, ZMQ_REQ),
[](zmq::socket_t* sock) { sock->close(); delete sock; sock = nullptr; });
std::ostringstream os;
os << "tcp://" << ip << ":" << port;
socket_->connect(os.str());
}
void as_server(int port)
{
role_ = RPC_SERVER;
socket_ = std::unique_ptr<zmq::socket_t,
std::function<void(zmq::socket_t*)>>(new zmq::socket_t(context_, ZMQ_REP),
[](zmq::socket_t* sock) { sock->close(); delete sock; sock = nullptr; });
std::ostringstream os;
os << "tcp://*:" << port;
std::cout << os.str() << std::endl;
socket_->bind(os.str());
}
bool send(zmq::message_t& data) { return socket_->send(data); }
bool recv(zmq::message_t& data) { return socket_->recv(&data); }
void set_timeout(uint32_t ms)
{
if (role_ == RPC_CLIENT)
socket_->setsockopt(ZMQ_RCVTIMEO, ms);
}
void run()
{
while (1)
{
zmq::message_t data;
recv(data);
StreamBuffer sb((char*)data.data(), data.size());
Serializer sr(sb);
std::string funcname;
sr >> funcname;
std::shared_ptr<Serializer> rptr = call_(funcname, sr.data(), sr.size());
zmq::message_t response(rptr.get()->size());
memcpy(response.data(), rptr.get()->data(), rptr.get()->size());
send(response);
}
}
std::shared_ptr<Serializer> call_(const std::string& name, const char* data, int len)
{
std::shared_ptr<Serializer> sp = std::make_shared<Serializer>();
if (mapFunctions_.find(name) == mapFunctions_.end())
{
*sp.get() << static_cast<int>(RPC_ERR_FUNCTION_NOT_REGUIST);
*sp.get() << std::string("function not bind: " + name);
return sp;
}
auto func = mapFunctions_[name];
func(sp.get(), data, len);
sp.get()->reset();
return sp;
}
template <typename R, typename... Args>
response_t<R> call(const std::string& name, Args... args)
{
using args_type = std::tuple<typename std::decay<Args>::type...>;
args_type as = std::make_tuple(args...);
Serializer sr;
sr << name;
package_Args(sr, as);
return net_call<R>(sr);
}
template <typename F>
void regist(const std::string& name, F func)
{
mapFunctions_[name] = std::bind(&rpc::callproxy<F>, this, func, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}
template<typename F, typename S>
void regist(const std::string& name, F func, S* s)
{
mapFunctions_[name] = std::bind(&rpc::callproxy<F, S>, this, func, s, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}
template<typename F>
void callproxy(F func, Serializer* pr, const char* data, int len )
{
callproxy_(func, pr, data, len);
}
template<typename F, typename S>
void callproxy(F func, S * s, Serializer * pr, const char * data, int len)
{
callproxy_(func, s, pr, data, len);
}
// 函数指针
template<typename R, typename... Args>
void callproxy_(R(*func)(Args...), Serializer* pr, const char* data, int len) {
callproxy_(std::function<R(Args...)>(func), pr, data, len);
}
template<typename R, typename C, typename S, typename... Args>
void callproxy_(R(C::* func)(Args...), S* s, Serializer* pr, const char* data, int len) {
using args_type = std::tuple<typename std::decay<Args>::type...>;
Serializer sr(StreamBuffer(data, len));
args_type as = sr.get_tuple<args_type>(std::index_sequence_for<Args...>{});
auto ff = [=](Args... args)->R {
return (s->*func)(args...);
};
typename type_xx<R>::type r = call_helper<R>(ff, as);
response_t<R> response;
response.set_code(RPC_ERR_SUCCESS);
response.set_val(r);
(*pr) << response;
}
// functional
template<typename R, typename... Args>
void callproxy_(std::function<R(Args... args)> func, Serializer* pr, const char* data, int len)
{
using args_type = std::tuple<typename std::decay<Args>::type...>;
Serializer sr(StreamBuffer(data, len));
args_type as = sr.get_tuple<args_type> (std::index_sequence_for<Args...>{});
typename type_xx<R>::type r = call_helper<R>(func, as);
response_t<R> response;
response.set_code(RPC_ERR_SUCCESS);
response.set_val(r);
(*pr) << response;
}
template<typename R>
response_t<R> net_call(Serializer& sr)
{
zmq::message_t request(sr.size() + 1);
memcpy(request.data(), sr.data(), sr.size());
send(request);
zmq::message_t reply;
recv(reply);
response_t<R> val;
if (reply.size() == 0)
{
val.set_code(RPC_ERR_RECV_TIMEOUT);
val.set_msg("recv timeout");
return val;
}
sr.clear();
sr.input((char *)reply.data(), reply.size());
sr >> val;
return val;
}
private:
std::map<std::string, std::function<void(Serializer*, const char*, int)>> mapFunctions_;
zmq::context_t context_;
std::unique_ptr<zmq::socket_t, std::function<void(zmq::socket_t*)>> socket_;
int role_;
};
#endif
```
- 空白目录
- 算法
- 排序
- 冒泡排序
- 选择排序
- 插入排序
- 归并排序
- 快速排序
- 计数排序
- 桶排序
- 基数排序
- 希尔排序
- 堆排序
- 二分查找
- 最小堆
- 最小索引堆
- 平衡二叉树(AVL tree)
- bitmap位图
- 布隆过滤器
- hashmap
- topK
- 跳表
- LRU Cache
- kmp
- 最小堆和堆排序
- 最短路径
- C++
- 运行时类型判断RTTI
- C++反射
- 手动实现智能指针
- 序列化实现
- rpc实现
- std::forward
- 函数指针的妙用
- C/C++
- std::function
- 同步队列
- 线程池实现
- std::promise
- 深入理解虚函数
- extern "C" 关键字讲解
- 大端小端的区别
- 简历
- 简历1
- redis
- 数据结构和对象
- sds
- list
- zskiplist
- 腾讯云redis面试题总结
- redis集群部署
- LeetCode
- 目标
- go基础
- 算法快速入门
- 数据结构篇
- 二叉树
- 链表
- 栈和队列
- 二进制
- 基础算法篇
- 二分搜索
- 排序算法
- 动态规划
- 算法思维
- 递归思维
- 滑动窗口思想
- 二叉搜索树
- 回溯法
- 其他
- 剑指offer
- 笔记
- git代理加速
- Linux
- vim大法
- vscode远程不能跳转
- cmake
- 设计模式
- 单例模式
- 简单工厂模式
- 外观模式
- 适配器模式
- 工厂方法模式
- 抽象工厂模式
- 生成器模式
- 原型模式
- 中介者模式
- 观察者模式
- 访问者模式
- 命令模式
- 网络编程
- epoll reactor模式
- linux timerfd系列函数总结
- IO
- mapreduce
- 反射器
- leo通信库
- Mutex
- Condition
- thread
- raft
- 协程
- hook
- 定时器
- 别人的面试经验
- 面试题
- vector崩溃问题
- JAVA
- Linux java环境配置
- ucore
- lab1
- FreeNOS
- leveldb
- 刷题笔记
- 回文串
- 前缀树
- 字符串查找
- 查找两个字符串a,b中的最长公共子串
- 动态规划
- golang
- 顺序循环打印实现
- 数据结构
- rpc运用
- python
- 单例
- 深拷贝浅拷贝
- 链表
- python基础题
- mysql
- 事务
- Linux
- 共享内存
- 刷题记录
- 贪心算法
- 动态规划
- 面试
- 腾讯C++面试
- 微众面试JD
- 迅雷网络面试
- 学习网址
- rabbitMq
