ElasticSearch 数据存储
数据写入
Elasticsearch采用多Shard方式,通过配置routing
规则将数据分成多个数据子集,每个数据子集提供独立的索引和搜索功能,当写入文档的时候,根据routing规则,将文档发送给特定Shard中建立索引,这样就能实现分布式了,此外,Elasticsearch整体架构上采用了一主多副的方式:
每个Index由多个Shard组成(默认是1个),每个Shard有一个主节点和多个副本节点,副本个数可配。
写入过程
但每次写入的时候,写入请求会先根据_routing
规则选择发给哪个Shard,Index Request中可以设置使用哪个Filed的值作为路由参数,如果没有设置,则使用Mapping中的配置,如果mapping中也没有配置,则使用_id作为路由参数,然后通过_routing的Hash值选择出Shard(在OperationRouting类中),最后从集群的Meta中找出该Shard的Primary节点。
请求接着会发送给Primary Shard,在Primary Shard上执行成功后,再从Primary Shard上将请求同时发送给多个Replica Shard,请求在多个Replica Shard上执行成功并返回给Primary Shard后,写入请求执行成功,返回结果给客户端。
在写入时,我们可以在Request自己指定_routing,也可以在Mapping指定文档中的Field值作为_routing,如果没有指定_routing,则会把_id作为_routing进行计算。
由于写入时,具有相同_routing的文档一定会分配在同一个分片上,所以如果是自定义的_routing,在查询时,一定要指定_routing进行查询,否则是查询不到文档的,这并不是局限性,恰恰相反,指定_routing的查询,性能上会好很多,因为指定_routing意味着直接去存储数据的shard上搜索,而不会搜索所有shard。
写入原理
我们可以采用多个副本后,避免了单机或磁盘故障发生时,但是Elasticsearch里为了减少磁盘IO保证读写性能,一般是每隔一段时间(比如30分钟)才会把Lucene的Segment写入磁盘持久化,对于写入内存,但还未Flush到磁盘的Lucene数据,如果发生机器宕机或者掉电,那么内存中的数据也会丢失,这时候如何保证?ES这里采用了预写日志的机制,在ES中预写日志是translog
什么是translog
仔细分析下上面ES的refresh和flush,就会发现如果在数据还没有被flush之前,机器宕掉了,那上次flush之后到宕机前的数据就丢了
当ES异常恢复时会丢掉最后一次flush之后的数据(如果有的话),这对于绝大多数业务是不能接受的,所以ES引入了translog来解决这个问题,其实也不算什么新技术,就是类似于传统DB里面的预写日志(WAL),不过在ES里面叫事务日志(transaction log),简称translog。
这样ES的写入的时候,先在内存buffer中进行Lucene documents
的写入,写入成功后再写translog,内存buffer中的Lucene documents
经过refresh会形成file system cache
中的segment,此时内容就可以被搜索到了,然后经过flush,持久化到磁盘上面。
写入过程
在每一个Shard中,写入流程分为两部分,先写入Lucene,再写入TransLog
写入请求到达Shard后,先写Lucene文件,创建好索引,此时索引还在内存里面,接着去写TransLog,写完TransLog后,刷新TransLog数据到磁盘上,写磁盘成功后,请求返回给用户,这里有几个关键点:
和数据库不同,数据库是先写redo log,然后再写内存,而Elasticsearch是先写内存,最后才写TransLog,一种可能的原因是Lucene的内存写入会有很复杂的逻辑,很容易失败,比如分词,字段长度超过限制等,比较重,为了避免TransLog中有大量无效记录,减少recover的复杂度和提高速度,所以就把写Lucene放在了最前面。
写Lucene内存后,并不是可被搜索的,需要通过Refresh把内存的对象转成完整的Segment后,然后再次reopen后才能被搜索,一般这个时间设置为1秒钟,导致写入Elasticsearch的文档,最快要1秒钟才可被从搜索到,所以Elasticsearch在搜索方面是NRT(Near Real Time)近实时的系统。
当Elasticsearch作为NoSQL数据库时,查询方式是GetById,这种查询可以直接从TransLog中查询,这时候就成了RT(Real Time)实时系统。
每隔一段比较长的时间,比如30分钟后,Lucene会把内存中生成的新Segment刷新到磁盘上,刷新后索引文件已经持久化了,历史的TransLog就没用了,会清空掉旧的TransLog。
Lucene缓存中的数据默认1秒之后才生成segment文件,即使是生成了segment文件,这个segment是写到页面缓存中的,并不是实时的写到磁盘,只有达到一定时间或者达到一定的量才会强制flush磁盘。
如果这期间机器宕掉,内存中的数据就丢了,如果发生这种情况,内存中的数据是可以从TransLog中进行恢复的,TransLog默认是每5秒都会刷新一次磁盘,但这依然不能保证数据安全,因为仍然有可能最多丢失TransLog中5秒的数据。
这里可以通过配置增加TransLog刷磁盘的频率来增加数据可靠性,最小可配置100ms,但不建议这么做,因为这会对性能有非常大的影响,一般情况下,Elasticsearch是通过副本机制来解决这一问题的。
写入过程分析
追加事务日志
当一个文档被索引,它被加入到内存缓存,同时加到事务日志,不断有新的文档被写入到内存,同时也都会记录到事务日志中,这时新数据还不能被检索和查询。
刷新缓存
当达到默认的刷新时间或内存中的数据达到一定量后,会触发一次 refresh:
- 这些在内存缓冲区的文档被写入到一个新的段中,且没有进行fsync操作。
- 这个段被打开,使其可被搜索。
- 内存缓冲区被清空。
继续追加事务日志
这个进程继续工作,更多的文档被添加到内存缓冲区和追加到事务日志。
持久化
随着新文档索引不断被写入,当日志数据大小超过 512M 或者时间超过 30 分钟时,会进行一次全提交:
- 内存缓存区的所有文档会写入到新段中,同时清除缓存;
- 文件系统缓存通过
fsync
操作flush
到硬盘,生成提交点; - 事务日志文件被删除,创建一个空的新日志。
副本可靠性
即使主分片所在节点宕机,丢失了5秒数据,依然是可以通过副本来进行恢复的。
引入translog之后,解决了进程突然挂掉或者机器突然宕机导致还处于内存,没有被持久化到磁盘的数据丢失的问题,但数据仅落到磁盘还是无法完全保证数据的安全,比如磁盘损坏等
分布式领域解决这个问题最直观和最简单的方式就是采用副本机制,比如HDFS、Kafka等都是此类代表,ES也使用了副本的机制,一个索引由一个primary和n(n≥0)个replica组成,replica的个数支持可以通过接口动态调整,为了可靠,primary和replica的shard不能在同一台机器上面。
在数据写入的时候,数据会先写primary,写成功之后,同时分发给多个replica,待所有replica都返回成功之后,再返回给客户端,所以一个写请求的耗时可以粗略认为是写primary的时间+耗时最长的那个replica的时间。
读写一致性
数据写入
- ES会将文档发送给协调节点,根据document数据路由到指定的节点,该节点包含该
primary shard
- 把文档存储写入到
primary shard
,如果设置了index.write.wait_for_active_shards=1
,那么写完主节点,直接返回客户端,如果index.write.wait_for_active_shards=all
,那么必须要把所有的副本写入完成才返回客户端 - 如果
index.write.wait_for_active_shards=1
,那么es会异步的把主分片的数据同步到副本分片上去。(在此期间,可能会出现读请求可能读取不到最新数据的情况)
数据读取
- 客户端发送请求发送到任意一个节点,该节点成为协调节点
- 协调节点根据请求的查询的条件找到文档对应的主分片和副本节点的地址
- 随机选择一个节点,一般是轮询,可能查询主节点,也可能查询的是副本节点,然后将数据返回给协调节点
- 协调节点将数据返回给客户端
- 由于可能存在primary shard的数据还没同步到 replica shard上的情况,所以客户端可能查询到旧的数据,我们可以做相应的调整,保证读取到最新的数据。
数据存储
分片是 Elasticsearch 最小的工作单元,一个分片其实就是一个lucene索引,众多的分片组合在一起是一个完整的elasticsearch索引
数据存储原理
倒排索引不可变
倒排索引被写入磁盘后是
不可改变
的:它永远不会修改。
不可变优点
写入磁盘的倒排索引是不可变的,优势主要表现在
- 不需要锁,因为如果从来不需要更新一个索引,就不必担心多个程序同时尝试修改,也就不需要锁。
- 一旦索引被读入内核的文件系统缓存,便会留在哪里,由于其不变性,只要文件系统缓存中还有足够的空间,那么大部分读请求会直接请求内存,而不会命中磁盘,这提供了很大的性能提升。
- 写入单个大的倒排索引,可以压缩数据,只需要较少磁盘 IO 和缓存索引的内存即可。
不可变缺点
当然,不可变的索引有它的缺点:
当对旧数据进行删除时,旧数据不会马上被删除,而是在
.del
文件中被标记为删除,而旧数据只能等到段更新时才能被移除,这样会造成大量的空间浪费。若有一条数据频繁的更新,每次更新都是新增新的标记旧的,则会有大量的空间浪费。
每次新增数据时都需要新增一个段来存储数据,当段的数量太多时,对服务器的资源例如文件句柄的消耗会非常大。
在查询的结果中包含所有的结果集,需要排除被标记删除的旧数据,这增加了查询的负担。
段的引入
在全文检索的早些时候,会为整个文档集合建立一个大索引,并且写入磁盘,只有新的索引准备好了,它就会替代旧的索引,最近的修改才可以被检索,这无疑是低效的,因为上面种种原因,引入了段
新的文档首先写入内存区的索引缓存,这时不可检索。
时不时(默认 1s 一次),内存区的索引缓存被 refresh 到文件系统缓存(该过程比直接到磁盘代价低很多),成为一个新的段(segment)并被打开,这时可以被检索。
新的段提交,写入磁盘,提交后,新的段加入提交点,缓存被清除,等待接收新的文档。
什么是段
分片下的索引文件被拆分为多个子文件,每个子文件叫作段, 每一个段本身都是一个倒排索引,并且段具有不变性,一旦索引的数据被写入硬盘,就不可再修改。
段被写入到磁盘后会生成一个提交点,提交点是一个用来记录所有提交后段信息的文件,一个段一旦拥有了提交点,就说明这个段只有读的权限,失去了写的权限,相反当段在内存中时,就只有写的权限,而不具备读数据的权限,意味着不能被检索。
为什么段不可变
在 lucene 中,为了实现高索引速度,故使用了segment 分段架构存储
一批写入数据保存在一个段中,其中每个段是磁盘中的单个文件,由于两次写入之间的文件操作非常繁重,因此将一个段设为不可变的,以便所有后续写入都转到新的段。
段的合并
由于自动刷新流程每秒会创建一个新的段 ,这样会导致短时间内的段数量暴增
什么是段合并
而段数目太多会带来较大的麻烦,每一个段都会消耗文件句柄、内存和 cpu 运行周期,更重要的是,每个搜索请求都必须轮流检查每个段;所以段越多,搜索也就越慢。
Elasticsearch 通过在后台进行段合并来解决这个问题,小的段被合并到大的段,然后这些大的段再被合并到更大的段。
段合并的时候会将那些旧的已删除文档从文件系统中清除,被删除的文档(或被更新文档的旧版本)不会被拷贝到新的大段中。
段合并流程
启动段合并不需要你做任何事,进行索引和搜索时会自动进行。
- 当索引的时候,刷新(refresh)操作会创建新的段并将段打开以供搜索使用。
- 合并进程选择一小部分大小相似的段,并且在后台将它们合并到更大的段中,这并不会中断索引和搜索。
一旦合并结束,老的段被删除
- 新的段被刷新(flush)到了磁盘,写入一个包含新段且排除旧的和较小的段的新提交点。
- 新的段被打开用来搜索。
- 老的段被删除。
合并大的段需要消耗大量的 I/O 和 CPU 资源,如果任其发展会影响搜索性能,Elasticsearch在默认情况下会对合并流程进行资源限制,所以搜索仍然 有足够的资源很好地执行。
默认情况下,归并线程的限速配置 indices.store.throttle.max_bytes_per_sec 是 20MB,对于写入量较大,磁盘转速较高,甚至使用 SSD 盘的服务器来说,这个限速是明显过低的。
索引更新
如何更新索引
因为索引的不可变性带来的好处,那如何在保持不可变同时更新倒排索引?
答案是,使用多个索引,不是重写整个倒排索引,而是增加额外的索引反映最近的变化,每个倒排索引都可以按顺序查询,从最老的开始,最后把结果聚合。
更新细节
索引文件分段存储并且不可修改,那么新增、更新和删除如何处理呢?
新增,新增很好处理,由于数据是新的,所以只需要对当前文档新增一个段就可以了。
删除,由于不可修改,所以对于删除操作,不会把文档从旧的段中移除,而是通过新增一个
.del
文件(每一个提交点都有一个 .del 文件),包含了段上已经被删除的文档。当一个文档被删除,它实际上只是在.del文件中被标记为删除,依然可以匹配查询,但是最终返回之前会被从结果中删除。更新,不能修改旧的段来进行反映文档的更新,其实更新相当于是删除和新增这两个动作组成,会将旧的文档在
.del
文件中标记删除,然后文档的新版本被索引到一个新的段中,可能两个版本的文档都会被一个查询匹配到,但被删除的那个旧版本文档在结果集返回前就会被移除。
如何查询
当一个查询触发时,所有已知的段按顺序被查询。
词项统计会对所有段的结果进行聚合,以保证每个词和每个文档的关联都被准确计算,这种方式可以用相对较低的成本将新文档添加到索引。
段是不可改变的,所以既不能从把文档从旧的段中移除,也不能修改旧的段来进行反映文档的更新。 取而代之的是,每个提交点会包含一个 .del 文件,文件中会列出这些被删除文档的段信息。
当一个文档被 “删除” 时,它实际上只是在 .del 文件中被标记删除,一个被标记删除的文档仍然可以被查询匹配到, 但它会在最终结果被返回前从结果集中移除。
文档更新也是类似的操作方式:当一个文档被更新时,旧版本文档被标记删除,文档的新版本被索引到一个新的段中,可能两个版本的文档都会被一个查询匹配到,但被删除的那个旧版本文档在结果集返回前就已经被移除。
近实时搜索原理
随着按段(per-segment)搜索的发展,一个新的文档从索引到可被搜索的延迟显著降低了
直接写入存在的问题
提交一个新的段到磁盘需要
fsync
操作,确保段被物理地写入磁盘,即时电源失效也不会丢失数据
但是fsync
是昂贵的,严重影响性能,当写数据量大的时候会造成 ES 停顿卡死,查询也无法做到快速响应,新文档在几分钟之内即可被检索,但这样还是不够快,磁盘在这里成为了瓶颈,提交(Commiting)一个新的段到磁盘需要一个fsync来确保段被物理性地写入磁盘,这样在断电的时候就不会丢失数据, 但是 fsync 操作代价很大; 如果每次索引一个文档都去执行一次的话会造成很大的性能问题。
延时写策略
所以
fsync
不能在每个文档被索引的时就触发,需要一种更轻量级的方式使新的文档可以被搜索,这意味移除fsync
,为了提升写的性能,ES没有每新增一条数据就增加一个段到磁盘上,而是采用延迟写的策略。
每当有新增的数据时,就将其先写入到内存中,在内存和磁盘之间是文件系统缓存,当达到默认的时间(1秒钟)或者内存的数据达到一定量时,会触发一次刷新(Refresh),将内存中的数据生成到一个新的段上并缓存到文件缓存系统上,稍后再被刷新到磁盘中并生成提交点。
如何实现近实时搜索
这里的内存使用的是ES的JVM内存,而文件缓存系统使用的是操作系统文件缓冲区,也就是操作系统内存
新的数据会继续的被写入内存,但内存中的数据并不是以段的形式存储的,因此不能提供检索功能,由内存刷新到文件缓存系统的时候会生成了新的段,并将段打开以供搜索使用,而不需要等到被刷新到磁盘。
在 Elasticsearch 中,这种写入和打开一个新段的轻量的过程叫做 refresh (即内存刷新到文件缓存系统)默认情况下每个分片会每秒自动刷新一次,这就是为什么说 Elasticsearch 是近实时的搜索了:文档的改动不会立即被搜索,但是会在一秒内可见。
这些行为可能会对新用户造成困惑: 他们索引了一个文档然后尝试搜索它,但却没有搜到,这个问题的解决办法是用 refresh API 执行一次手动刷新: /users/_refresh。
近实时写入测试
准备工作
下面我们先创建一个索引
1 | PUT customer?pretty |
使用默认设置
写入数据
下面我们以正常的方式进行写入
1 | POST customer/_doc/1 |
查询数据
接下来我们马上进行数据的查询
1 | GET customer/_search |
我们发现数据马上就被查询出来了
关闭自动刷新
下面我们将自动刷新给关闭掉在进行测试
关闭自动刷新
1 | 关闭自动刷新 |
下面我们已经关闭了自动刷新,这个时候数据不会被刷新进段中了
写入数据
下面我们我们在关闭自动刷新的情况下写入数据
1 | post customer/_doc/2 |
文档已经被创建成功了
查询数据
接下来我们马上进行数据的查询
1 | GET customer/_search |
我们发现数据还是只有张三而没有李四,说明数据还没有被刷新过来
开启自动刷新
我们开启自动刷新后在进行查询
1 | 正常模式每一秒刷新一次 |
我们发现李四的数据已经被查询出来了
ES可靠性
相关概念
ES作为全文检索兼存储系统,数据可靠性至关重要,本文讨论ES是如何实现数据可靠性的,ES底层基于Lucene,所以有必要先搞清楚一些相关的概念。
refresh && flush && commit
Lucene中,有flush和commit的概念
所谓flush,就是定期将内存Buffer里面的数据刷新到Directory这样一个抽象的文件存储层,其实就是生成segment,需要注意的是,因为操作系统file cache的原因,这里的刷新未必会真的落盘,一般只是从内存buffer刷到了file cache而已,实质还是在内存中,所以是一个相对比较高效和轻量级的操作,flush方法的java doc是这样描述的:
1 | Moves all in-memory segments to the Directory, but does not commit (fsync) them (call commit() for that). |
形成segment以后,数据就可以被搜索了,但因为Lucene flush一般比较频繁(ES里面执行频率默认是1秒),所以会产生很多小的segment文件,一方面太多的文件会占用太多的文件描述符;
另一方面,搜索时文件太多也会影响搜索效率,所以Lucene有专门的Merge线程定期将小的segment文件merge为大文件。
Lucene的commit上面的Java doc已经提到了,它会调用fsync,commit方法的java doc如下
1 | Commits all pending changes (added and deleted documents, segment merges, added indexes, etc.) to the index, and syncs all referenced index files, such that a reader will see the changes and the index updates will survive an OS or machine crash or power loss. |
因为会调用fsync,所以commit之后,文件肯定会被持久化到磁盘上,所以这是一个重操作,一方面是磁盘的性能比较差,另一方面是commit的时候会执行更新索引等操作,commit一般是当我们认为系统到达一个稳定点的时候,commit一次,类似于流式系统里面的checkpoint,当系统出现故障的时候,Lucene会从最近的一次commit point进行恢复,而不是最近的一次flush。
总结一下,flush会生成segment,之后数据就能被搜索了,是一个轻量级操作,但此时并不保证数据被持久化了。commit是一个比较重的落盘操作,用于持久化,不能被频繁执行。
ES 概念
ES中有refresh和flush的概念,其实是和Lucene一一对应的,不过换了个名字,ES里面的refresh就是Lucene里面的flush;ES里面的flush就是Lucene里面的commit。所以,ES里面的refresh默认1秒执行一次,也就是数据写入ES之后最快1秒之后可以被搜索到,这也就是ES提供的近实时搜索NRT(Near Realtime)。而flush的执行时机有两个点:一个是ES会根据内存使用情况启发式的执行flush,另外一个时机是当translog达到512MB(默认值)时执行一次flush。网上很多文章(一般都比较早了)都提到每30分钟也会执行一次,但我在6.6版本的代码及文档里面没有找到这部分说明。
translog
仔细想一下上面介绍的Lucene的flush和commit,就会发现如果在数据还没有被commit之前,机器宕掉了,那上次commit之后到宕机前的数据就丢了。实际上,Lucene也是这么做的,异常恢复时会丢掉最后一次commit之后的数据(如果有的话)。这对于绝大多数业务是不能接受的,所以ES引入了translog来解决这个问题。其实也不算什么新技术,就是类似于传统DB里面的预写日志(WAL),不过在ES里面叫事务日志(transaction log),简称translog。
这样ES的写入的时候,先在内存buffer中进行Lucene documents的写入,写入成功后再写translog。内存buffer中的Lucene documents经过refresh会形成file system cache中的segment,此时,内容就可以被搜索到了。然后经过flush,持久化到磁盘上面。
重要问题
- 为了保证数据完全的可靠,一般的写入流程都是先写WAL,再写内存,但ES是先写内存buffer,然后再写translog。这个顺序目前没有找到官方的说明,网上大部分说的是写的过程比较复杂,容易出错,先写内存可以降低处理的复杂性。不过这个顺序个人认为对于用户而言其实不是很关键,因为不管先写谁,最终两者都写成功才会返回给客户端。
- translog的落盘(即图中的fsync过程)有两种策略,分别对应不同的可靠程度,第一种是每次请求(一个index、update、delete或者bulk操作)都会执行fsync,fsync成功后,才会给客户端返回成功,也就是请求同步刷盘,这种可靠性最高,只要返回成功,那数据一定已经落盘了,这也是默认的方式。第二种是异步的,按照定时达量的方式,默认每5秒或者512MB的时候就fsync一次。异步一般可以获得更高的吞吐量,但弊端是存在数据丢失的风险。
- ES的flush(或者Lucene的commit)也是落盘,为什么不直接用,而加一个translog?translog或者所有的WAL的一大特性就是他们都是追加写,这样大多数时候都可以实现顺序读写,读和写可以完全并发,所以性能一般都是非常好的。有一些测试表明,磁盘的顺序写甚至比内存的随机写更快,见The Pathologies of Big Data的Figure 3。
- translog不是全局的,而是每个shard(也就是Lucene的index)一个,且每个shard的所有translog中同一时刻只会有一个translog文件是可写的,其它都是只读的(如果有的话)。具体细节可查看
Translog
类的Java doc说明。 - translog的老化机制在6.0之前是segment flush到磁盘后,就删掉了。6.0之后是按定时达量的策略进行删除,默认是512MB或者12小时。
副本
引入translog之后,解决了进程突然挂掉或者机器突然宕机导致还处于内存,没有被持久化到磁盘的数据丢失的问题,但数据仅落到磁盘还是无法完全保证数据的安全,比如磁盘损坏等。分布式领域解决这个问题最直观和最简单的方式就是采用副本机制,比如HDFS、Kafka等都是此类代表,ES也使用了副本的机制。一个索引由一个primary和n(n≥0)个replica组成,replica的个数支持可以通过接口动态调整。为了可靠,primary和replica的shard不能在同一台机器上面。
这里要注意区分一下replica和shard的关系:比如创建一个索引的时候指定了5个shard,那么此时primary分片就有5个shard,每个replica也会有5个shard。我们可以通过接口随意修改replica的个数,但不能修改每个primary/replica包含5个shard这个事实。 当然,shard的个数也可以通过shrink接口进行调整,但这是一个很重的操作,而且有诸多限制,比如shard个数只能减少,且新个数是原来的因子,比如原来是8个shard,那只能选择调整为4、2或1个;如果原来是5个,那就只能调整为1个了。所以实际中,shard的个数一般要预先计划好(经验值是保证一个shard的大小在30~50GB之间),而replica的个数可以根据实际情况后面再做调整。
在数据写入的时候,数据会先写primary,写成功之后,同时分发给多个replica,待所有replica都返回成功之后,再返回给客户端。所以一个写请求的耗时可以粗略认为是写primary的时间+耗时最长的那个replica的时间。
写入优化
总体来说,ES的写入能力不算太好,所以经常需要对写入性能做优化,除了保证良好的硬件配置外,还可以从ES自身进行机制进行优化,结合上面的介绍,可以很容易得出下面的一些优化手段:
- 如果对于搜索的实时性要求不高,可以适当增加refresh的时间,比如从默认的1秒改为30秒或者1min,甚至更长。如果是离线导入再搜索的场景,可以直接设置为”-1”,即关闭自动的refresh,等导入完成后,通过接口手动refresh。其提高性能的原理是增加refresh的时间可以减少大量小的segment文件,这样在可以提高flush的效率,减小merge的压力。
- 如果对于数据可靠性要求不是特别高,可以将translog的落盘机制由默认的请求同步落盘,改为定时达量的异步落盘,提高落盘的效率。
- 如果对于数据可靠性要求不是特别高,可以在写入高峰期先不设置副本,待过了高峰之后再通过接口增加副本。这个可以通过ES的ILM策略,实现自动化。