Elasticsearch Document Essentials


A sample Elasticsearch document

{
  "_index": "rrc",
  "_type": "user",
  "_id": "1",
  "_version": 2,
  "found": true,
  "_source": {
    "num": 1,
    "tags": []
  }
}

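_index metadata

Identifies the index in which a document is stored.

Index names must be lowercase, must not start with an underscore, and must not contain commas.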

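_type metadata

Identifies which type (category) within the index a document belongs to.
Type names may be uppercase or lowercase, but must not start with an underscore and must not contain commas.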

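_id metadata

The unique identifier of a document. Together with _index and _type, it uniquely identifies and locates a document.
You can specify the document id yourself (PUT /index/type/id), or omit it and let Elasticsearch generate one automatically.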

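ID generation strategies

Specifying the document id manually

Whether a manually specified id is appropriate depends on the application:

This approach is typical when importing data from another system into Elasticsearch: the unique identifier that each record already has in the source system is reused as the document id.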

PUT /rrc/user/11
{
  "name":"wang",
  "price":30
}

{
  "_index": "rrc",
  "_type": "user",
  "_id": "11",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}

Auto-generating the document id

POST /rrc/user
{
  "name":"wangql",
  "price":30
}

{
  "_index": "rrc",
  "_type": "user",
  "_id": "AYSyqRvxLAoa11ADX7JR",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}

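An auto-generated id is 20 characters long, URL-safe, and Base64-encoded. It is produced by a GUID-style algorithm, so ids generated in parallel across a distributed system are, for practical purposes, free of collisions.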
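To make the "20 characters, URL-safe, Base64" shape concrete, here is a minimal Python sketch. It is an assumption-laden illustration only: `generate_doc_id` is a hypothetical helper that encodes 15 random bytes, and it is not the algorithm Elasticsearch itself uses.

```python
import base64
import os


def generate_doc_id() -> str:
    """Return a 20-character URL-safe Base64 string.

    Illustrative stand-in only; NOT Elasticsearch's real id generator.
    """
    # 15 bytes = 120 bits, which encodes to exactly 20 Base64 characters
    # with no '=' padding.
    raw = os.urandom(15)
    return base64.urlsafe_b64encode(raw).decode("ascii")


doc_id = generate_doc_id()
print(doc_id, len(doc_id))
```

The URL-safe alphabet replaces `+` and `/` with `-` and `_`, which is why such an id can be placed directly in a request path.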

_source metadata

The JSON placed in the request body when a document is created is, by default, returned unchanged in the _source field of a GET response.

GET /rrc/user/11

{
  "_index": "rrc",
  "_type": "user",
  "_id": "11",
  "_version": 1,
  "found": true,
  "_source": {
    "name": "wang",
    "price": 30
  }
}

Customizing the returned fields

Pass the _source query parameter to return only the fields you name.

GET /rrc/user/11?_source=name

{
  "_index": "rrc",
  "_type": "user",
  "_id": "11",
  "_version": 1,
  "found": true,
  "_source": {
    "name": "wang"
  }
}
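The effect of `?_source=name` can be pictured as a simple field filter. The sketch below is a simplified assumption: `filter_source` is a hypothetical helper that handles only top-level, comma-separated field names, whereas the real parameter accepts more than that.

```python
def filter_source(source: dict, fields: str) -> dict:
    """Keep only the top-level fields listed in a comma-separated string."""
    wanted = {name.strip() for name in fields.split(",") if name.strip()}
    return {key: value for key, value in source.items() if key in wanted}


stored = {"name": "wang", "price": 30}
print(filter_source(stored, "name"))
```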

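Optimistic concurrency control

How Elasticsearch uses _version for optimistic concurrency control

When a document is first created, its internal version number (_version) is 1. Every subsequent modification or deletion of that document increments _version by 1; even a delete increments the version.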

PUT /rrc/user/4
{
  "name":"tie'dan",
  "price":30
}
# First insert: _version is 1
{
  "_index": "rrc",
  "_type": "user",
  "_id": "4",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}

DELETE /rrc/user/4
# Delete: _version becomes 2
{
  "found": true,
  "_index": "rrc",
  "_type": "user",
  "_id": "4",
  "_version": 2,
  "result": "deleted",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}

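As the delete response shows, a deleted document is not physically removed right away: its version number and related metadata are still retained. If you delete a document and then create it again with the same id, the new version is the delete version plus 1.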

# Re-create the document
PUT /rrc/user/4
{
  "name":"tie'dan",
  "price":30
}
# _version increases to 3
{
  "_index": "rrc",
  "_type": "user",
  "_id": "4",
  "_version": 3,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}

Optimistic concurrency control based on _version

# Prepare the data
PUT /rrc/user/9
{
  "name":"tie'dan",
  "price":30
}

{
  "_index": "rrc",
  "_type": "user",
  "_id": "9",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}

Updating based on the version parameter

POST /rrc/user/9/_update?version=1
{
  "doc":{
    "name":"goudan"
  }
}

# Query the document: _version is now 2
GET /rrc/user/9
{
  "_index": "rrc",
  "_type": "user",
  "_id": "9",
  "_version": 2,
  "found": true,
  "_source": {
    "name": "goudan",
    "price": 30
  }
}

# Suppose another client now also tries to update based on version=1
POST /rrc/user/9/_update?version=1
{
  "doc":{
    "name":"goudan"
  }
}
# The update is rejected with a conflict: the current version is 2
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[user][9]: version conflict, current version [2] is different than the one provided [1]",
        "index_uuid": "uUbcg5zwTxmrq6qWNjMRUw",
        "shard": "1",
        "index": "rrc"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[user][9]: version conflict, current version [2] is different than the one provided [1]",
    "index_uuid": "uUbcg5zwTxmrq6qWNjMRUw",
    "shard": "1",
    "index": "rrc"
  },
  "status": 409
}
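The internal version rule can be imitated with a toy in-memory store. This is a sketch under stated assumptions: `VersionedStore` and `VersionConflict` are hypothetical names, and the store only mirrors the behavior seen in the responses above (the version grows by 1 on every write or delete, and a write carrying a stale version is rejected).

```python
class VersionConflict(Exception):
    """Raised when the supplied version does not match the stored one."""


class VersionedStore:
    """Toy model of internal _version semantics (not Elasticsearch code)."""

    def __init__(self):
        # doc_id -> [version, source]; source is None after a delete
        self._docs = {}

    def _check(self, doc_id, expected):
        current = self._docs.get(doc_id, [0, None])[0]
        if expected is not None and expected != current:
            raise VersionConflict(
                f"current version [{current}] is different than "
                f"the one provided [{expected}]"
            )
        return current

    def put(self, doc_id, source, version=None):
        current = self._check(doc_id, version)
        self._docs[doc_id] = [current + 1, dict(source)]
        return current + 1

    def delete(self, doc_id, version=None):
        current = self._check(doc_id, version)
        # Keep the version around as a tombstone instead of dropping the entry.
        self._docs[doc_id] = [current + 1, None]
        return current + 1


store = VersionedStore()
store.put("9", {"name": "tie'dan", "price": 30})            # becomes version 1
store.put("9", {"name": "goudan", "price": 30}, version=1)  # becomes version 2
try:
    store.put("9", {"name": "goudan", "price": 30}, version=1)  # stale version
except VersionConflict as err:
    print("409:", err)
```

Keeping the tombstone entry is what lets a re-created document continue from the delete version instead of starting again at 1.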

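Optimistic concurrency control based on an external version

Internal Elasticsearch versioning:
?version=1
Custom (external) versioning:
?version=1&version_type=external

Elasticsearch also lets you base concurrency control on a version number that you maintain yourself instead of its internal _version.

The only difference is the comparison rule. With the internal version, a write succeeds only if the version you supply is exactly equal to the document's current _version; otherwise an error is returned. With version_type=external, a write succeeds only if the version you supply is greater than the document's current _version.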

# Fetch the document: _version is currently 2
GET /rrc/user/9

{
  "_index": "rrc",
  "_type": "user",
  "_id": "9",
  "_version": 2,
  "found": true,
  "_source": {
    "name": "goudan",
    "price": 30
  }
}

# Attempt a partial update with an external version
POST /rrc/user/9/_update?version=10&version_type=external
{
  "doc":{
    "name":"goudan"
  }
}
# The error shows that the update API does not support external versions
{
  "error": {
    "root_cause": [
      {
        "type": "action_request_validation_exception",
        "reason": "Validation Failed: 1: version type [EXTERNAL] is not supported by the update API;"
      }
    ],
    "type": "action_request_validation_exception",
    "reason": "Validation Failed: 1: version type [EXTERNAL] is not supported by the update API;"
  },
  "status": 400
}

# Index the full document (PUT) with an external version
PUT /rrc/user/9?version=10&version_type=external
{
  "name":"tiedan",
  "price":99
}

# The write succeeds and _version becomes 10
{
  "_index": "rrc",
  "_type": "user",
  "_id": "9",
  "_version": 10,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}
# Suppose another client now tries to write with version=9
PUT /rrc/user/9?version=9&version_type=external
{
  "name":"tiedan",
  "price":99
}

# A version conflict is reported: the current version is 10
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[user][9]: version conflict, current version [10] is higher or equal to the one provided [9]",
        "index_uuid": "uUbcg5zwTxmrq6qWNjMRUw",
        "shard": "1",
        "index": "rrc"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[user][9]: version conflict, current version [10] is higher or equal to the one provided [9]",
    "index_uuid": "uUbcg5zwTxmrq6qWNjMRUw",
    "shard": "1",
    "index": "rrc"
  },
  "status": 409
}
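The two comparison rules differ only in the operator. A minimal sketch, using hypothetical helper names and the version numbers from the requests above:

```python
def internal_version_allows(current: int, provided: int) -> bool:
    """Internal versioning: the provided version must equal the current one."""
    return provided == current


def external_version_allows(current: int, provided: int) -> bool:
    """External versioning: the provided version must be strictly greater."""
    return provided > current


print(external_version_allows(current=2, provided=10))   # accepted in the example
print(external_version_allows(current=10, provided=9))   # rejected in the example
```

Note that an equal external version is also rejected, which matches the "higher or equal" wording in the conflict message.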

Partial update

# Partial update
POST /rrc/user/9/_update
{
  "doc": {
    "name": "test2"
  }
}

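Built-in optimistic concurrency control for partial updates

A partial update performs its version check internally; the retry_on_conflict parameter controls how many times the update is retried after a version conflict.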
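The retry behavior can be sketched as a read-modify-write loop. Everything here is an assumption for illustration: `update_with_retry`, `read`, `write`, and the simulated concurrent writer are hypothetical, and the loop treats `retry_on_conflict=N` as "up to N retries after the first attempt".

```python
class VersionConflict(Exception):
    """Signals that the document changed between read and write."""


def update_with_retry(read, write, modify, retry_on_conflict=3):
    """Re-read and re-apply `modify` whenever the write hits a conflict."""
    attempts = retry_on_conflict + 1
    for attempt in range(attempts):
        version, source = read()
        try:
            return write(modify(source), version)
        except VersionConflict:
            if attempt == attempts - 1:
                raise  # retries exhausted, surface the conflict


# In-memory stand-in for one document plus one interfering client.
doc = {"version": 1, "source": {"price": 30}}
interfering_writes = [True]


def read():
    return doc["version"], dict(doc["source"])


def write(source, expected_version):
    if interfering_writes:
        interfering_writes.pop()
        doc["version"] += 1  # another client got in first
    if doc["version"] != expected_version:
        raise VersionConflict()
    doc["version"] += 1
    doc["source"] = source
    return doc["version"]


new_version = update_with_retry(read, write, lambda s: {**s, "price": s["price"] + 1})
print(new_version, doc["source"])
```

The first attempt fails because the version moved from 1 to 2 underneath it; the second attempt re-reads version 2 and succeeds.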

Partial update with Groovy scripts

Elasticsearch has built-in scripting support, and Groovy scripts can be used to implement a wide variety of more complex operations.

Inline script update

GET /rrc/user/9

{
  "_index": "rrc",
  "_type": "user",
  "_id": "9",
  "_version": 10,
  "found": true,
  "_source": {
    "name": "tiedan",
    "price": 99
  }
}

# Inline script update
POST /rrc/user/9/_update
{
  "script": "ctx._source.price += 1"
}

# Query the document
GET /rrc/user/9

{
  "_index": "rrc",
  "_type": "user",
  "_id": "9",
  "_version": 11,
  "found": true,
  "_source": {
    "name": "tiedan",
    "price": 100
  }
}

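External (file-based) script update

Script location:

<Elasticsearch installation directory>/config/scripts

Script file name: test-add-tags.groovy, with the following content:

ctx._source.price+=new_price

Invoking the script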

POST /rrc/user/9/_update
{
  "script": {
    "lang": "groovy", 
    "file": "test-add-tags",
    "params": {
      "new_price": 99
    }
  }
}

# Result
{
  "_index": "rrc",
  "_type": "user",
  "_id": "9",
  "_version": 28,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}

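Deleting with a script

The script (stored as the file test-delete-document) deletes the document when its num field equals the count parameter, and otherwise does nothing:

ctx.op = ctx._source.num == count ? 'delete' : 'none'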

POST /rrc/user/9/_update
{
  "script": {
    "lang": "groovy",
    "file": "test-delete-document",
    "params": {
      "count": 1
    }
  }
}

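The upsert operation

If the specified document does not exist, the content of upsert is used to initialize it; if it does exist, the partial update given by doc or script is applied.

# The document does not exist yet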
POST /rrc/user/999/_update
{
   "script" : "ctx._source.num+=1",
   "upsert": {
       "num": 0,
       "tags": []
   }
}

# The document did not exist, so it is initialized from upsert
{
  "_index": "rrc",
  "_type": "user",
  "_id": "999",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}

# Run it again: the document now exists
POST /rrc/user/999/_update
{
   "script" : "ctx._source.num+=1",
   "upsert": {
       "num": 0,
       "tags": []
   }
}

# This time the update is applied
{
  "_index": "rrc",
  "_type": "user",
  "_id": "999",
  "_version": 2,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
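The branch between "initialize from upsert" and "apply the script" can be sketched in a few lines. This is a toy model with hypothetical names (`update_with_upsert`, `increment_num`) operating on a plain dict, not the actual update API implementation.

```python
import copy


def update_with_upsert(store: dict, doc_id: str, script, upsert: dict) -> str:
    """Initialize the document from `upsert` if missing, else run `script`."""
    if doc_id not in store:
        store[doc_id] = copy.deepcopy(upsert)  # the script is NOT run here
        return "created"
    script(store[doc_id])
    return "updated"


def increment_num(source: dict) -> None:
    # Mirrors the inline script ctx._source.num+=1
    source["num"] += 1


store = {}
first = update_with_upsert(store, "999", increment_num, {"num": 0, "tags": []})
second = update_with_upsert(store, "999", increment_num, {"num": 0, "tags": []})
print(first, second, store["999"])
```

After the two calls the document holds `num` equal to 1, because the first call only initialized it and the second call ran the increment.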

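Batch retrieval with mget

Fetching documents one at a time means that 100 documents cost 100 network requests. With a batch query, the same 100 documents need only a single request, which cuts the number of network round trips by a factor of 100.

# Fetch documents, naming the index for each one (they may come from different indices)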
GET /_mget
{
  "docs" : [
      {
         "_index" : "rrc",
         "_type" :  "user",
         "_id" :    1
      },
      {
         "_index" : "rrc",
         "_type" :  "user",
         "_id" :    999
      }
   ]
}

{
  "docs": [
    {
      "_index": "rrc",
      "_type": "user",
      "_id": "1",
      "_version": 2,
      "found": true,
      "_source": {
        "num": 1,
        "tags": []
      }
    },
    {
      "_index": "rrc",
      "_type": "user",
      "_id": "999",
      "_version": 2,
      "found": true,
      "_source": {
        "num": 1,
        "tags": []
      }
    }
  ]
}

# Fetch documents within the same index
GET /rrc/_mget
{
  "docs" : [
      {
         "_type" :  "user",
         "_id" :    1
      },
      {
         "_type" :  "user",
         "_id" :    999
      }
   ]
}

{
  "docs": [
    {
      "_index": "rrc",
      "_type": "user",
      "_id": "1",
      "_version": 2,
      "found": true,
      "_source": {
        "num": 1,
        "tags": []
      }
    },
    {
      "_index": "rrc",
      "_type": "user",
      "_id": "999",
      "_version": 2,
      "found": true,
      "_source": {
        "num": 1,
        "tags": []
      }
    }
  ]
}

# Fetch documents within the same type
GET /rrc/user/_mget
{
  "docs" : [
      {
         "_id" :    1
      },
      {
         "_id" :    999
      }
   ]
}

# Shorthand
GET /rrc/user/_mget
{
   "ids": [1, 2]
}

In general, whenever you need to fetch several documents at once, use the batch API to keep the number of network round trips to a minimum and improve performance.
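The round-trip saving is easy to quantify. The sketch below groups ids into request bodies in the `ids` shorthand form; `mget_bodies` and the batch size of 100 are hypothetical choices made for illustration.

```python
def mget_bodies(ids, batch_size=100):
    """Split ids into one mget request body per batch."""
    return [
        {"ids": ids[start:start + batch_size]}
        for start in range(0, len(ids), batch_size)
    ]


ids = list(range(1, 251))
bodies = mget_bodies(ids)
print(f"{len(ids)} single GETs vs {len(bodies)} mget requests")
```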

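Batch operations with bulk

bulk is the Elasticsearch API for batching create, update, and delete operations.

The bulk API has strict requirements on the JSON format. Each JSON object must sit on a single line with no line breaks inside it, and consecutive JSON objects must be separated by a newline (\n on Linux; \r\n on Windows). Every operation requires a pair of JSON objects, the action line and the request body, except delete, which has no request body.

Syntax: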

POST /_bulk
{ action: { metadata }}
{ request body        }
{ action: { metadata }}
{ request body        }
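Building the body programmatically avoids the formatting mistakes this format invites. A minimal sketch, assuming a hypothetical `build_bulk_body` helper that takes `(action, metadata, body)` tuples:

```python
import json


def build_bulk_body(operations):
    """Serialize operations as newline-delimited JSON for a bulk request.

    Each operation is (action, metadata, body); body is ignored for delete.
    """
    lines = []
    for action, metadata, body in operations:
        lines.append(json.dumps({action: metadata}))  # action line
        if action != "delete":
            lines.append(json.dumps(body))            # request body line
    # json.dumps without indent never emits raw newlines, so every object
    # stays on one line; the payload also ends with a newline.
    return "\n".join(lines) + "\n"


payload = build_bulk_body([
    ("delete", {"_index": "test_index", "_type": "test_type", "_id": "3"}, None),
    ("index", {"_index": "test_index", "_type": "test_type", "_id": "2"},
     {"test_field": "replaced test2"}),
])
print(payload, end="")
```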

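bulk operation types

  • create: creates the document if it does not exist; returns an error if it already exists
  • index: creates the document if it does not exist; replaces the whole document if it already exists
  • update: partially updates a document; returns an error if it does not exist
  • delete: deletes a document; if the id does not exist, that item is reported as not_found (status 404)

POST /_bulk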
{ "index":  {"_index": "rrc", "_type":"user", "_id":"1"}}
{"name": "test1", "price": 50}

{
  "took": 47,
  "errors": false,
  "items": [
    {
      "index": {
        "_index": "rrc",
        "_type": "user",
        "_id": "1",
        "_version": 7,
        "result": "updated",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": false,
        "status": 200
      }
    }
  ]
}

A batch that mixes several operations:

POST /_bulk
{ "delete": { "_index": "test_index", "_type": "test_type", "_id": "3" }} 
{ "create": { "_index": "test_index", "_type": "test_type", "_id": "12" }}
{ "test_field":    "test12" }
{ "index":  { "_index": "test_index", "_type": "test_type", "_id": "2" }}
{ "test_field":    "replaced test2" }
{ "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }


{
  "took": 33,
  "errors": true,
  "items": [
    {
      "delete": {
        "found": false,
        "_index": "test_index",
        "_type": "test_type",
        "_id": "3",
        "_version": 1,
        "result": "not_found",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "status": 404
      }
    },
    {
      "create": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "12",
        "status": 409,
        "error": {
          "type": "version_conflict_engine_exception",
          "reason": "[test_type][12]: version conflict, document already exists (current version [1])",
          "index_uuid": "OrzYgLkZTnCiHq6FBDSOoQ",
          "shard": "1",
          "index": "test_index"
        }
      }
    },
    {
      "index": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "2",
        "_version": 17,
        "result": "updated",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": false,
        "status": 200
      }
    },
    {
      "update": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "1",
        "status": 404,
        "error": {
          "type": "document_missing_exception",
          "reason": "[test_type][1]: document missing",
          "index_uuid": "OrzYgLkZTnCiHq6FBDSOoQ",
          "shard": "3",
          "index": "test_index"
        }
      }
    }
  ]
}

Within a bulk request, the failure of any single operation does not affect the others; the response reports the error for each item that failed.
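Because a bulk request can partially succeed, callers usually scan the `items` array rather than rely on the HTTP status alone. The sketch below assumes the response shape shown above; `failed_items` is a hypothetical helper and the sample response is trimmed from that example.

```python
def failed_items(bulk_response: dict) -> list:
    """Return (action, _id, status, error type) for each item carrying an error."""
    failures = []
    for item in bulk_response.get("items", []):
        for action, result in item.items():
            if "error" in result:
                failures.append(
                    (action, result["_id"], result["status"], result["error"]["type"])
                )
    return failures


response = {
    "errors": True,
    "items": [
        {"delete": {"_id": "3", "result": "not_found", "status": 404}},
        {"create": {"_id": "12", "status": 409,
                    "error": {"type": "version_conflict_engine_exception"}}},
        {"index": {"_id": "2", "result": "updated", "status": 200}},
        {"update": {"_id": "1", "status": 404,
                    "error": {"type": "document_missing_exception"}}},
    ],
}
for failure in failed_items(response):
    print(failure)
```

In this shape the not_found delete has a 404 status but no `error` object, so the helper does not count it as a failure; whether to treat it as one is a design choice for the caller.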

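How document routing works

The data of an index is distributed across multiple shards, and each document is stored on exactly one primary shard.

In other words, when a document is created, Elasticsearch has to decide which shard of the index it goes to. This process is called document routing.

Routing algorithm

Formula: shard = hash(routing) % number_of_primary_shards

Every create, update, delete, or get request for a document carries a routing value. By default it is the document's _id, so documents are routed by id unless told otherwise. Because the result depends on number_of_primary_shards, the number of primary shards is fixed when the index is created; otherwise existing documents would route to a different shard and could no longer be found.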

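For example, take an index with 3 primary shards (P0, P1, P2) and a document whose _id is 1.

Suppose hash(1) equals 22. The remainder of the hash divided by the number of primary shards is 22 % 3 = 1, so Elasticsearch places the document on P1.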
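The formula translates directly into code. In this sketch `zlib.crc32` is only a stand-in hash chosen because it is deterministic and in the standard library; Elasticsearch uses its own hash function, so real shard numbers will differ. `route_to_shard` is a hypothetical name.

```python
import zlib


def route_to_shard(routing: str, number_of_primary_shards: int) -> int:
    """shard = hash(routing) % number_of_primary_shards, with a stand-in hash."""
    return zlib.crc32(routing.encode("utf-8")) % number_of_primary_shards


# The worked example from the text: a hash of 22 with 3 primary shards.
print(22 % 3)

# The same routing value always lands on the same shard.
print(route_to_shard("1", 3), route_to_shard("1", 3))

# Changing the shard count can change the result for the same routing value,
# which is why the primary shard count cannot simply be changed later.
print([route_to_shard("1", n) for n in (3, 4, 5)])
```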

Default routing versus custom routing

The default routing value is the _id.

You can also specify a routing value when writing a document:

PUT /index/type/id?routing=user_id

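How create, update, and delete requests are handled through the coordinating node

The previous section covered how routing works; this one covers where routing happens, which brings in the concept of the coordinating node. Put simply, any node can act as the coordinating node. A Java client can send a request to any node, because every node knows which shard each document lives on. The flow for create, update, and delete requests is as follows:

(1) The coordinating node forwards the request to the node holding the target primary shard, which processes it.

(2) The primary shard then replicates the document to its replica shards.

(3) Once the coordinating node sees that the primary shard and its replica shards have finished processing the request, it returns the response to the client.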

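How read requests are handled through the coordinating node

Unlike writes, a read request can be routed by the coordinating node to either the primary shard or any one of the replica shards that holds the document. The copies are chosen in round-robin fashion so that read load is balanced across them.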
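Round-robin selection over the copies of one shard can be sketched with `itertools.cycle`. The copy names (`P1`, `R1-1`, `R1-2`) are hypothetical labels for a primary and two replicas, and this models only the rotation, not how Elasticsearch actually tracks shard copies.

```python
import itertools


def round_robin(copies):
    """Yield shard copies in a repeating, fixed order."""
    return itertools.cycle(copies)


picker = round_robin(["P1", "R1-1", "R1-2"])
targets = [next(picker) for _ in range(6)]
print(targets)
```

Each copy receives every third read, so the load is spread evenly across the primary and its replicas.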


Author: WangQingLei
License: Unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit WangQingLei when reposting.