skoo's notes

努力记录一些自己觉得有趣的东西...
28 August 2013
by skoo

Go语言标准库能够自带一个rpc框架还是非常给力的,这可以很大程度的降低写后端网络通信服务的门槛,特别是在大规模的分布式系统中,rpc基本是跨机器通信的标配。rpc能够最大程度屏蔽网络细节,让开发者专注在服务功能的开发上面。下面介绍Go语言rpc框架的客户端内部实现.

Go rpc客户端的逻辑很简单,大体上,就是将一个个的调用请求序列化后原子的发送给服务器,然后有一个专门的gorutine等待服务器应答,这个goroutine会将收到的每个应答分发给对应的请求,这就完成了一次rpc调用。

调用入口

func NewClient(conn io.ReadWriteCloser) *Client

func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error

使用rpc客户端,首先要NewClient得到一个客户端对象,然后通过这个客户端对象的Call方法去执行远程服务端方法。这里需要注意,NewClient函数不光是创建了一个客户端对象,同时还创建了一个input goroutine. input goroutine是阻塞在连接上,等待服务端的响应。这里Call方法的主要工作是将一个rpc请求发送到服务端,同时放入一个等待队列,等候服务器的响应到来。

发送请求

经过客户端Call方法提交的rpc的请求,最终将调用如下的send方法发送给服务器,这里重点分析一下send的实现。

func (client *Client) send(call *Call) {
	//
	// 入口处就来了一把锁,这把锁显然是为了保证一个请求能够被原子的写入socket中. 毕竟一个
	// rpc连接是会被多个goroutine并发写入的,因此这里需要保证发送请求的原子性。
	// 
	client.sending.Lock()
	defer client.sending.Unlock()

	// 
	// 这里又来了一把锁,这把锁的目标是客户端的pending等待队列,也就是将每个rpc请求放入等待队列
	// 的时候,需要对这个pending队列做并发写保护。
	//
	// Register this call.
	client.mutex.Lock()
	if client.shutdown || client.closing {
		call.Error = ErrShutdown
		client.mutex.Unlock()
		call.done()
		return
	}
	// 
	// pending队列其实是使用map实现的,这里可以看到每个rpc请求都会生存一个唯一递增的seq, 这个
	// seq就是用来标记请求的,这个很像tcp包的seq。
	seq := client.seq
	client.seq++
	client.pending[seq] = call
	client.mutex.Unlock()

	// 
	// 下面就是将一个rpc请求的所有数据 (请求方法名、seq、请求参数等)进行序列化打包,然后发送
	// 出去. 这里主要采用的是Go标准库自带的gob算法进行请求的序列化。
	// 
	// Encode and send the request.
	client.request.Seq = seq
	client.request.ServiceMethod = call.ServiceMethod
	err := client.codec.WriteRequest(&client.request, call.Args)
	if err != nil {
		client.mutex.Lock()
		call = client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()
		if call != nil {
			call.Error = err
			call.done()
		}
	}
}

通过对发送过程的分析,可以看出一个rpc请求发出去主要有三个过程,第一个过程是将请求放入等待队列,第二个过程就是序列化,最后一个就是写入socket.

send过程最重要的就是两把锁 + seq。复用连接的原子写、并发请求是实现rpc框架的基础。

读取应答

前面提到在创建一个rpc客户端对象的时候,同时会启动一个input goroutine来等待服务器的响应,下面的input方法就是这个goroutine做的所有工作。

func (client *Client) input() {
	var err error
	var response Response
	//
	// 这里的for循环就是永久负责这个连接的响应读取,只有在连接上发生错误后,才会退出。
	// 
	for err == nil {
		//
		// 首先是读响应头, 响应头一般有一个很重要的信息就是正文数据长度,有了这个长度信息,才知道
		// 读多少正文才是一个应答完毕。
		//
		response = Response{}
		err = client.codec.ReadResponseHeader(&response)
		if err != nil {
			break
		}
		// 
		// 这里是一个很重要的步骤,从响应头里取到seq,这个seq就是客户端生成的seq,在上文的send
		// 过程中发送给了服务器,服务器应答的时候,必须将这个seq响应给客户端。只有这样客户端才
		// 知道这个应答是对应pending队列中的那个请求的。
		// 
		// 这里对pending队列枷锁保护,通过seq提取对应的请求call对象。
		//
		seq := response.Seq
		client.mutex.Lock()
		call := client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()

		//
		// 这个switch里主要就是处理异常以及正常情况读取响应正文了。异常情况,英文解释很详细了。
		// 
		switch {
		case call == nil:
			// We've got no pending call. That usually means that
			// WriteRequest partially failed, and call was already
			// removed; response is a server telling us about an
			// error reading request body. We should still attempt
			// to read error body, but there's no one to give it to.
			err = client.codec.ReadResponseBody(nil)
			if err != nil {
				err = errors.New("reading error body: " + err.Error())
			}
		case response.Error != "":
			// We've got an error response. Give this to the request;
			// any subsequent requests will get the ReadResponseBody
			// error if there is one.
			call.Error = ServerError(response.Error)
			err = client.codec.ReadResponseBody(nil)
			if err != nil {
				err = errors.New("reading error body: " + err.Error())
			}
			call.done()
		default:
			//
			// 开始读取响应的正文,正文放到call中的Reply中去。
			//
			err = client.codec.ReadResponseBody(call.Reply)
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())
			}
			call.done()
		}
	}
	
	//
	// 下面部分的代码都是在处理连接上出错,以及服务端关闭连接等情况的清理工作. 这部分很重要,
	//  否则可能导致一些调用rpc的goroutine永久阻塞等待,不能恢复工作。
	// 
	// Terminate pending calls.
	client.sending.Lock()
	client.mutex.Lock()
	client.shutdown = true
	closing := client.closing
	if err == io.EOF {
		if closing {
			err = ErrShutdown
		} else {
			err = io.ErrUnexpectedEOF
		}
	}
	for _, call := range client.pending {
		call.Error = err
		call.done()
	}
	client.mutex.Unlock()
	client.sending.Unlock()
	if err != io.EOF && !closing {
		log.Println("rpc: client protocol error:", err)
	}
}

不管有多少goroutine在并发写一个rpc连接,等待在连接上读取应答的goroutine只有一个,这个goroutine负责读取连接上的所有响应,然后将响应分发给对应的请求,也就完成了一次rpc请求了。

自定义编码解码器

type ClientCodec interface {
	WriteRequest(*Request, interface{}) error
	ReadResponseHeader(*Response) error
	ReadResponseBody(interface{}) error

	Close() error
}

ClientCodec定义了编码解码器的接口,也就是说你如果要自定义一个编码解码器,只需要实现这个接口就可以了。

WriteRequest做的事情就是将rpc请求的远程方法名、参数等信息进行序列化打包,然后写入socket中。你完全可以按照自己的想法去打包一请求进行发送。 两个Read接口就是从socket读取响应数据包,然后采用相应的算法进行反序列化解包即可。

Go的rpc如果不能自定义编码解码器,那就只能用在客户端和服务器都是Go语言开发的环境中。这就大幅度的降低了其灵活性。能够自定义编码解码器后,理论上就可以用来实现其他rpc的协议,比如:thrift。github的go-thrift项目确实是在Go rpc的基础上通过自定义编码解码器实现的thrift协议。

timeout

Call调用并没有提供超时机制,如果服务器响应太慢,怎么办?死等下去吗?这很不合理。有人说可以自己实现Call方法,不使用rpc包默认提供的Call。代码如下:

func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
	select {
	case call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done:
		return call.Error
	case <-time.After(5*time.Second):
	}
	return errors.New("timeout")
}

上面这段代码确实可以做到rpc调用5s没有得到响应,就超时放弃等待应答。

但是,这样做会有另外一个潜在的风险,就是内存泄漏. 因为,client.Go这个方法会构造一个call对象,则个call对象其实就是一个请求,这个请求被发送出去的同时会放到一个pending队列中,上文已经分析过了。通过分析input goroutine的过程可以看到如果服务器不响应的话,这个call对象就一直待在pending队列中了,永远不会被删除掉。当然,这种情况说明我们的服务器设计有问题。

实现一个rpc客户端,最重要的事情就是连接复用,也就是说一个连接上需要并发的同时跑多个请求。Go rpc在完成这件事情上,动用了两把锁,这个地方很可能会是影响性能的点。待服务器实现分析完后,我们来动手测试一下rpc的性能, 看看究竟如何。