本文代码地址:
本文是7天用Go从零实现分布式缓存GeeCache
的第五篇。
- 注册节点(
Register Peers
),借助一致性哈希算法选择节点。 - 实现
HTTP
客户端,与远程节点的服务端通信,代码约90
行
1 流程回顾
我们在GeeCache 第二天中描述了 geecache
的流程。在这之前已经实现了流程⑴
和 ⑶
,今天实现流程 ⑵
,从远程节点获取缓存值。
我们进一步细化流程 ⑵
:
2 抽象 PeerPicker
day5-multi-nodes/geecache/peers.go
package geecache
// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key.
type PeerPicker interface {
PickPeer(key string) (peer PeerGetter, ok bool)
}
// PeerGetter is the interface that must be implemented by a peer.
type PeerGetter interface {
Get(group string, key string) ([]byte, error)
}
- 在这里,抽象出
2
个接口,PeerPicker
的PickPeer()
方法用于根据传入的key
选择相应节点PeerGetter
。 - 接口
PeerGetter
的Get()
方法用于从对应group
查找缓存值。PeerGetter
就对应于上述流程中的HTTP
客户端。
3 节点选择与 HTTP 客户端
在 GeeCache 第三天 中我们为 HTTPPool
实现了服务端功能,通信不仅需要服务端还需要客户端,因此,我们接下来要为 HTTPPool
实现客户端的功能。
首先创建具体的HTTP
客户端类 httpGetter
,实现 PeerGetter
接口。
day5-multi-nodes/geecache/http.go
type httpGetter struct {
baseURL string
}
func (h *httpGetter) Get(group string, key string) ([]byte, error) {
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(group),
url.QueryEscape(key),
)
res, err := http.Get(u)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("server returned: %v", res.Status)
}
bytes, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("reading response body: %v", err)
}
return bytes, nil
}
var _ PeerGetter = (*httpGetter)(nil)
baseURL
表示将要访问的远程节点的地址,例如http://example.com/_geecache/
。- 使用
http.Get()
方式获取返回值,并转换为[]bytes
类型。
第二步,为 HTTPPool
添加节点选择的功能。
const (
defaultBasePath = "/_geecache/"
defaultReplicas = 50
)
// HTTPPool implements PeerPicker for a pool of HTTP peers.
type HTTPPool struct {
// this peer's base URL, e.g. "https://example.net:8000"
self string
basePath string
mu sync.Mutex // guards peers and httpGetters
peers *consistenthash.Map
httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008"
}
- 新增成员变量
peers
,类型是一致性哈希算法的Map
,用来根据具体的key
选择节点。 - 新增成员变量
httpGetters
,映射远程节点与对应的httpGetter
。每一个远程节点对应一个httpGetter
,因为httpGetter
与远程节点的地址baseURL
有关。
第三步,实现 PeerPicker
接口。
// Set updates the pool's list of peers.
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = consistenthash.New(defaultReplicas, nil)
p.peers.Add(peers...)
p.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
}
}
// PickPeer picks a peer according to key
func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if peer := p.peers.Get(key); peer != "" && peer != p.self {
p.Log("Pick peer %s", peer)
return p.httpGetters[peer], true
}
return nil, false
}
var _ PeerPicker = (*HTTPPool)(nil)
Set()
方法实例化了一致性哈希算法,并且添加了传入的节点。- 并为每一个节点创建了一个
HTTP
客户端httpGetter
。 PickerPeer()
包装了一致性哈希算法的Get()
方法,根据具体的key
,选择节点,返回节点对应的HTTP
客户端。
至此,HTTPPool
既具备了提供 HTTP
服务的能力,也具备了根据具体的 key
,创建 HTTP
客户端从远程节点获取缓存值的能力。
4 实现主流程
最后,我们需要将上述新增的功能集成在主流程(geecache.go
)中。
day5-multi-nodes/geecache/geecache.go
// A Group is a cache namespace and associated data loaded spread over
type Group struct {
name string
getter Getter
mainCache cache
peers PeerPicker
}
// RegisterPeers registers a PeerPicker for choosing remote peer
func (g *Group) RegisterPeers(peers PeerPicker) {
if g.peers != nil {
panic("RegisterPeerPicker called more than once")
}
g.peers = peers
}
func (g *Group) load(key string) (value ByteView, err error) {
if g.peers != nil {
if peer, ok := g.peers.PickPeer(key); ok {
if value, err = g.getFromPeer(peer, key); err == nil {
return value, nil
}
log.Println("[GeeCache] Failed to get from peer", err)
}
}
return g.getLocally(key)
}
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
bytes, err := peer.Get(g.name, key)
if err != nil {
return ByteView{}, err
}
return ByteView{b: bytes}, nil
}
- 新增
RegisterPeers()
方法,将 实现了PeerPicker
接口的HTTPPool
注入到Group
中。 - 新增
getFromPeer()
方法,使用实现了PeerGetter
接口的httpGetter
从访问远程节点,获取缓存值。 - 修改
load
方法,使用PickPeer()
方法选择节点,若非本机节点,则调用getFromPeer()
从远程获取。若是本机节点或从远程节点获取失败,则回退到getLocally()
。
5 main 函数测试。
day5-multi-nodes/main.go
var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}
func createGroup() *geecache.Group {
return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
func(key string) ([]byte, error) {
log.Println("[SlowDB] search key", key)
if v, ok := db[key]; ok {
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
}))
}
func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
peers := geecache.NewHTTPPool(addr)
peers.Set(addrs...)
gee.RegisterPeers(peers)
log.Println("geecache is running at", addr)
log.Fatal(http.ListenAndServe(addr[7:], peers))
}
func startAPIServer(apiAddr string, gee *geecache.Group) {
http.Handle("/api", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
view, err := gee.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(view.ByteSlice())
}))
log.Println("fontend server is running at", apiAddr)
log.Fatal(http.ListenAndServe(apiAddr[7:], nil))
}
func main() {
var port int
var api bool
flag.IntVar(&port, "port", 8001, "Geecache server port")
flag.BoolVar(&api, "api", false, "Start a api server?")
flag.Parse()
apiAddr := "http://localhost:9999"
addrMap := map[int]string{
8001: "http://localhost:8001",
8002: "http://localhost:8002",
8003: "http://localhost:8003",
}
var addrs []string
for _, v := range addrMap {
addrs = append(addrs, v)
}
gee := createGroup()
if api {
go startAPIServer(apiAddr, gee)
}
startCacheServer(addrMap[port], []string(addrs), gee)
}
main
函数的代码比较多,但是逻辑是非常简单的。
startCacheServer()
用来启动缓存服务器:创建HTTPPool
,添加节点信息,注册到gee
中,启动HTTP
服务(共3
个端口,8001/8002/8003
),用户不感知。startAPIServer()
用来启动一个API
服务(端口9999
),与用户进行交互,用户感知。main()
函数需要命令行传入port
和api
2
个参数,用来在指定端口启动HTTP
服务。
为了方便,我们将启动的命令封装为一个 shell
脚本:
#!/bin/bash
trap "rm server;kill 0" EXIT
go build -o server
./server -port=8001 &
./server -port=8002 &
./server -port=8003 -api=1 &
sleep 2
echo ">>> start test"
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &
wait
trap
命令用于在 shell
脚本退出时,删掉临时文件,结束子进程。
$ ./run.sh
2020/02/16 21:17:43 geecache is running at http://localhost:8001
2020/02/16 21:17:43 geecache is running at http://localhost:8002
2020/02/16 21:17:43 geecache is running at http://localhost:8003
2020/02/16 21:17:43 fontend server is running at http://localhost:9999
>>> start test
2020/02/16 21:17:45 [Server http://localhost:8003] Pick peer http://localhost:8001
2020/02/16 21:17:45 [Server http://localhost:8003] Pick peer http://localhost:8001
2020/02/16 21:17:45 [Server http://localhost:8003] Pick peer http://localhost:8001
...
630630630
此时,我们可以打开一个新的 shell
,进行测试:
$ curl "http://localhost:9999/api?key=Tom"
630
$ curl "http://localhost:9999/api?key=kkk"
kkk not exist
测试的时候,我们并发了 3
个请求 ?key=Tom
,从日志中可以看到,三次均选择了节点 8001
,这是一致性哈希算法的功劳。但是有一个问题在于,同时向 8001
发起了 3
次请求。试想,假如有 10
万个在并发请求该数据呢?那就会向 8001
同时发起 10
万次请求,如果 8001
又同时向数据库发起 10
万次查询请求,很容易导致缓存被击穿。
三次请求的结果是一致的,对于相同的 key
,能不能只向 8001
发起一次请求?这个问题下一次解决。
6 QA
这里有一个疑惑想问一下博主,如果要将GeeCache进行横向扩展的话,应该如何部署,可不可以将peer部署到其他机器上?
答:把 IP 和端口换成部署机器的 IP 和端口就可以了。只是测试用例中,在本机启动了三个实例,基于网络通信,部署在哪里都可以。
A :
是不是漏掉了从远程节点拿到缓存后更新本地缓存这一步?
B :
groupcache 中缓存值只淘汰不更新,也没有超时淘汰机制,这样取舍简化了设计。
A :
我的意思是当请求当前缓存服务器时 此服务器本地没有缓存 接着由当前服务器去请求其他节点服务器 当拿回来缓存值后 不应该更新此服务器的本地缓存吗 ?
B :
分布式缓存的目的是不同key缓存在不同的节点上,增加总的吞吐量。如果大家转发请求后,都再备份一次,每台机器上都缓存了相同的数据,就失去意义了。每个节点缓存1G数据,理论上10个节点总共可以缓存10G不同的数据。当然对于热点数据,每个节点拿到值后,本机备份一次是有价值的,增加热点数据的吞吐量。groupcache 的原生实现中,有1/10的概率会在本机存一次。这样10个节点,理论上可以缓存9G不同的数据,算是一种取舍。
如何才能缓存多个group?是通过go 开辟新goroutine吗?或者是给多个group注册相同的HTTPPool?
答:都可以,group 和 HTTPPool 是解耦的,可以复用,也可以各自搭配各自的。当前版本应该是不支持动态的横向扩展,现在需要把一致性hash中的物理节点提前写在配置中,或者map中,如果我想要新增一个节点怎么办呢,需要重新启动,然后利用peers.Set(addrs…)把新增后的所有节点加进来吗
答:当前版本只能这样。后续可以将缓存服务注册到注册中心,通过服务发现获取所有节点IP。可以参看带注册中心的版本:https://github.com/peanutzhen/peanutcache用户只知晓API:9999.那么API与分布式缓存是一个1对3的关系. 那么当用户查询的时候,首先查询的get是三个缓存中的哪一个呢? 还是说API本身是一个group 本地有, 没有再去三个缓存中找呢? 这个模型实在是没搞清晰 希望指点
答: api服务绑定了一个本地的geecache服务,这个服务miss的时候,API用consistent hasher 去算出应该去哪个节点找数据。获取缓存数据的流程是:从本地缓存查找->从远程节点查找->回调函数,写到本地。如果是这样的话,远程节点是不是就一直没有缓存到数据。数据要么是本地缓存直接得到,要么是本地和远程都找不到,然后回调,写到本地缓存。不知道我理解的正确吗?
答:不对。远程节点查找的时候,如果在远程节点的缓存中找不到,是调用远程节点的回调函数,存储到远程节点
原文地址: https://geektutu.com/post/geecache-day5.html