一致性hash算法
背景
如何分配请求?
大多数网站背后肯定不是只有一台服务器提供服务,因为单机的并发量和数据量都是有限的,所以都会用多台服务器构成集群来对外提供服务。
但是问题来了,现在有那么多个节点(后面统称服务器为节点,因为少一个字),要如何分配客户端的请求呢?

其实这个问题就是「负载均衡问题」。解决负载均衡问题的算法很多,不同的负载均衡算法,对应的就是不同的分配策略,适应的业务场景也不同。
最简单的方式,引入一个中间的负载均衡层,让它将外界的请求「轮流」的转发给内部的集群。比如集群有 3 个节点,外界请求有 3 个,那么每个节点都会处理 1 个请求,达到了分配请求的目的。

考虑到每个节点的硬件配置有所区别,我们可以引入权重值,将硬件配置更好的节点的权重值设高,然后根据各个节点的权重值,按照一定比重分配在不同的节点上,让硬件配置更好的节点承担更多的请求,这种算法叫做加权轮询。
加权轮询算法使用场景是建立在每个节点存储的数据都是相同的前提。所以,每次读数据的请求,访问任意一个节点都能得到结果。
但是,加权轮询算法是无法应对「分布式系统」的,因为分布式系统中,每个节点存储的数据是不同的。
当我们想提高系统的容量,就会将数据水平切分到不同的节点来存储,也就是将数据分布到了不同的节点。比如一个分布式 KV(key-valu) 缓存系统,某个 key 应该到哪个或者哪些节点上获得,应该是确定的,不是说任意访问一个节点都可以得到缓存结果的。
因此,我们要想一个能应对分布式系统的负载均衡算法。
使用哈希算法有什么问题?
有的同学可能很快就想到了:哈希算法。因为对同一个关键字进行哈希计算,每次计算都是相同的值,这样就可以将某个 key 确定到一个节点了,可以满足分布式系统的负载均衡需求。
哈希算法最简单的做法就是进行取模运算,比如分布式系统中有 3 个节点,基于 hash(key) % 3 公式对数据进行了映射。
如果客户端要获取指定 key 的数据,通过下面的公式可以定位节点:
hash(key) % 3如果经过上面这个公式计算后得到的值是 0,就说明该 key 需要去第一个节点获取。
但是有一个很致命的问题,如果节点数量发生了变化,也就是在对系统做扩容或者缩容时,必须迁移改变了映射关系的数据,否则会出现查询不到数据的问题。
举个例子,假设我们有一个由 A、B、C 三个节点组成分布式 KV 缓存系统,基于计算公式 hash(key) % 3 将数据进行了映射,每个节点存储了不同的数据:

现在有 3 个查询 key 的请求,分别查询 key-01,key-02,key-03 的数据,这三个 key 分别经过 hash() 函数计算后的值为 hash( key-01) = 6、hash( key-02) = 7、hash(key-03) = 8,然后再对这些值进行取模运算。
通过这样的哈希算法,每个 key 都可以定位到对应的节点。

当 3 个节点不能满足业务需求了,这时我们增加了一个节点,节点的数量从 3 变化为 4,意味取模哈希函数中基数的变化,这样会导致大部分映射关系改变,如下图:

比如,之前的 hash(key-01) % 3 = 0,就变成了 hash(key-01) % 4 = 2,查询 key-01 数据时,寻址到了节点 C,而 key-01 的数据是存储在节点 A 上的,不是在节点 C,所以会查询不到数据。
同样的道理,如果我们对分布式系统进行缩容,比如移除一个节点,也会因为取模哈希函数中基数的变化,可能出现查询不到数据的问题。
要解决这个问题的办法,就需要我们进行迁移数据,比如节点的数量从 3 变化为 4 时,要基于新的计算公式 hash(key) % 4 ,重新对数据和节点做映射。
假设总数据条数为 M,哈希算法在面对节点数量变化时,最坏情况下所有数据都需要迁移,所以它的数据迁移规模是 O(M),这样数据的迁移成本太高了。
所以,传统的按服务器节点数量取模在集群扩容和收缩时存在一定的局限性。而一致性哈希算法正好解决了简单哈希算法在分布式集群中存在的动态伸缩的问题,降低节点上下线的过程中带来的数据迁移成本。
一致性哈希算法的简介
什么是一致性哈希算法
一致性哈希(Consistent Hash)算法是1997年提出,也用了取模运算,但与哈希算法不同的是,哈希算法是对节点的数量进行取模运算,而一致哈希算法是对 2^32 进行取模运算,是一个固定的值。
目的是解决分布式系统的数据分区问题:当分布式集群移除或者添加一个服务器时,必须尽可能小地改变已存在的服务请求与处理请求服务器之间的映射关系。
使用场景
一致性哈希算法是分布式系统中的重要算法,使用场景也非常广泛。主要是是负载均衡、缓存数据分区等场景。
一致性哈希应该是实现负载均衡的首选算法,它的实现比较灵活,既可以在客户端实现,也可以在中间件上实现,比如日常使用较多的缓存中间件memcached 使用的路由算法用的就是一致性哈希算法。
此外,其它的应用场景还有很多:
- RPC框架Dubbo用来选择服务提供者
- 分布式关系数据库分库分表:数据与节点的映射关系
- LVS负载均衡调度器
一致性哈希算法的原理
一致性哈希算法是如何减少数据的迁移成本?
我们可以把一致哈希算法是对 2^32 进行取模运算的结果值组织成一个圆环,就像钟表一样,钟表的圆可以理解成由 60 个点组成的圆,而此处我们把这个圆想象成由 2^32 个点组成的圆,这个圆环被称为哈希环,如下图:

一致性哈希要进行两步哈希:
- 第一步:对存储节点进行哈希计算,也就是对存储节点做哈希映射,比如根据节点的 IP 地址进行哈希;
- 第二步:当对数据进行存储或访问时,对数据进行哈希映射;
所以,一致性哈希是指将「存储节点」和「数据」都映射到一个首尾相连的哈希环上。
问题来了,对「数据」进行哈希映射得到一个结果要怎么找到存储该数据的节点呢?
答案是,映射的结果值往顺时针的方向的找到第一个节点,就是存储该数据的节点。
举个例子,有 3 个节点经过哈希计算,映射到了如下图的位置:

接着,对要查询的 key-01 进行哈希计算,确定此 key-01 映射在哈希环的位置,然后从这个位置往顺时针的方向找到第一个节点,就是存储该 key-01 数据的节点。
比如,下图中的 key-01 映射的位置,往顺时针的方向找到第一个节点就是节点 A。

所以,当需要对指定 key 的值进行读写的时候,要通过下面 2 步进行寻址:
- 首先,对 key 进行哈希计算,确定此 key 在环上的位置;
- 然后,从这个位置沿着顺时针方向走,遇到的第一节点就是存储 key 的节点。
知道了一致哈希寻址的方式,我们来看看,如果增加一个节点或者减少一个节点会发生大量的数据迁移吗?
假设节点数量从 3 增加到了 4,新的节点 D 经过哈希计算后映射到了下图中的位置:

你可以看到,key-01、key-03 都不受影响,只有 key-02 需要被迁移节点 D。
假设节点数量从 3 减少到了 2,比如将节点 A 移除:

你可以看到,key-02 和 key-03 不会受到影响,只有 key-01 需要被迁移节点 B。
因此,在一致哈希算法中,如果增加或者移除一个节点,仅影响该节点在哈希环上顺时针相邻的后继节点,其它数据也不会受到影响。
上面这些图中 3 个节点映射在哈希环还是比较分散的,所以看起来请求都会「均衡」到每个节点。
但是一致性哈希算法并不保证节点能够在哈希环上分布均匀,这样就会带来一个问题,会有大量的请求集中在一个节点上。
比如,下图中 3 个节点的映射位置都在哈希环的右半边:

这时候有一半以上的数据的寻址都会找节点 A,也就是访问请求主要集中的节点 A 上,这肯定不行的呀,说好的负载均衡呢,这种情况一点都不均衡。
另外,在这种节点分布不均匀的情况下,进行容灾与扩容时,哈希环上的相邻节点容易受到过大影响,容易发生雪崩式的连锁反应。
比如,上图中如果节点 A 被移除了,当节点 A 宕机后,根据一致性哈希算法的规则,其上数据应该全部迁移到相邻的节点 B 上,这样,节点 B 的数据量、访问量都会迅速增加很多倍,一旦新增的压力超过了节点 B 的处理能力上限,就会导致节点 B 崩溃,进而形成雪崩式的连锁反应。
所以,一致性哈希算法虽然减少了数据迁移量,但是存在节点分布不均匀的问题。
如何通过虚拟节点提高均衡度?
要想解决节点能在哈希环上分配不均匀的问题,就是要有大量的节点,节点数越多,哈希环上的节点分布的就越均匀。
但问题是,实际中我们没有那么多节点。所以这个时候我们就加入虚拟节点,也就是对一个真实节点做多个副本。
具体做法是,不再将真实节点映射到哈希环上,而是将虚拟节点映射到哈希环上,并将虚拟节点映射到实际节点,所以这里有「两层」映射关系。
比如对每个节点分别设置 3 个虚拟节点:
- 对节点 A 加上编号来作为虚拟节点:A-01、A-02、A-03
- 对节点 B 加上编号来作为虚拟节点:B-01、B-02、B-03
- 对节点 C 加上编号来作为虚拟节点:C-01、C-02、C-03
引入虚拟节点后,原本哈希环上只有 3 个节点的情况,就会变成有 9 个虚拟节点映射到哈希环上,哈希环上的节点数量多了 3 倍。

你可以看到,节点数量多了后,节点在哈希环上的分布就相对均匀了。这时候,如果有访问请求寻址到「A-01」这个虚拟节点,接着再通过「A-01」虚拟节点找到真实节点 A,这样请求就能访问到真实节点 A 了。
上面为了方便你理解,每个真实节点仅包含 3 个虚拟节点,这样能起到的均衡效果其实很有限。而在实际的工程中,虚拟节点的数量会大很多,比如 Nginx 的一致性哈希算法,每个权重为 1 的真实节点就含有160 个虚拟节点。
另外,虚拟节点除了会提高节点的均衡度,还会提高系统的稳定性。当节点变化时,会有不同的节点共同分担系统的变化,因此稳定性更高。
比如,当某个节点被移除时,对应该节点的多个虚拟节点均会移除,而这些虚拟节点按顺时针方向的下一个虚拟节点,可能会对应不同的真实节点,即这些不同的真实节点共同分担了节点变化导致的压力。
而且,有了虚拟节点后,还可以为硬件配置更好的节点增加权重,比如对权重更高的节点增加更多的虚拟机节点即可。
因此,带虚拟节点的一致性哈希方法不仅适合硬件配置不同的节点的场景,而且适合节点规模会发生变化的场景。
虚拟节点映射到哈希环上是如何分布的?
是上面提到的间隔分布还是下面的依次连续?虚拟节点是如何映射到哈希环上,使hash(key)即使集中在某一扇区,也能均匀的分散请求。

一致性哈希算法的简单实现
Node的实体类
import com.gordon.cicd.uitl.HashUtils;
import java.util.*;
public class Node {
private static final int VIRTUAL_NODE_NO_PER_NODE = 3;//设置虚拟化节点的分布
private final String ip;
private final List<Integer> virtualNodeHashes = new ArrayList<>(VIRTUAL_NODE_NO_PER_NODE);
private final Map<Object, Object> cacheMap = new HashMap<>();
public Node(String ip) {
Objects.requireNonNull(ip);
this.ip = ip;
initVirtualNodes();
}
private void initVirtualNodes() {
String virtualNodeKey;
for (int i = 1; i <= VIRTUAL_NODE_NO_PER_NODE; i++) {
virtualNodeKey = ip + "#" + i;
virtualNodeHashes.add(HashUtils.hashcode(virtualNodeKey));
}
}
public void addCacheItem(Object key, Object value) {
cacheMap.put(key, value);
}
public Object getCacheItem(Object key) {
return cacheMap.get(key);
}
public void removeCacheItem(Object key) {
cacheMap.remove(key);
}
public List<Integer> getVirtualNodeHashes() {
return virtualNodeHashes;
}
public String getIp() {
return ip;
}
}hash函数
public class HashUtils {
/**
* FNV1_32_HASH
*
* @param obj
* object
* @return hashcode
*/
public static int hashcode(Object obj) {
final int p = 16777619;
int hash = (int) 2166136261L;
String str = obj.toString();
for (int i = 0; i < str.length(); i++)
hash = (hash ^ str.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
if (hash < 0)
hash = Math.abs(hash);
//System.out.println("hash computer:" + hash);
return hash;
}
}一致性hash的实现,该示例中未实现根据权重新增虚拟节点
import com.gordon.cicd.entity.Node;
import com.gordon.cicd.uitl.HashUtils;
import java.util.*;
public class ConsistentHash {
private final TreeMap<Integer, Node> hashRing = new TreeMap<>();
public List<Node> nodeList = new ArrayList<>();
/**
* 增加节点
* 每增加一个节点,就会在闭环上增加给定虚拟节点
* 例如虚拟节点数是2,则每调用此方法一次,增加两个虚拟节点,这两个节点指向同一Node
* @param ip
*/
public void addNode(String ip) {
Objects.requireNonNull(ip);
Node node = new Node(ip);
nodeList.add(node);
for (Integer virtualNodeHash : node.getVirtualNodeHashes()) {
hashRing.put(virtualNodeHash, node);
//System.out.println("虚拟节点[" + node + "] hash:" + virtualNodeHash + ",被添加");
}
}
/**
* 移除节点
* @param node
*/
public void removeNode(Node node){
nodeList.remove(node);
}
/**
* 获取缓存数据
* 先找到对应的虚拟节点,然后映射到物理节点
* @param key
* @return
*/
public Object get(Object key) {
Node node = findMatchNode(key);
System.out.println("获取到节点:" + node.getIp());
return node.getCacheItem(key);
}
/**
* 添加缓存
* 先找到hash环上的节点,然后在对应的节点上添加数据缓存
* @param key
* @param value
*/
public void put(Object key, Object value) {
Node node = findMatchNode(key);
node.addCacheItem(key, value);
}
/**
* 删除缓存数据
*/
public void evict(Object key) {
findMatchNode(key).removeCacheItem(key);
}
/**
* 获得一个最近的顺时针节点
* @param key 为给定键取Hash,取得顺时针方向上最近的一个虚拟节点对应的实际节点
* * @return 节点对象
* @return
*/
private Node findMatchNode(Object key) {
Map.Entry<Integer, Node> entry = hashRing.ceilingEntry(HashUtils.hashcode(key));
if (entry == null) {
entry = hashRing.firstEntry();
}
return entry.getValue();
}
}测试
import com.gordon.cicd.entity.Node;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
public class ConsistentHashTest {
public static final int NODE_SIZE = 3;
public static final int STRING_COUNT = 10;
private static ConsistentHash consistentHash = new ConsistentHash();
private static List<String> sList = new ArrayList<>();
public static void main(String[] args) {
// 增加节点
for (int i = 0; i < NODE_SIZE; i++) {
String ip = new StringBuilder("10.2.1.").append(i)
.toString();
consistentHash.addNode(ip);
}
// 生成需要缓存的数据;
for (int i = 0; i < 10; i++) {
sList.add(i+"");
}
// 将数据放入到缓存中。
for (String s : sList) {
consistentHash.put(s, s);
}
// 输出节点及数据分布情况
HashMap<String, List<Object>> map = new HashMap<>();
for (int i = 0; i < sList.size(); i++) {
String key = sList.get(i);
Node matchNode = consistentHash.findMatchNode(key);
List<Object> list = map.getOrDefault(matchNode.getIp(), new ArrayList<>());
list.add(i);
map.put(matchNode.getIp(), list);
}
for (String ip : map.keySet()) {
System.out.println("ip:"+ip+"对应数据:"+map.get(ip));
}
ArrayList<Ip_Vm> ip_vms = new ArrayList<>();
for (Node node : consistentHash.nodeList){
System.out.println("ip:"+node.getIp()+"对应虚拟节点:"+node.getVirtualNodeHashes());
for (Integer virtualNodeHash : node.getVirtualNodeHashes()) {
ip_vms.add(new Ip_Vm(node.getIp(),virtualNodeHash));
}
}
ip_vms.sort((Comparator.comparingInt(Ip_Vm::getVirtualNodeHashCode)));
System.out.println("节点在hash环上的分布:");
for (Ip_Vm ip_vm : ip_vms) {
System.out.print(ip_vm.getIp()+"\t");
}
System.out.println();
System.out.println("=====================新增节点后的情况===========================");
// 新增一个数据节点
consistentHash.addNode("10.2.1.3");
// 输出节点及数据分布情况
HashMap<String, List<Object>> map_n = new HashMap<>();
for (int i = 0; i < sList.size(); i++) {
String key = sList.get(i);
Node matchNode = consistentHash.findMatchNode(key);
List<Object> list = map_n.getOrDefault(matchNode.getIp(), new ArrayList<>());
list.add(i);
map_n.put(matchNode.getIp(), list);
}
for (String ip : map_n.keySet()) {
System.out.println("ip:"+ip+"对应数据:"+map_n.get(ip));
}
ArrayList<Ip_Vm> ip_vms_n = new ArrayList<>();
for (Node node : consistentHash.nodeList){
System.out.println("ip:"+node.getIp()+"对应虚拟节点:"+node.getVirtualNodeHashes());
for (Integer virtualNodeHash : node.getVirtualNodeHashes()) {
ip_vms_n.add(new Ip_Vm(node.getIp(),virtualNodeHash));
}
}
ip_vms_n.sort((Comparator.comparingInt(Ip_Vm::getVirtualNodeHashCode)));
System.out.println("新增节点在hash环上的分布:");
for (Ip_Vm ip_vm : ip_vms_n) {
System.out.print(ip_vm.getIp()+"\t");
}
System.out.println();
}
private static class Ip_Vm{
String ip;
Integer virtualNodeHashCode;
List<Object> keys;
public List<Object> getKeys() {
return keys;
}
public void addKey(Object key) {
keys.add(key);
}
public Ip_Vm(String ip, Integer virtualNodeHashCode) {
this.ip = ip;
this.virtualNodeHashCode = virtualNodeHashCode;
keys=new ArrayList<>();
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public Integer getVirtualNodeHashCode() {
return virtualNodeHashCode;
}
public void setVirtualNodeHashCode(Integer virtualNodeHashCode) {
this.virtualNodeHashCode = virtualNodeHashCode;
}
}
}
虚拟节点和数据更多情况的测试
public class ConsistentHashTest {
public static final int NODE_SIZE = 10;
public static final int STRING_COUNT = 100 * 100;
private static ConsistentHash consistentHash = new ConsistentHash();
private static List<String> sList = new ArrayList<>();
public static void main(String[] args) {
// 增加节点
for (int i = 0; i < NODE_SIZE; i++) {
String ip = new StringBuilder("10.2.1.").append(i)
.toString();
consistentHash.addNode(ip);
}
// 生成需要缓存的数据;
for (int i = 0; i < STRING_COUNT; i++) {
sList.add(RandomStringUtils.randomAlphanumeric(10));
}
// 将数据放入到缓存中。
for (String s : sList) {
consistentHash.put(s, s);
}
for(int i = 0 ; i < 10 ; i ++) {
int index = RandomUtils.nextInt(0, STRING_COUNT);
String key = sList.get(index);
String cache = (String) consistentHash.get(key);
System.out.println("Random:"+index+",key:" + key + ",consistentHash get value:" + cache +",value is:" + key.equals(cache));
}
// 输出节点及数据分布情况
for (Node node : consistentHash.nodeList){
System.out.println(node);
}
// 新增一个数据节点
consistentHash.addNode("10.2.1.110");
for(int i = 0 ; i < 10 ; i ++) {
int index = RandomUtils.nextInt(0, STRING_COUNT);
String key = sList.get(index);
String cache = (String) consistentHash.get(key);
System.out.println("Random:"+index+",key:" + key + ",consistentHash get value:" + cache +",value is:" + key.equals(cache));
}
// 输出节点及数据分布情况
for (Node node : consistentHash.nodeList){
System.out.println(node);
}
}
}