Elasticsearch document essentials
The Elasticsearch document
{
"_index": "rrc",
"_type": "user",
"_id": "1",
"_version": 2,
"found": true,
"_source": {
"num": 1,
"tags": []
}
}
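_index metadata
Indicates which index a document is stored in.
Index names must be lowercase, must not start with an underscore, and must not contain commas.
_type metadata
Indicates which category (type) within the index the document belongs to.
Type names may be uppercase or lowercase, but must not start with an underscore or contain commas.
_id metadata
The unique identifier of a document. Together with the index and type, it uniquely identifies and locates a document.
You can specify the document id yourself (PUT /index/type/id), or omit it and let ES generate one for you.
id generation strategies
Specifying the document id manually
Whether a manually specified id is appropriate depends on the application:
Typically this approach is used when importing data into ES from another system: the unique identifier the data already has in that system is reused as the document id in ES.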
PUT /rrc/user/11
{
"name":"wang",
"price":30
}
{
"_index": "rrc",
"_type": "user",
"_id": "11",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
Auto-generating the document id
POST /rrc/user
{
"name":"wangql",
"price":30
}
{
"_index": "rrc",
"_type": "user",
"_id": "AYSyqRvxLAoa11ADX7JR",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
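An auto-generated id is 20 characters long, URL-safe, Base64-encoded, and produced by a GUID-style algorithm, so ids generated in parallel across a distributed system are not expected to collide.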
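To make the "20 characters, URL-safe, Base64" description concrete, here is a minimal Python sketch. It is not the algorithm ES uses: the 15 random bytes and the function name make_doc_id are assumptions for illustration only. It simply shows that 15 bytes encode to exactly 20 URL-safe Base64 characters with no padding.

```python
import base64
import os


def make_doc_id() -> str:
    """Return a 20-character URL-safe Base64 id (illustrative only).

    Assumption: 15 random bytes stand in for whatever bytes ES really
    feeds into the encoder. 15 bytes encode to exactly 20 characters,
    so no '=' padding is ever produced.
    """
    raw = os.urandom(15)
    return base64.urlsafe_b64encode(raw).decode("ascii")


doc_id = make_doc_id()
print(doc_id, len(doc_id))
```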
_source metadata
The JSON sent in the request body when a document is created is, by default, returned unchanged when the document is fetched with GET.
GET /rrc/user/11
{
"_index": "rrc",
"_type": "user",
"_id": "11",
"_version": 1,
"found": true,
"_source": {
"name": "wang",
"price": 30
}
}
Customizing the returned result
GET /rrc/user/11?_source=name
{
"_index": "rrc",
"_type": "user",
"_id": "11",
"_version": 1,
"found": true,
"_source": {
"name": "wang"
}
}
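The effect of the _source parameter can be pictured with a small Python sketch. The helper filter_source is hypothetical and only handles exact top-level field names given as a comma-separated list, which is all the request above needs.

```python
def filter_source(source: dict, fields: str) -> dict:
    """Keep only the top-level fields named in a comma-separated list."""
    wanted = {name.strip() for name in fields.split(",") if name.strip()}
    return {key: value for key, value in source.items() if key in wanted}


stored = {"name": "wang", "price": 30}
print(filter_source(stored, "name"))
```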
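Optimistic concurrency control
How Elasticsearch uses _version internally for optimistic concurrency control
When a document is first created, its internal _version is 1. Every later modification or deletion of that document automatically increments _version by 1; even a delete bumps the version.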
PUT /rrc/user/4
{
"name":"tie'dan",
"price":30
}
# First insert: version is 1
{
"_index": "rrc",
"_type": "user",
"_id": "4",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
DELETE /rrc/user/4
# After the delete, version is 2
{
"found": true,
"_index": "rrc",
"_type": "user",
"_id": "4",
"_version": 2,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
}
}
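After a document is deleted, its version information is still retained, which indirectly shows that the document is not physically removed right away. If you delete a document and then create it again, the version is incremented by 1 on top of the delete version.
# Re-create the document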
PUT /rrc/user/4
{
"name":"tie'dan",
"price":30
}
# The version increases to 3
{
"_index": "rrc",
"_type": "user",
"_id": "4",
"_version": 3,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
Optimistic concurrency control based on _version
# Prepare test data
PUT /rrc/user/9
{
"name":"tie'dan",
"price":30
}
{
"_index": "rrc",
"_type": "user",
"_id": "9",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
Updating with the version parameter
POST /rrc/user/9/_update?version=1
{
"doc":{
"name":"goudan"
}
}
# Fetch the document: the version is now 2
GET /rrc/user/9
{
"_index": "rrc",
"_type": "user",
"_id": "9",
"_version": 2,
"found": true,
"_source": {
"name": "goudan",
"price": 30
}
}
# Suppose another client now also tries to update based on version=1
POST /rrc/user/9/_update?version=1
{
"doc":{
"name":"goudan"
}
}
# The response reports that the current version is 2, so the update fails
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[user][9]: version conflict, current version [2] is different than the one provided [1]",
"index_uuid": "uUbcg5zwTxmrq6qWNjMRUw",
"shard": "1",
"index": "rrc"
}
],
"type": "version_conflict_engine_exception",
"reason": "[user][9]: version conflict, current version [2] is different than the one provided [1]",
"index_uuid": "uUbcg5zwTxmrq6qWNjMRUw",
"shard": "1",
"index": "rrc"
},
"status": 409
}
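The behaviour shown above can be modelled with a toy in-memory store in Python. This is only a sketch under stated assumptions: VersionedStore and VersionConflict are hypothetical names, and the store covers just the "exact match or reject" rule, not how ES implements it.

```python
class VersionConflict(Exception):
    """Raised when the caller's version does not match the stored one."""


class VersionedStore:
    """Toy in-memory store that mimics internal _version checks."""

    def __init__(self):
        self._docs = {}  # doc id -> (version, source)

    def get(self, doc_id):
        return self._docs[doc_id]

    def index(self, doc_id, source, version=None):
        current = self._docs[doc_id][0] if doc_id in self._docs else 0
        # With a version supplied, the write is accepted only on an exact match.
        if version is not None and version != current:
            raise VersionConflict(
                f"current version [{current}] is different than the one provided [{version}]"
            )
        self._docs[doc_id] = (current + 1, dict(source))
        return current + 1


store = VersionedStore()
store.index("9", {"name": "tie'dan", "price": 30})            # becomes version 1
store.index("9", {"name": "goudan", "price": 30}, version=1)  # becomes version 2
try:
    # A second client still believes the document is at version 1.
    store.index("9", {"name": "goudan", "price": 30}, version=1)
except VersionConflict as exc:
    print("conflict:", exc)
```

The second client has to re-read the document, reapply its change on top of version 2, and try again.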
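Optimistic concurrency control based on external version
ES internal versioning:
?version=1
Custom (external) versioning:
?version=1&version_type=external
ES also lets you perform concurrency control with a version number that you maintain yourself, instead of its internal _version.
The only difference lies in how the version is compared. With internal versioning, a write succeeds only if the version you provide is exactly equal to the _version stored in ES; otherwise an error is returned. With version_type=external, a write succeeds only if the version you provide is greater than the _version stored in ES.
# Fetch the document: the version is currently 2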
GET /rrc/user/9
{
"_index": "rrc",
"_type": "user",
"_id": "9",
"_version": 2,
"found": true,
"_source": {
"name": "goudan",
"price": 30
}
}
# Try a partial update with an external version
POST /rrc/user/9/_update?version=10&version_type=external
{
"doc":{
"name":"goudan"
}
}
# The error shows that the update API does not support external versions
{
"error": {
"root_cause": [
{
"type": "action_request_validation_exception",
"reason": "Validation Failed: 1: version type [EXTERNAL] is not supported by the update API;"
}
],
"type": "action_request_validation_exception",
"reason": "Validation Failed: 1: version type [EXTERNAL] is not supported by the update API;"
},
"status": 400
}
# Write the full document (PUT) with an external version instead
PUT /rrc/user/9?version=10&version_type=external
{
"name":"tiedan",
"price":99
}
# The write succeeds and the version becomes 10
{
"_index": "rrc",
"_type": "user",
"_id": "9",
"_version": 10,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false
}
# Suppose another client now tries to write with version=9
PUT /rrc/user/9?version=9&version_type=external
{
"name":"tiedan",
"price":99
}
# A version conflict is reported; the current version is 10
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[user][9]: version conflict, current version [10] is higher or equal to the one provided [9]",
"index_uuid": "uUbcg5zwTxmrq6qWNjMRUw",
"shard": "1",
"index": "rrc"
}
],
"type": "version_conflict_engine_exception",
"reason": "[user][9]: version conflict, current version [10] is higher or equal to the one provided [9]",
"index_uuid": "uUbcg5zwTxmrq6qWNjMRUw",
"shard": "1",
"index": "rrc"
},
"status": 409
}
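The two comparison rules can be summarized in a few lines of Python. The function name version_accepted is hypothetical; the sketch only encodes the rules stated above (equal for internal, strictly greater for external) and replays the numbers from the examples.

```python
def version_accepted(current: int, provided: int, version_type: str = "internal") -> bool:
    """Return True if a write carrying `provided` would be accepted."""
    if version_type == "external":
        # External: the provided version must be strictly greater.
        return provided > current
    # Internal: the provided version must match exactly.
    return provided == current


print(version_accepted(current=2, provided=10, version_type="external"))  # write accepted
print(version_accepted(current=10, provided=9, version_type="external"))  # conflict
print(version_accepted(current=2, provided=1))                            # conflict
```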
Partial update
# Partial update
POST /rrc/user/9/_update
{
"doc": {
"name": "test2"
}
}
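Partial update has built-in optimistic concurrency control.
The retry_on_conflict parameter controls how many times the update is retried when a version conflict occurs.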
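The idea behind retry_on_conflict can be sketched as a read, modify, write loop. Everything here is an assumption for illustration: update_with_retry, the read/write callbacks, and the simulated competing writer are hypothetical and only show the retry pattern, not ES internals.

```python
class VersionConflict(Exception):
    """Raised by write() when the document changed since it was read."""


def update_with_retry(read, write, change, retry_on_conflict=3):
    """Re-read and re-apply `change` until the write succeeds or retries run out."""
    attempts = retry_on_conflict + 1
    for attempt in range(attempts):
        version, source = read()
        merged = {**source, **change}  # partial update: merge the changed fields
        try:
            return write(merged, version)
        except VersionConflict:
            if attempt == attempts - 1:
                raise


# Tiny simulated document plus one competing write that sneaks in once.
state = {"version": 1, "source": {"name": "tiedan", "price": 99}}
interference = [True]


def read():
    return state["version"], dict(state["source"])


def write(source, version):
    if interference and interference.pop():
        state["version"] += 1  # another client updated the document first
    if version != state["version"]:
        raise VersionConflict("stale version")
    state["version"] += 1
    state["source"] = source
    return state["version"]


new_version = update_with_retry(read, write, {"name": "test2"})
print(new_version, state["source"])
```

The first attempt fails because of the simulated competing write; the second attempt re-reads the newer version and succeeds.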
Partial update with Groovy scripts
ES has built-in scripting support, and Groovy scripts can be used to implement all kinds of complex operations.
Inline script update
GET /rrc/user/9
{
"_index": "rrc",
"_type": "user",
"_id": "9",
"_version": 10,
"found": true,
"_source": {
"name": "tiedan",
"price": 99
}
}
# Inline script update
POST /rrc/user/9/_update
{
"script": "ctx._source.price += 1"
}
# Fetch the document
GET /rrc/user/9
{
"_index": "rrc",
"_type": "user",
"_id": "9",
"_version": 11,
"found": true,
"_source": {
"name": "tiedan",
"price": 100
}
}
External (file) script update
Script location:
<Elasticsearch install path>/config/scripts
Script name: test-add-tags.groovy, with the following content:
ctx._source.price+=new_price
Calling the script
POST /rrc/user/9/_update
{
"script": {
"lang": "groovy",
"file": "test-add-tags",
"params": {
"new_price": 99
}
}
}
# Result of the call
{
"_index": "rrc",
"_type": "user",
"_id": "9",
"_version": 28,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
}
}
Deleting with a script
Script name: test-delete-document.groovy, with the following content:
ctx.op = ctx._source.num == count ? 'delete' : 'none'
POST /rrc/user/9/_update
{
"script": {
"lang": "groovy",
"file": "test-delete-document",
"params": {
"count": 1
}
}
}
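What the delete script does can be mimicked in Python: the script sets ctx.op, and the update then either deletes the document or leaves it untouched. The helper apply_delete_script and the sample documents are hypothetical; this is a sketch of the decision only.

```python
def apply_delete_script(docs: dict, doc_id: str, count: int) -> str:
    """Mirror `ctx.op = ctx._source.num == count ? 'delete' : 'none'`."""
    ctx = {"_source": docs[doc_id], "op": "index"}
    ctx["op"] = "delete" if ctx["_source"].get("num") == count else "none"
    if ctx["op"] == "delete":
        del docs[doc_id]  # the document is removed
    return ctx["op"]      # 'none' means nothing was changed


docs = {"a": {"num": 1}, "b": {"num": 5}}
print(apply_delete_script(docs, "a", count=1))
print(apply_delete_script(docs, "b", count=1))
print(sorted(docs))
```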
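The upsert operation
If the specified document does not exist, the initialization given in upsert is performed; if it does exist, the partial update specified by doc or script is performed.
# The document does not exist yet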
POST /rrc/user/999/_update
{
"script" : "ctx._source.num+=1",
"upsert": {
"num": 0,
"tags": []
}
}
# The document did not exist, so it is initialized from upsert
{
"_index": "rrc",
"_type": "user",
"_id": "999",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
}
}
# Run it again; the document now exists
POST /rrc/user/999/_update
{
"script" : "ctx._source.num+=1",
"upsert": {
"num": 0,
"tags": []
}
}
# This time the update is performed
{
"_index": "rrc",
"_type": "user",
"_id": "999",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
}
}
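The upsert decision can be sketched in Python as follows. The function upsert_update and the callback increment_num are hypothetical stand-ins for the request above; the sketch reproduces the same two calls and the resulting versions.

```python
import copy


def upsert_update(docs: dict, doc_id: str, script, upsert: dict) -> str:
    """Insert `upsert` if the document is missing, otherwise run `script` on it."""
    if doc_id not in docs:
        docs[doc_id] = {"_version": 1, "_source": copy.deepcopy(upsert)}
        return "created"
    entry = docs[doc_id]
    script(entry["_source"])  # mutate the stored source in place
    entry["_version"] += 1
    return "updated"


def increment_num(source: dict) -> None:
    # Stand-in for the script "ctx._source.num+=1".
    source["num"] += 1


docs = {}
initial = {"num": 0, "tags": []}
print(upsert_update(docs, "999", increment_num, initial))  # first call: created
print(upsert_update(docs, "999", increment_num, initial))  # second call: updated
print(docs["999"])
```

Note that the script does not run on the first call: the document is created with num 0 and only incremented on the second call.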
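Batch retrieval with mget
If documents are fetched one at a time, retrieving 100 documents takes 100 network requests. With a batch get, the same 100 documents take only 1 request, cutting the number of network round trips from 100 to 1.
# Fetch documents, naming the index for each one (they may come from different indices)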
GET /_mget
{
"docs" : [
{
"_index" : "rrc",
"_type" : "user",
"_id" : 1
},
{
"_index" : "rrc",
"_type" : "user",
"_id" : 999
}
]
}
{
"docs": [
{
"_index": "rrc",
"_type": "user",
"_id": "1",
"_version": 2,
"found": true,
"_source": {
"num": 1,
"tags": []
}
},
{
"_index": "rrc",
"_type": "user",
"_id": "999",
"_version": 2,
"found": true,
"_source": {
"num": 1,
"tags": []
}
}
]
}
# Fetch documents from the same index
GET /rrc/_mget
{
"docs" : [
{
"_type" : "user",
"_id" : 1
},
{
"_type" : "user",
"_id" : 999
}
]
}
{
"docs": [
{
"_index": "rrc",
"_type": "user",
"_id": "1",
"_version": 2,
"found": true,
"_source": {
"num": 1,
"tags": []
}
},
{
"_index": "rrc",
"_type": "user",
"_id": "999",
"_version": 2,
"found": true,
"_source": {
"num": 1,
"tags": []
}
}
]
}
# Fetch documents of the same type
GET /rrc/user/_mget
{
"docs" : [
{
"_id" : 1
},
{
"_id" : 999
}
]
}
# Shorthand
GET /rrc/user/_mget
{
"ids": [1, 2]
}
In general, whenever you need to fetch multiple documents at once, use the batch API to minimize the number of network round trips and improve system performance.
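Batch operations with bulk
bulk is the API that ES provides for batch create, update, and delete operations.
bulk has strict requirements on the JSON format. Each JSON object must sit on a single line with no line breaks inside it, and adjacent JSON objects must be separated by a newline (\n on Linux, \r\n on Windows). Every bulk operation consists of a pair of JSON objects (except delete, which has no request body).
Syntax: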
POST /_bulk
{ action: { metadata }}
{ request body }
{ action: { metadata }}
{ request body }
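Because the one-object-per-line rule is easy to get wrong by hand, a request body is usually assembled in code. Below is a minimal Python sketch; build_bulk_body is a hypothetical helper, and the operations reuse index and type names from the examples in this article. The body also has to end with a newline.

```python
import json


def build_bulk_body(operations) -> str:
    """Build a bulk body from (action, metadata, body_or_None) tuples."""
    lines = []
    for action, metadata, body in operations:
        # json.dumps without indent always yields a single line.
        lines.append(json.dumps({action: metadata}))
        if action != "delete":  # delete has no request body line
            lines.append(json.dumps(body))
    return "\n".join(lines) + "\n"  # the final newline is required


operations = [
    ("delete", {"_index": "test_index", "_type": "test_type", "_id": "3"}, None),
    ("index", {"_index": "test_index", "_type": "test_type", "_id": "2"},
     {"test_field": "replaced test2"}),
]
payload = build_bulk_body(operations)
print(payload, end="")
```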
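bulk operation types
create: creates the document if it does not exist; returns an error if it already exists
index: creates the document if it does not exist; replaces it if it already exists
update: updates a document; returns an error if the document does not exist
delete: deletes a document; returns a 404 (not_found) if the document id does not exist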
POST /_bulk
{ "index": {"_index": "rrc", "_type":"user", "_id":"1"}}
{"name": "test1", "price": 50}
{
"took": 47,
"errors": false,
"items": [
{
"index": {
"_index": "rrc",
"_type": "user",
"_id": "1",
"_version": 7,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false,
"status": 200
}
}
]
}
A batch with several operations:
POST /_bulk
{ "delete": { "_index": "test_index", "_type": "test_type", "_id": "3" }}
{ "create": { "_index": "test_index", "_type": "test_type", "_id": "12" }}
{ "test_field": "test12" }
{ "index": { "_index": "test_index", "_type": "test_type", "_id": "2" }}
{ "test_field": "replaced test2" }
{ "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }
{
"took": 33,
"errors": true,
"items": [
{
"delete": {
"found": false,
"_index": "test_index",
"_type": "test_type",
"_id": "3",
"_version": 1,
"result": "not_found",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 404
}
},
{
"create": {
"_index": "test_index",
"_type": "test_type",
"_id": "12",
"status": 409,
"error": {
"type": "version_conflict_engine_exception",
"reason": "[test_type][12]: version conflict, document already exists (current version [1])",
"index_uuid": "OrzYgLkZTnCiHq6FBDSOoQ",
"shard": "1",
"index": "test_index"
}
}
},
{
"index": {
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"_version": 17,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false,
"status": 200
}
},
{
"update": {
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"status": 404,
"error": {
"type": "document_missing_exception",
"reason": "[test_type][1]: document missing",
"index_uuid": "OrzYgLkZTnCiHq6FBDSOoQ",
"shard": "3",
"index": "test_index"
}
}
}
]
}
Within a bulk request, the failure of any one operation does not affect the others; the response reports the error for each failed item.
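Since a bulk request can partially succeed, the caller has to inspect the items in the response. Here is a minimal Python sketch, assuming the response shape shown above; failed_items is a hypothetical helper, and the sample response is a trimmed copy of that output.

```python
def failed_items(response: dict) -> list:
    """Return (action, _id, status) for every item with an HTTP status >= 400."""
    failures = []
    if not response.get("errors"):
        return failures
    for item in response["items"]:
        for action, result in item.items():
            if result.get("status", 200) >= 400:
                failures.append((action, result["_id"], result["status"]))
    return failures


response = {
    "took": 33,
    "errors": True,
    "items": [
        {"delete": {"_id": "3", "status": 404}},
        {"create": {"_id": "12", "status": 409}},
        {"index": {"_id": "2", "status": 200}},
        {"update": {"_id": "1", "status": 404}},
    ],
}
for action, doc_id, status in failed_items(response):
    print(action, doc_id, status)
```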
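How document routing works
The data of one index is distributed across multiple shards, and a given document is stored on exactly one primary shard.
In other words, when a document is created, ES must decide which shard of the index it goes to. This process is called document routing.
Routing algorithm
Formula: shard = hash(routing) % number_of_primary_shards
Every create, read, update, or delete of a document carries a routing value. By default it is the document's _id, so documents are routed by id by default.
For example, suppose an index has 3 primary shards (P0, P1, P2) and the _id is 1.
If hash(1) happens to equal 22, taking it modulo the number of primary shards gives 22 % 3 = 1, so ES places the document on P1.
Default routing vs. custom routing
The default routing value is the _id.
You can also specify a routing value when writing a document, using the syntax
PUT /index/type/id?routing=user_id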
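The routing formula is short enough to try out in Python. One loud assumption: zlib.crc32 is used here only as a stable stand-in hash and is not the hash function ES uses, so the shard numbers printed will not match a real cluster. The function name route is also hypothetical.

```python
import zlib


def route(routing: str, number_of_primary_shards: int) -> int:
    """shard = hash(routing) % number_of_primary_shards."""
    # Assumption: CRC32 is a stand-in for ES's real hash function.
    hashed = zlib.crc32(routing.encode("utf-8"))
    return hashed % number_of_primary_shards


# The worked example from the text: a hash of 22 with 3 primary shards.
print(22 % 3)

# Default routing uses the _id; custom routing uses whatever value is passed.
print(route("1", 3))        # routed by _id
print(route("user_id", 3))  # routed by a custom routing value
```

Because the result depends on number_of_primary_shards, the same routing value could land on a different shard if that number changed, so reads would look in the wrong place.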
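How create, update, and delete work internally via the coordinating node
The previous section covered how routing works; this one covers where routing happens, which brings in the concept of the coordinating node. Simply put, every node can act as a coordinating node. A Java client can send a request to any node, because every node knows how to work out which shard a document lives on. The write flow is as follows:
(1) The request is forwarded from the coordinating node to the primary shard that owns the document.
(2) The primary shard processes it and then replicates the document to its replica shards.
(3) Once the coordinating node sees that the primary shard and its replica shards have finished processing the request, it returns the response to the client.
How reads work internally via the coordinating node
Unlike writes, the coordinating node routes a read request to just one copy of the shard that holds the document, either the primary or one of its replicas. It chooses among the copies with a round-robin algorithm so that read load is balanced.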
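Round-robin selection among shard copies can be sketched with itertools.cycle. The class ShardCopies and the copy names ("P0", "R0-1", "R0-2") are hypothetical labels for one primary and two replicas; the sketch shows only the rotation, not how a coordinating node tracks shard state.

```python
import itertools


class ShardCopies:
    """Hands out the copies of one shard in round-robin order."""

    def __init__(self, copies):
        self._cycle = itertools.cycle(list(copies))

    def next_copy(self) -> str:
        return next(self._cycle)


copies = ShardCopies(["P0", "R0-1", "R0-2"])
picks = [copies.next_copy() for _ in range(6)]
print(picks)
```

Six consecutive reads visit each copy twice, so no single copy takes all of the read traffic.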