目录

grpc-go

grpc Server

本文简单阅读源代码,了解grpc server的执行流程,从建立连接,到处理一条请求的过程。

使用方式

使用方式很简单,生成pb,注册建立服务,就可以等待请求了

type Hello struct{
}

func (h *Hello) Say(ctx context.Context, request pb.HelloRequest)(*pb.HelloResponse, error){
    fmt.Println(request.Msg)
    return &pb.HelloResponse{Msg: "wwww"}, nil
}

func main(){
    lis, _ := net.Listen("tcp", "127.0.0.1:8888")
    //1. 创建一个grpc服务器对象
    gRpcServer := grpc.NewServer()
    //2. 注册pb函数
    pb.RegisterHelloServiceServer(gRpcServer, &Hello{})
    //3. 开启服务端
    //阻塞
    gRpcServer.Serve(lis)
}

建立grpc server流程

NewServer

NewServer进行创建一个grpc服务,初始化一些参数。还可以进行函数选项模式,来传递初始化的配置。 默认情况下会建立一个以下参数的grpc服务:

  1. 接受数据最大4M
  2. 发送数据最大2g
  3. 连接超时120秒
  4. 读和写缓存1mb
  5. 默认一个请求一个goroutine

numServerWorkers

numServerWorkers设定了开启多少个工作协程,如果没设置,则来了一条消息就会处理创建一个goroutine。 如果设置了,会将请求消息进行分发给这多个worker

func (s *Server) serveStreams(st transport.ServerTransport) {
	st.HandleStreams(func(stream *transport.Stream) {
		if s.opts.numServerWorkers > 0 {
			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
			select {
			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
			default:
				go func() {
					s.handleStream(st, stream, s.traceInfo(st, stream))
					wg.Done()
				}()
			}
		} else {
			go func() {
				defer wg.Done()
				s.handleStream(st, stream, s.traceInfo(st, stream))
			}()
		}
	})
}

注册函数

利用反射,将具体实现的结构体和与之对应的函数存储到 grpcServer的services变量中

  • key: 结构体名称 (一般在pb文件里会根据proto生成)
  • value: 函数信息(调用函数的指针,函数名称,Metadata)

将函数信息存储后,来了一个请求根据请求信息,找到指定的函数,进行调用

type ServiceDesc struct {
	ServiceName string //服务名称
	HandlerType interface{} //结构体类型
	Methods     []MethodDesc//一元函数
	Streams     []StreamDesc//流函数
	Metadata    interface{}// 元数据
}

//注册服务函数
//args: sd 文件描述, srv: 具体实现的结构体
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
	if ss != nil {
		ht := reflect.TypeOf(sd.HandlerType).Elem()
		st := reflect.TypeOf(ss)
		if !st.Implements(ht) {
			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
		}
	}
	s.register(sd, ss)
}

//利用反射将服务注册到字典
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.services[sd.ServiceName]; ok {
		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	info := &serviceInfo{
		serviceImpl: ss,
		methods:     make(map[string]*MethodDesc),
		streams:     make(map[string]*StreamDesc),
		mdata:       sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		info.methods[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		info.streams[d.StreamName] = d
	}
	s.services[sd.ServiceName] = info
}

监听

当客户端建立连接,会为其单独创建一个goroutine进行后续的数据传输

func Serve(lis net.Listener) error{
    //...
	for {
		rawConn, err := lis.Accept()
		go func() {
			s.handleRawConn(rawConn)
		}()
    }
    //...
}

处理一个grpc请求的流程

  1. 建立连接
  2. 创建goroutine处理连接
  3. grpc基于http2,根据tcp连接信息 创建http2 传输结构 newHTTP2Transport
  4. 创建新的goroutine,将http2传输信息 进行处理
  5. 经过http2_server.go:455中的处理,从tcp层读取数据进行解析,最后执行此处传递过去的函数指针
  6. 根据请求信息,找到指定函数。最后进行调用注册的应用层业务
    1. 找到字典里对应的执行函数
    2. 如果没找到,则判断unknownStreamDesc 执行,这个一般用来自定义路由
    3. 判断一元,还是流
    4. 执行函数
    5. 将结果写回给对方
  7. 移除连接

code

func Serve(lis net.Listener) error{
	for {
        //1. 建立连接
		rawConn, err := lis.Accept()
		go func() {
            //2. 创建goroutine处理连接
			s.handleRawConn(rawConn)
		}()
    }
}

func (s *Server) handleRawConn(rawConn net.Conn) {
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	// 3. grpc基于http2,根据tcp连接信息 创建http2 传输结构
	st := s.newHTTP2Transport(conn, authInfo)
	go func() {
        //4. 创建新的goroutine,将http2传输信息 进行处理
        s.serveStreams(st)
        //7. 移除连接
		s.removeConn(st)
	}()
}

func (s *Server) serveStreams(st transport.ServerTransport) {
    //5. 经过http2_server.go:455中的处理,从tcp层读取数据进行解析,最后执行此处传递过去的函数指针
	st.HandleStreams(func(stream *transport.Stream) {
		if s.opts.numServerWorkers > 0 {
			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
			select {
			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
			default:
				go func() {
					s.handleStream(st, stream, s.traceInfo(st, stream))
					wg.Done()
				}()
			}
		} else {
			go func() {
				defer wg.Done()
				s.handleStream(st, stream, s.traceInfo(st, stream))
			}()
		}
	})
}

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
    //6. 根据请求信息,找到指定函数。最后进行调用注册的应用层业务
	srv, knownService := s.services[service]
	if knownService {
		if md, ok := srv.methods[method]; ok {
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		if sd, ok := srv.streams[method]; ok {
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
			return
		}
	}
	// 此处可以进行自定义的路由
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
    }
    ...
}

解析请求头数据的细节

  1. 读取底层tcp数据,最后进行解析
  2. 解析头数据
  3. 根据解析后的头数据,进行一系列的设置
    1. 设置:超时的ctx
    2. 设置:metadata 存入context中
  4. 执行函数指针

http2从tcp将数据报转换成http2认识的具体数据。之后grpc将http2的数据封装成grpc用到的stream结构中,还有一些参数timeoutcontent-type等封装到stream中的ctx中,到这里为止还没有对具体的请求数据做任何操作。

type decodeState struct {
	serverSide bool //用了http2的解析,就一定是true
	data parsedHeaderData//请求过来的关键参数
}

func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
	for {
		switch frame := frame.(type) {
        case *http2.MetaHeadersFrame:
            //1. 读取底层tcp数据,最后进行解析
			if t.operateHeaders(frame, handle, traceCtx) {
				t.Close()
				break
			}
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		case *http2.GoAwayFrame:
			// TODO: Handle GoAway from the client appropriately.
		default:
			if logger.V(logLevel) {
				logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
			}
		}
	}
}

//对解码后的报头进行操作
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
    //2. 解析头数据
	if h2code, err := state.decodeHeader(frame); err != nil {
		return false
    }
    //grpc层的数据
    s := &Stream{
		recvCompress:   state.data.encoding,
		method:         state.data.method,
		contentSubtype: state.data.contentSubtype,
    }
    //一个关键的ctx包含很多机制
    s.ctx

    //3. 根据解析后的头数据,进行一系列的设置
    //4. 设置:超时的ctx
	if state.data.timeoutSet {
		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
	} else {
		s.ctx, s.cancel = context.WithCancel(t.ctx)
    }
    
    //address
    pr := &peer.Peer{
		Addr: t.remoteAddr,
	}
	s.ctx = peer.NewContext(s.ctx, pr)
	//5. 设置:metadata 存入context中
	if len(state.data.mdata) > 0 {
		s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
	}
	
	//6. 执行函数指针
	handle(s)
	return false
}


func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) (http2.ErrCode, error) {
    ...
    for _, hf := range frame.Fields {
		d.processHeaderField(hf)
    }
    ...
}


func (d *decodeState) processHeaderField(f hpack.HeaderField) {
	switch f.Name {
    case "content-type":
        //如果类型 不包含 `application/grpc`  则抛异常,可以是`application/grpc;xxxx`等
		contentSubtype, validContentType := grpcutil.ContentSubtype(f.Value)
		if !validContentType {
			d.data.contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", f.Value)
			return
		}
		d.data.contentSubtype = contentSubtype
		d.addMetadata(f.Name, f.Value)
        d.data.isGRPC = true
    case ":path":
        //依靠它找到需要调用的函数 比如/pb.TspService/Hello
		d.data.method = f.Value
    case "grpc-timeout":
        //如果有超时设置,会创建ctx context.WithTimeout(t.ctx, state.data.timeout)
		d.data.timeoutSet = true
		var err error
		if d.data.timeout, err = decodeTimeout(f.Value); err != nil {
			d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed time-out: %v", err)
		}
	default:
        //自定义的metadata在这里处理
		d.addMetadata(f.Name, v)
	}
}

解析请求数据的细节

  1. 利用反射注册的函数,进行调用,参数传递,传递解析方式,但不会调用

  2. 调用到pb里注册的函数,在_TspService_Hello_Handler中进行具体处理

    1. 解析请求信息
    2. 调用拦截器
    3. 拦截器过滤后,进行最终的函数调用
  3. 源码

//最终解析在这里
func _TspService_Hello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(HelloRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(TspServiceServer).Hello(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/pb.TspService/Hello",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(TspServiceServer).Hello(ctx, req.(*HelloRequest))
	}
	return interceptor(ctx, in, info, handler)
}

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
	df := func(v interface{}) error {
        //根据content-type 获取解析器,进行解析
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if sh != nil {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime:   time.Now(),
				Payload:    v,
				WireLength: payInfo.wireLength + headerLen,
				Data:       d,
				Length:     len(d),
			})
		}
		if binlog != nil {
			binlog.Log(&binarylog.ClientMessage{
				Message: d,
			})
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
    }
 
    ctx := NewContextWithServerTransportStream(stream.Context(), stream)
    //利用反射进行函数调用
    //info.serviceImpl函数, ctx函数的第一个参数(metadata等信息),df请求数据(protobuf的解析)
    //s.opts.unaryInt 拦截器
	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)

	opts := &transport.Options{Last: true}

    //发送结果给请求方
	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
	
	}

	return err
}

Metadata

Metadata是在一次 RPC 调用过程中关于这次调用的信息。 是 key-value的形式。其中 key 是 string 类型, value是[]string。

Metadata 对于 gRPC 本身来说透明, 它使得 client 和 server 能为对方提供本次调用的信息。就像一次 http 请求的 RequestHeader 和 ResponseHeader,http header 的生命周期是一次 http 请求, Metadata 的生命周期则是一次 RPC 调用

//grpc的结构
type MD map[string][]string

使用

  • 发送方
    dd := metadata.Pairs("hello", "world")
    ctx = metadata.NewOutgoingContext(ctx, dd)
    r, err := c.Hello(ctx, &pb.HelloRequest{Msg: "fff"})
  • 接收方
func (s *server) Hello(c context.Context, p *pb.HelloRequest) (*pb.HelloResponse, error) {
	md, _ := metadata.FromIncomingContext(c)
        fmt.Println(md.Get("hello"))
}

注意事项

metadata本意是用来描述调用的信息的:协议的格式、调用方的请求方式、参数、非业务相关信息等。数据相关的不要用metadata进行存储。这样可以在不进行解析传输数据的情况下,依靠metadata进行一些逻辑处理,比如根据metatdata判断数据解析的方式、一些中间服务根据metadata信息,进行面向服务的操作。

拦截器

拦截器(Interceptor) 类似于 HTTP 应用的中间件(Middleware),能够让你在真正调用 RPC 方法前,进行身份认证、日志、限流、异常捕获、参数校验等通用操作,和 Python 的装饰器(Decorator) 的作用基本相同。

客户端发起请求前做一些验证,服务端处理消息前做过滤 grpc服务端和客户端都提供了interceptor功能

  • client:发起请求前做统一处理
  • server:收到请求,进入具体执行函数之前,对请求做统一处理

调用方式类似链表、调用一个后再调用下一个节点

grpc源码

func NewServer(opt ...ServerOption) *Server {
    //..
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	//..
	return s
}

func chainUnaryServerInterceptors(s *Server) {
	if s.opts.unaryInt != nil {
		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
	}

	var chainedInt UnaryServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
			return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
		}
	}

	s.opts.unaryInt = chainedInt
}