Storm中的分布式缓存


原文地址:http://storm.apache.org/releases/1.1.1/distcache-blobstore.html

翻译水平有限,欢迎各位指正。

Storm 分布式缓存API

Storm中的分布式缓存的主要用途在于存储那些在topology生命周期中会产生变化且数量众多的文件(即blobs,在本文档中,blobs和“分布式缓存中文件”的意义等价),如位置数据、字典数据等。blobs的典型的应用场景包括短语识别、实体提取、文档分类、URL地址重写、定位/地址检测等。这些blobs数据的大小从几KB到几GB不等。对于那些不会动态更新的小数据集合,我们将其直接打包在topology的jar包中是可行的,但是对于那些大型的数据集合,打包再提交的启动时间将会非常大。在这个例子中,使用分布式缓存能大大提高topology的启动速度,特别要指出的是,同一个submitter提交的文件,会一直驻留在缓存中。这样的设计使得缓存中的数据可以重复利用。

在topology启动时,用户指定好哪些文件是topology所需要的。topology开始运行后,用户可以随时请求blobs并且更新其到新的版本。blobs的更新基于最终一致性模型(eventual consistency model)。如果topology想要知道其访问的文件的具体版本,这需要用户自己来实现相应信息的查询功能。缓存文件的置换使用 Least-Recently Used (LRU)算法,supervisor将基于这一算法对缓存文件进行置换。此外,blobs是可以压缩的,用户可以在使用其之前将其解压。

使用分布式缓存的动机

  • 允许在topology之间共享blobs。
  • 允许从命令行对blob进行更新。

分布式缓存的实现

目前的BlobStore包括两个实现:LocalFsBlobStore 和HdfsBlobStore。详细接口请参考附录A。

###LocalFsBlobStore

Blobstore的本地文件系统实现如上面的时间线图中所示。

blob的使用包括创建blob、下载blob以及在topology中使用blob。主要的步骤如下所示:

####创建Blob的命令

1
storm blobstore create --file README.txt --acl o::rwa --replication-factor 4 key1


上面的命令创建了一个名为“key1”的blob,对应于文件readme.txt,对所有用户的访问权限为读(r)、写(w)、管理(a)。此外,该文件包含4个拷贝。

#### topology提交及Blob映射

用户可以使用下面的命令来提交topology。该命令包括拓扑映射配置。该配置包含两个键“key1”和“key2”,其中键“key1”具有一个名为“blob_file”的没有被压缩的本地文件名映射。

1
2
storm jar /home/y/lib/storm-starter/current/storm-starter-jar-with-dependencies.jar
org.apache.storm.starter.clj.word_count test_topo -c topology.blobstore.map='{"key1":{"localname":"blob_file", "uncompress":"false"},"key2":{}}'


####Blob创建进程
Blob通过ClientBlobStore接口进行创建。附录B中包含了ClientBlobStore中的所有接口。ClientBlobStore的一个具体实现是NimbusBlobStore。在使用本地文件系统的情况下,客户端调用nimbus来创建本地文件系统中的blob。nimbus使用本地文件系统实现来创建这些blob。当用户提交一个topology,包括jar、配置文件以及代码文件都被作为blob提交到blobstore中。

####Supervisor下载blob
最终,在topology运行时,同一个NimbusBlobStore thrift客户端上传的blob,通过nimbus分配到指定的supervisor,supervisor接受到分配指令后,即会下载对应的blob。supervisor通过NimbusBlobStore client直接下载topology的jar、conf blobs等。

###HdfsBlobStore

HdfsBlobStore 的blob创建和下载过程的实现和Local file system上的实现类似,唯一的区别在于支持对blobstore的多个拷贝。实现数据的多个拷贝是HDFS的天生技能,这也使得该模式下,我们不需要把blob的状态存储都在zookeeper中。另一方面,本地文件系统的blobstore需要将状态存储在zookeeper中,以便能和nimbus HA一起协同工作。Nimbus HA允许本地文件系统无缝地实现多拷贝这一特性,将状态存储在zookeeper的运行topology数据中,并在不同的Nimbus上同步这些blob。改模式下,最终supervisor使用HdfsClientBlobStore来与HdfsBlobStore 进行通信。

##其他特性及相关文档


1
2
storm jar /home/y/lib/storm-starter/current/storm-starter-jar-with-dependencies.jar org.apache.storm.starter.clj.word_count test_topo
-c topology.blobstore.map='{"key1":{"localname":"blob_file", "uncompress":"false"},"key2":{}}'


###压缩
BlobStore允许用户将uncompress配置项指定为true或false。这个配置可以在topology.blobstore.map中指定。上面的命令表示允许用户上传压缩文件,如tar/zip。在本地文件系统blobstore中,压缩的blob存储在nimbus节点上。本地化代码负责对blob进行解压,并将其存储在supervisor节点上。在用户逻辑开始执行之前,supervisor节点上blob的符号链接(Symbolic links)将会在worker中被创建。

###本地文件名称映射

除了压缩之外,在不同的supervisor节点上,本地化器(localizer)可在将blob映射为一个本地化的名称。

###BlobStore的一些具体实现细节

BlobStore基于哈希函数创建blobs. Blob通常存储在BlobStore指定的目录中(配置项blobstore.dir)。默认配置为storm.local.dir/blobs

一旦提交了一个文件,BlobStore将读取configs,并为blob创建一个带有所有访问控制细节的元数据。在访问blob时,元数据通常用于授权。blob的哈希值基于blob数据的key和版本号生成。blob数据默认放置在storm.local.dir/blobs/data目录下。通常会生成放置在一个正数命名的目录来放置,如193,822。

一旦topology启动,storm.conf, storm.ser和storm.code相关的blobs将会首先被下载,其他的命令行上上传的所有blob都使用本地化器来解压,并将它们映射到topology.blobstore.map指定的本地名称。supervisor通过检查版本的变化定期更新blob。动态地更新blob使它成为一个非常有用的特性。

对于本地文件系统,supervisor节点上的分布式缓存被设置为1024mb(软限制),同时,根据LRU策略,将会在每600秒内清除任何超过软限制的内容。

另一方面,HDFS BlobStore的实现通过消除在nimbus上存储blob的负担来更好地处理负载,从而避免它成为一个瓶颈。此外,它还提供了blob的无缝拷贝,本地文件系统BlobStore在复制blob时效率不高,并且受到nimbus的数量的限制。此外,在没有使用nimbus存储blob的情况下,supervisor直接与HDFS BlobStore进行通信,从而减少了对nimbus的负载和依赖性。

高可用的Nimbus

问题描述

目前,nimbus是一个在单个机器上运行的进程。在大多数情况下,nimbus失败是暂时的,并且可以通过执行监督的进程来重新启动。然而,当磁盘出现故障或者网络分区出现时,nimbus就会宕机。这种情况下,已经启动的topology能够正常运行,但是无法提交新的topology,且无法对运行着的topology进行kill、deactivated或activated等操作。此时,如果supervisor节点出现故障,则不会执行topology资源重新分配,从而导致性能下降或topology故障。当前的解决方案是,启动多台nimbus来避免单点故障。

高可用Nimbus的要求

  • 增加nimbus的总体可用性。
  • 允许nimbus主机随时离开并加入集群。一个新加入的主机应该自动加入可用的nimbus名单。
  • 在nimbus失败的情况下,不需要进行拓扑重新提交。
  • 任何正在执行的topology都不应丢失。

集群领导选举(Leader Election)

nimbus包含以下接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public interface ILeaderElector {
/**
* queue up for leadership lock. The call returns immediately and the caller
* must check isLeader() to perform any leadership action.
*/
void addToLeaderLockQueue();
/**
* Removes the caller from the leader lock queue. If the caller is leader
* also releases the lock.
*/
void removeFromLeaderLockQueue();
/**
*
* @return true if the caller currently has the leader lock.
*/
boolean isLeader();
/**
*
* @return the current leader's address , throws exception if noone has has lock.
*/
InetSocketAddress getLeaderAddress();
/**
*
* @return list of current nimbus addresses, includes leader.
*/
List<InetSocketAddress> getAllNimbusAddresses();
}

一旦一个可用的nimbus出现,即调用addToLeaderLockQueue函数,将其加入可用nimbus名单。领导选举算法将从Queue中选出一个节点作为leader,若此时topology的代码、jar或者blob有丢失,则会从其他的正在运行的nimbus节点下载这些数据。

The first implementation will be Zookeeper based. If the zookeeper connection is lost/reset resulting in loss of lock or the spot in queue the implementation will take care of updating the state such that isLeader() will reflect the current status. The leader like actions must finish in less than minimumOf(connectionTimeout, SessionTimeout) to ensure the lock was held by nimbus for the entire duration of the action (Not sure if we want to just state this expectation and ensure that zk configurations are set high enough which will result in higher failover time or we actually want to create some sort of rollback mechanism for all actions, the second option needs a lot of code). If a nimbus that is not leader receives a request that only a leader can perform, it will throw a RunTimeException.

Nimbus状态存储

(未完待续)