grpc-go
grpc Server
本文简单阅读源代码,了解grpc server的执行流程,从建立连接,到处理一条请求的过程。
使用方式
使用方式很简单,生成pb,注册建立服务,就可以等待请求了
type Hello struct{
}
func (h *Hello) Say(ctx context.Context, request pb.HelloRequest)(*pb.HelloResponse, error){
fmt.Println(request.Msg)
return &pb.HelloResponse{Msg: "wwww"}, nil
}
func main(){
lis, _ := net.Listen("tcp", "127.0.0.1:8888")
//1. 创建一个grpc服务器对象
gRpcServer := grpc.NewServer()
//2. 注册pb函数
pb.RegisterHelloServiceServer(gRpcServer, &Hello{})
//3. 开启服务端
//阻塞
gRpcServer.Serve(lis)
}
建立grpc server流程
NewServer
NewServer进行创建一个grpc服务,初始化一些参数。还可以进行函数选项模式
,来传递初始化的配置。
默认情况下会建立一个以下参数的grpc服务:
- 接受数据最大4M
- 发送数据最大2g
- 连接超时120秒
- 读和写缓存1mb
- 默认一个请求一个goroutine
numServerWorkers
numServerWorkers设定了开启多少个工作协程,如果没设置,则来了一条消息就会处理创建一个goroutine。
如果设置了,会将请求消息
进行分发给这多个worker
func (s *Server) serveStreams(st transport.ServerTransport) {
st.HandleStreams(func(stream *transport.Stream) {
if s.opts.numServerWorkers > 0 {
data := &serverWorkerData{st: st, wg: &wg, stream: stream}
select {
case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
default:
go func() {
s.handleStream(st, stream, s.traceInfo(st, stream))
wg.Done()
}()
}
} else {
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}
})
}
注册函数
利用反射,将具体实现的结构体和与之对应的函数存储到 grpcServer的services
变量中
- key: 结构体名称 (一般在pb文件里会根据proto生成)
- value: 函数信息(调用函数的指针,函数名称,Metadata)
将函数信息存储后,来了一个请求根据请求信息,找到指定的函数,进行调用
type ServiceDesc struct {
ServiceName string //服务名称
HandlerType interface{} //结构体类型
Methods []MethodDesc//一元函数
Streams []StreamDesc//流函数
Metadata interface{}// 元数据
}
//注册服务函数
//args: sd 文件描述, srv: 具体实现的结构体
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
if ss != nil {
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
if !st.Implements(ht) {
logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
}
}
s.register(sd, ss)
}
//利用反射将服务注册到字典
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
s.printf("RegisterService(%q)", sd.ServiceName)
if s.serve {
logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
}
if _, ok := s.services[sd.ServiceName]; ok {
logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}
info := &serviceInfo{
serviceImpl: ss,
methods: make(map[string]*MethodDesc),
streams: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
for i := range sd.Methods {
d := &sd.Methods[i]
info.methods[d.MethodName] = d
}
for i := range sd.Streams {
d := &sd.Streams[i]
info.streams[d.StreamName] = d
}
s.services[sd.ServiceName] = info
}
监听
当客户端建立连接,会为其单独创建一个goroutine进行后续的数据传输
func Serve(lis net.Listener) error{
//...
for {
rawConn, err := lis.Accept()
go func() {
s.handleRawConn(rawConn)
}()
}
//...
}
处理一个grpc请求的流程
- 建立连接
- 创建goroutine处理连接
- grpc基于http2,根据tcp连接信息 创建http2 传输结构 newHTTP2Transport
- 创建新的goroutine,将http2传输信息 进行处理
- 经过http2_server.go:455中的处理,从tcp层读取数据进行解析,最后执行此处传递过去的函数指针
- 根据请求信息,找到指定函数。最后进行调用注册的应用层业务
- 找到字典里对应的执行函数
- 如果没找到,则判断unknownStreamDesc 执行,这个一般用来自定义路由
- 判断一元,还是流
- 执行函数
- 将结果写回给对方
- 移除连接
code
func Serve(lis net.Listener) error{
for {
//1. 建立连接
rawConn, err := lis.Accept()
go func() {
//2. 创建goroutine处理连接
s.handleRawConn(rawConn)
}()
}
}
func (s *Server) handleRawConn(rawConn net.Conn) {
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
// 3. grpc基于http2,根据tcp连接信息 创建http2 传输结构
st := s.newHTTP2Transport(conn, authInfo)
go func() {
//4. 创建新的goroutine,将http2传输信息 进行处理
s.serveStreams(st)
//7. 移除连接
s.removeConn(st)
}()
}
func (s *Server) serveStreams(st transport.ServerTransport) {
//5. 经过http2_server.go:455中的处理,从tcp层读取数据进行解析,最后执行此处传递过去的函数指针
st.HandleStreams(func(stream *transport.Stream) {
if s.opts.numServerWorkers > 0 {
data := &serverWorkerData{st: st, wg: &wg, stream: stream}
select {
case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
default:
go func() {
s.handleStream(st, stream, s.traceInfo(st, stream))
wg.Done()
}()
}
} else {
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}
})
}
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
//6. 根据请求信息,找到指定函数。最后进行调用注册的应用层业务
srv, knownService := s.services[service]
if knownService {
if md, ok := srv.methods[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
if sd, ok := srv.streams[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
}
// 此处可以进行自定义的路由
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
return
}
...
}
解析请求头数据的细节
- 读取底层tcp数据,最后进行解析
- 解析头数据
- 根据解析后的头数据,进行一系列的设置
- 设置:超时的ctx
- 设置:metadata 存入context中
- 执行函数指针
http2从tcp将数据报转换成http2认识的具体数据。之后grpc将http2的数据封装成grpc用到的stream结构中,还有一些参数timeout
、content-type
等封装到stream中的ctx中,到这里为止还没有对具体的请求数据做任何操作。
type decodeState struct {
serverSide bool //用了http2的解析,就一定是true
data parsedHeaderData//请求过来的关键参数
}
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
for {
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
//1. 读取底层tcp数据,最后进行解析
if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
break
}
case *http2.DataFrame:
t.handleData(frame)
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
case *http2.SettingsFrame:
t.handleSettings(frame)
case *http2.PingFrame:
t.handlePing(frame)
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
case *http2.GoAwayFrame:
// TODO: Handle GoAway from the client appropriately.
default:
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
}
}
}
}
//对解码后的报头进行操作
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
//2. 解析头数据
if h2code, err := state.decodeHeader(frame); err != nil {
return false
}
//grpc层的数据
s := &Stream{
recvCompress: state.data.encoding,
method: state.data.method,
contentSubtype: state.data.contentSubtype,
}
//一个关键的ctx包含很多机制
s.ctx
//3. 根据解析后的头数据,进行一系列的设置
//4. 设置:超时的ctx
if state.data.timeoutSet {
s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
} else {
s.ctx, s.cancel = context.WithCancel(t.ctx)
}
//address
pr := &peer.Peer{
Addr: t.remoteAddr,
}
s.ctx = peer.NewContext(s.ctx, pr)
//5. 设置:metadata 存入context中
if len(state.data.mdata) > 0 {
s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
}
//6. 执行函数指针
handle(s)
return false
}
func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) (http2.ErrCode, error) {
...
for _, hf := range frame.Fields {
d.processHeaderField(hf)
}
...
}
func (d *decodeState) processHeaderField(f hpack.HeaderField) {
switch f.Name {
case "content-type":
//如果类型 不包含 `application/grpc` 则抛异常,可以是`application/grpc;xxxx`等
contentSubtype, validContentType := grpcutil.ContentSubtype(f.Value)
if !validContentType {
d.data.contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", f.Value)
return
}
d.data.contentSubtype = contentSubtype
d.addMetadata(f.Name, f.Value)
d.data.isGRPC = true
case ":path":
//依靠它找到需要调用的函数 比如/pb.TspService/Hello
d.data.method = f.Value
case "grpc-timeout":
//如果有超时设置,会创建ctx context.WithTimeout(t.ctx, state.data.timeout)
d.data.timeoutSet = true
var err error
if d.data.timeout, err = decodeTimeout(f.Value); err != nil {
d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed time-out: %v", err)
}
default:
//自定义的metadata在这里处理
d.addMetadata(f.Name, v)
}
}
解析请求数据的细节
-
利用反射注册的函数,进行调用,参数传递,传递解析方式,但不会调用
-
调用到pb里注册的函数,在_TspService_Hello_Handler中进行具体处理
- 解析请求信息
- 调用拦截器
- 拦截器过滤后,进行最终的函数调用
-
源码
//最终解析在这里
func _TspService_Hello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HelloRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(TspServiceServer).Hello(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/pb.TspService/Hello",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(TspServiceServer).Hello(ctx, req.(*HelloRequest))
}
return interceptor(ctx, in, info, handler)
}
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
df := func(v interface{}) error {
//根据content-type 获取解析器,进行解析
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}
if sh != nil {
sh.HandleRPC(stream.Context(), &stats.InPayload{
RecvTime: time.Now(),
Payload: v,
WireLength: payInfo.wireLength + headerLen,
Data: d,
Length: len(d),
})
}
if binlog != nil {
binlog.Log(&binarylog.ClientMessage{
Message: d,
})
}
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
}
return nil
}
ctx := NewContextWithServerTransportStream(stream.Context(), stream)
//利用反射进行函数调用
//info.serviceImpl函数, ctx函数的第一个参数(metadata等信息),df请求数据(protobuf的解析)
//s.opts.unaryInt 拦截器
reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
opts := &transport.Options{Last: true}
//发送结果给请求方
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
}
return err
}
Metadata
Metadata是在一次 RPC 调用过程中关于这次调用的信息。 是 key-value的形式。其中 key 是 string 类型, value是[]string。
Metadata 对于 gRPC 本身来说透明, 它使得 client 和 server 能为对方提供本次调用的信息。就像一次 http 请求的 RequestHeader 和 ResponseHeader,http header 的生命周期是一次 http 请求, Metadata 的生命周期则是一次 RPC 调用
//grpc的结构
type MD map[string][]string
使用
- 发送方
dd := metadata.Pairs("hello", "world")
ctx = metadata.NewOutgoingContext(ctx, dd)
r, err := c.Hello(ctx, &pb.HelloRequest{Msg: "fff"})
- 接收方
func (s *server) Hello(c context.Context, p *pb.HelloRequest) (*pb.HelloResponse, error) {
md, _ := metadata.FromIncomingContext(c)
fmt.Println(md.Get("hello"))
}
注意事项
metadata本意是用来描述调用的信息的:协议的格式、调用方的请求方式、参数、非业务相关信息等。数据相关的不要用metadata进行存储。这样可以在不进行解析传输数据的情况下,依靠metadata进行一些逻辑处理,比如根据metatdata判断数据解析的方式、一些中间服务根据metadata信息,进行面向服务的操作。
拦截器
拦截器(Interceptor) 类似于 HTTP 应用的中间件(Middleware),能够让你在真正调用 RPC 方法前,进行身份认证、日志、限流、异常捕获、参数校验等通用操作,和 Python 的装饰器(Decorator) 的作用基本相同。
客户端发起请求前做一些验证,服务端处理消息前做过滤 grpc服务端和客户端都提供了interceptor功能
- client:发起请求前做统一处理
- server:收到请求,进入具体执行函数之前,对请求做统一处理
调用方式类似链表、调用一个后再调用下一个节点
grpc源码
func NewServer(opt ...ServerOption) *Server {
//..
chainUnaryServerInterceptors(s)
chainStreamServerInterceptors(s)
//..
return s
}
func chainUnaryServerInterceptors(s *Server) {
if s.opts.unaryInt != nil {
interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
}
var chainedInt UnaryServerInterceptor
if len(interceptors) == 0 {
chainedInt = nil
} else if len(interceptors) == 1 {
chainedInt = interceptors[0]
} else {
chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
}
}
s.opts.unaryInt = chainedInt
}