A good Elasticsearch engineer needs to know the official Elastic documentation and its example templates well, because the Elasticsearch APIs are large and irregular, nothing like the compact, uniform surface of SQL.
Index operations (create / delete / update)
Elasticsearch provides dedicated Index APIs for creating, updating, and deleting indices and their settings:
https://www.elastic.co/guide/en/elasticsearch/reference/8.1/indices.html
- Create an index
  - GET _cat/indices lists the existing indices
  - PUT /test_index creates one
 
- Delete an index, as sketched below
- Update index settings, as shown in the examples that follow
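For the list/delete bullets above, a minimal sketch (test_index is the index name used throughout these notes):

```
# List all indices in the cluster, with column headers
GET _cat/indices?v

# Delete the index; this removes all of its documents and is irreversible
DELETE /test_index
```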
```
# Create an index with explicit shard and replica counts
PUT twitter
{
  "settings": {
    "index": {
      "number_of_shards": 3,
      "number_of_replicas": 2
    }
  }
}

# Update a dynamic setting on an existing index
PUT /twitter/_settings
{
  "index": {
    "number_of_replicas": 2
  }
}

# Create with refresh and translog tuned for write throughput
PUT twitter
{
  "settings": {
    "number_of_shards": "1",
    "number_of_replicas": "1",
    "refresh_interval": "30s",
    "translog": {
      "sync_interval": "30s",
      "durability": "async"
    }
  }
}
```
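To confirm an update took effect, the settings can be read back; a quick check, not part of the original notes:

```
# Returns the current settings of the twitter index
GET twitter/_settings
```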
 
Document operations (create / delete / update / get)
https://www.elastic.co/guide/en/elasticsearch/reference/8.1/docs.html
Create
```
PUT /test_index/_doc/1
{
  "username": "alfred",
  "age": 1
}
```
When you create a document, Elasticsearch automatically creates the target index if it does not already exist.
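A quick check of that auto-creation, assuming the PUT above ran against a cluster where test_index was never created explicitly:

```
# The index shows up even though no explicit create call was made
GET _cat/indices/test_index?v
```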
Get a document (by id)
Non-search lookup:
GET /test_index/_doc/1
Search (covered in detail later):
```
GET /test_index/_search
{
  "query": {
    "term": {
      "_id": "1"
    }
  }
}
```
Bulk document operations
Elasticsearch can operate on multiple documents in a single request, which cuts network overhead and raises write throughput:
```
POST _bulk
{"index":{"_index":"test_index","_id":"3"}}
{"username":"alfred","age":10}
{"delete":{"_index":"test_index","_id":"1"}}
{"update":{"_id":"2","_index":"test_index"}}
{"doc":{"age":"20"}}
```
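The bulk response reports a status per action; as a quick sanity check on the "index" action above:

```
# Fetch the document the bulk "index" action created
GET test_index/_doc/3
```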
Index aliases
- An alias is a soft link that can point at one or more indices
- A simple but important feature: it underpins higher-level machinery such as rollover and ILM
- It hides the real index names from consumers, lowering their cognitive load
https://www.elastic.co/guide/en/elasticsearch/reference/8.1/indices-aliases.html
Splitting reads and writes
To keep any single index from growing too large, the data is split across two indices. New data is written only to the new index nginx-logs-2023-01-02, while queries go through the alias and can hit both indices.

```mermaid
graph TB
A[nginx-logs]--is_write_index:false-->B[nginx-logs-2023-01-01]
A--is_write_index:true-->C[nginx-logs-2023-01-02]
```

Example
```
PUT /nginx-logs-2023-01-01
{
  "mappings": {
    "properties": {
      ......
    }
  },
  "aliases": {
    "nginx_logs": {}
  }
}

POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "nginx-logs-2023-01-02",
        "alias": "nginx_logs",
        "is_write_index": true
      }
    },
    {
      "add": {
        "index": "nginx-logs-2023-01-01",
        "alias": "nginx_logs",
        "is_write_index": false
      }
    }
  ]
}
```
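Once both actions are applied, the alias behaves as sketched here; the document body is made up for illustration:

```
# Writes through the alias land in the write index, nginx-logs-2023-01-02
PUT nginx_logs/_doc/1
{
  "message": "GET /healthz 200"
}

# Searches through the alias fan out to every index behind it
GET nginx_logs/_search
```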
Filtering data through an alias
An alias can carry a filter, so that queries through it only see matching documents.
Example
```
PUT /nginx-logs-2023-01-01
{
  "mappings": {
    "properties": {
      "method": {
        "type": "keyword"
      }
    }
  },
  "aliases": {
    "nginx-logs": {},
    "nginx-logs-POST": {
      "filter": {
        "term": {
          "method": "POST"
        }
      }
    }
  }
}
```
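A search against the filtered alias then applies the term filter implicitly:

```
# Only documents with method == "POST" come back; no explicit filter needed
GET nginx-logs-POST/_search
```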
Index Template
https://www.elastic.co/guide/en/elasticsearch/reference/8.1/index-templates.html
- Automatically applies a predefined configuration when a new index is created, simplifying index creation
- Can set both index settings and mappings
- When several templates match, only the one with the highest priority is applied
The index template API endpoint is _index_template, as shown below:
```
PUT _index_template/test_template
{
  "index_patterns": [
    "te*",
    "bar*"
  ],
  "priority": 500,
  "template": {
    "settings": {
      "number_of_shards": 1
    },
    "mappings": {
      ......
    }
  }
}

PUT test_index
GET test_index
```
Component Template
- A component template is a reusable building block for index templates
- Like Lego bricks, the same configuration can be composed into different index templates, making maintenance more efficient
- In effect, a template for templates

Creating component templates
The API is PUT /_component_template/<name>:
```
PUT /_component_template/ct1
{
  "template": {
    "settings": {
      "index.number_of_shards": 2
    }
  }
}

PUT /_component_template/ct2
{
  "template": {
    "settings": {
      "index.number_of_replicas": 0
    },
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date"
        }
      }
    }
  }
}

POST /_index_template/template1
{
  "index_patterns": [
    "te*"
  ],
  "template": {
    "settings": {
      "index.number_of_shards": 3
    }
  },
  "composed_of": [
    "ct1",
    "ct2"
  ]
}

POST /_index_template/template2
{
  "index_patterns": [
    "bark*"
  ],
  "template": {
    "settings": {
      "index.number_of_shards": 3
    }
  },
  "composed_of": [
    "ct2"
  ]
}
```
Inspecting templates (simulating what would apply)
```
# What the current templates would apply to a given index name
POST /_index_template/_simulate_index/my-index-000001

# What an existing index template resolves to
POST /_index_template/_simulate/template_1

# Simulate an ad-hoc template body without saving it
POST /_index_template/_simulate
{
  "index_patterns": [
    "my*"
  ],
  "template": {
    "settings": {
      "index.number_of_shards": 3
    }
  },
  "composed_of": [
    "ct1",
    "ct2"
  ]
}
```
Dynamic Templates
https://www.elastic.co/guide/en/elasticsearch/reference/8.1/dynamic-templates.html
Elasticsearch can detect field types from documents automatically, which lowers the barrier to entry, as shown below:
```
PUT /test_index/_doc/1
{
  "username": "alfred",
  "age": 1
}

GET /test_index/_mapping
# Response:
{
  "test_index": {
    "mappings": {
      "properties": {
        "age": {
          "type": "long"
        },
        "username": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        }
      }
    }
  }
}
```
Default dynamic mapping

| JSON data type | Elasticsearch data type |
| --- | --- |
| null | ignored |
| double | float |
| long | long |
| string that parses as a date | date |
| string that parses as a number | float or long |
| any other string | text with a .keyword sub-field |
| array | type of the first non-null element |
| object | object |

The rules are quite regular.
Custom dynamic templates
Dynamic templates let you set field types based on the data type Elasticsearch detects, the field name, and so on. Suppose we want:
- every string field mapped as keyword, i.e. not analyzed by default
- every field whose name starts with message mapped as text, i.e. analyzed
- every field whose name starts with int_ mapped as integer
Example (note the difference from an index template):
```
# Dynamic templates are tried in order and the first match wins,
# so the name-based rules must come before the catch-all string rule.
PUT _index_template/my_custom_template
{
  "index_patterns": ["*"],
  "template": {
    "mappings": {
      "dynamic_templates": [
        {
          "message_text_fields": {
            "match": "message*",
            "mapping": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            }
          }
        },
        {
          "int_prefix_fields": {
            "match": "int_*",
            "mapping": {
              "type": "integer"
            }
          }
        },
        {
          "strings_as_keywords": {
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        }
      ]
    }
  }
}

# The same idea inline, on a single index instead of a template
PUT test_index
{
  "mappings": {
    "dynamic_templates": [
      {
        "strings": {
          "match_mapping_type": "string",
          "mapping": {
            "type": "keyword"
          }
        }
      }
    ]
  }
}
```
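A quick way to verify, assuming the inline variant just above was used to create test_index; the field name city is made up for illustration:

```
# With the "strings" dynamic template in place, new string fields map to keyword
PUT test_index/_doc/1
{
  "city": "Shanghai"
}

# The mapping for "city" should show "type": "keyword", with no .keyword sub-field
GET test_index/_mapping
```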
Matching parameters
- match_mapping_type matches the data type Elasticsearch detected, e.g. boolean, long, string
- match / unmatch match against the field name
- path_match / path_unmatch match against the field's full dotted path, as sketched below
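A minimal sketch of path-based matching; the index name and the user.name.* structure are assumptions for illustration:

```
PUT my_index
{
  "mappings": {
    "dynamic_templates": [
      {
        "names_as_keywords": {
          "path_match": "user.name.*",
          "mapping": {
            "type": "keyword"
          }
        }
      }
    ]
  }
}
```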
Rebuilding data
- Rebuilding means re-processing every document in an index. It is usually needed when:
  - the mapping changes, e.g. a field type changes or an analyzer dictionary is updated
  - the index settings change, e.g. the number of shards
  - data is migrated to another index
- Elasticsearch ships ready-made APIs for this work

Reindex - _update_by_query
- API: POST blog_index/_update_by_query?conflicts=proceed
- blog_index: the index whose documents are rebuilt in place
- conflicts=proceed: continue past version conflicts instead of aborting
 
- Some additional options, shown below:
```
POST blog_index/_update_by_query
{
  "script": {
    "source": "ctx._source.likes++",
    "lang": "painless"
  },
  "query": {
    "term": {
      "user": "tom"
    }
  }
}
```
Reindex - _reindex
Copies documents from index A into index B:
```
POST _reindex
{
  "source": {
    "index": "blog_index"
  },
  "dest": {
    "index": "blog_new_index"
  }
}

POST _reindex
{
  "conflicts": "proceed",
  "source": {
    "index": "blog_index",
    "query": {
      "term": {
        "user": "tom"
      }
    }
  },
  "dest": {
    "index": "blog_new_index"
  }
}
```
Running reindex asynchronously
- Rebuild time grows with the number of documents in the source index
- Set the URL parameter wait_for_completion to false to run the job asynchronously; Elasticsearch represents such jobs as tasks
- POST blog_index/_update_by_query?conflicts=proceed&wait_for_completion=false

- Elasticsearch provides the Task API to check a task's progress and related data, sketched below
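A minimal sketch of checking on an async job; the task id shown is a placeholder, the real one comes back in the response to the async call:

```
# Returns immediately with something like {"task": "oTUltX4IQMOUUVeiohTt8A:12345"}
POST blog_index/_update_by_query?conflicts=proceed&wait_for_completion=false

# Poll that id for status, progress counters, and failures
GET _tasks/oTUltX4IQMOUUVeiohTt8A:12345
```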
Ingest Pipeline
https://www.elastic.co/guide/en/elasticsearch/reference/8.1/ingest.html
An ingest node is an Elasticsearch node in a pre-processing role:
- functionally similar to a Logstash filter, providing ETL capabilities such as grok, add field, and drop field
- the last processing stage before data reaches the Elasticsearch data nodes
- unlike Logstash, you cannot freely customize input and output; both are Elasticsearch itself, so it amounts to the Logstash filter stage only
Example use cases:
- extract parts of a log line into dedicated fields with grok, e.g. ip and status_code
- a field in a live index was misnamed, and you want writes to use the corrected name
- add an ingest_timestamp field recording when each document entered Elasticsearch
An ingest pipeline defines a processing chain, as shown below:
```
PUT _ingest/pipeline/my-pipeline-id
{
  "description": "describe pipeline",
  "processors": [
    {
      "set": {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}
```
Note: an ingest pipeline runs at document write time.
Usage
```
PUT test_pipeline/_doc/1?pipeline=my-pipeline-id
POST /_bulk?pipeline=my-pipeline-id
```
Pipelines can also be used when rebuilding an index:
```
PUT /my_index/_settings
{
  "index": {
    "default_pipeline": "my-pipeline-id"
  }
}

POST my_index/_update_by_query?pipeline=my-pipeline-id
```
Debugging a pipeline
```
POST _ingest/pipeline/my_pipeline/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    }
  ]
}

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "_description",
    "processors": [
      {
        "set": {
          "field": "field2",
          "value": "_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    }
  ]
}
```
Ingest Pipeline Processors
https://www.elastic.co/guide/en/elasticsearch/reference/8.1/ingest-processors.html
The heart of an ingest pipeline is its list of processors:
Date
Parses a date string into a date field:
```
{
  "description": "...",
  "processors": [
    {
      "date": {
        "field": "initial_date",
        "target_field": "timestamp",
        "formats": [
          "dd/MM/yyyy hh:mm:ss"
        ],
        "timezone": "Asia/Shanghai"
      }
    }
  ]
}
```
Drop
Drops a document when the condition matches, so it is never indexed:
```
{
  "description": "...",
  "processors": [
    {
      "drop": {
        "if": "ctx.network_name == 'Guest'"
      }
    },
    {
      "drop": {
        "if": """
          Collection tags = ctx.tags;
          if (tags != null) {
            for (String tag : tags) {
              if (tag.toLowerCase().contains('prod')) {
                return false;
              }
            }
          }
          return true;
        """
      }
    }
  ]
}
```
Foreach
Iterates over every value of an array field and applies a processor to each:
```
{
  "description": "...",
  "processors": [
    {
      "foreach": {
        "field": "values",
        "processor": {
          "uppercase": {
            "field": "_ingest._value"
          }
        }
      }
    }
  ]
}
```
Grok
Uses grok patterns to extract parts of the raw content into separate fields:
```
{
  "description": "...",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
        ]
      }
    }
  ]
}
```
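Grok patterns are easy to get wrong, so it helps to try them with _simulate first; the sample log line here is an assumption for illustration:

```
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
          ]
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "55.3.244.1 GET /index.html 15824 0.043"
      }
    }
  ]
}
```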
JSON
Parses a JSON string into a JSON object:
```
{
  "description": "...",
  "processors": [
    {
      "json": {
        "field": "string_source",
        "target_field": "json_target"
      }
    }
  ]
}
```
Remove
Removes the given fields:
```
{
  "description": "...",
  "processors": [
    {
      "remove": {
        "field": [
          "user_agent",
          "url"
        ]
      }
    }
  ]
}
```
Rename
Renames a field:
```
{
  "description": "...",
  "processors": [
    {
      "rename": {
        "field": "provider",
        "target_field": "cloud.provider"
      }
    }
  ]
}
```
Set
Sets a field's value. Mustache templates such as {{field}} can read existing field values, which enables concatenation and similar tricks:
```
{
  "description": "...",
  "processors": [
    {
      "set": {
        "field": "my_field",
        "value": 582.1
      }
    },
    {
      "set": {
        "field": "host.os.name",
        "value": "{{os}}"
      }
    },
    {
      "set": {
        "field": "received",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "set": {
        "field": "field_c",
        "value": "{{field_a}} {{field_b}}"
      }
    },
    {
      "set": {
        "field": "_index",
        "value": "{{field_index_name}}"
      }
    },
    {
      "set": {
        "field": "{{service}}",
        "value": "{{code}}"
      }
    }
  ]
}
```
Script
Uses a custom script to change field names and values with full flexibility:
```
{
  "description": "...",
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": "ctx.field_a_plus_b_times_c = (ctx.field_a + ctx.field_b) * params.param_c",
        "params": {
          "param_c": 10
        }
      }
    }
  ]
}
```
Split
Splits a string into an array:
```
{
  "description": "...",
  "processors": [
    {
      "split": {
        "field": "my_field",
        "separator": "\\s+"
      }
    }
  ]
}
```
Ingest pipeline failure handling
By default, a processor failure stops the pipeline and aborts the remaining steps.
- You can attach failure-handling logic to add your own processing on error
- e.g. route the failing document to another index, or tag it differently
```
{
  "description": "my first pipeline with handled exceptions",
  "processors": [
    {
      "rename": {
        "field": "foo",
        "target_field": "bar",
        "on_failure": [
          {
            "set": {
              "field": "error",
              "value": "field \"foo\" does not exist, cannot rename to \"bar\""
            }
          }
        ]
      }
    }
  ]
}

{
  "description": "my first pipeline with handled exceptions",
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }
  ]
}
```
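A failure handler can itself be exercised with _simulate; this sketch feeds the rename pipeline above a document that lacks foo, so the on_failure branch should fire:

```
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "rename": {
          "field": "foo",
          "target_field": "bar",
          "on_failure": [
            {
              "set": {
                "field": "error",
                "value": "rename failed"
              }
            }
          ]
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "not_foo": "baz"
      }
    }
  ]
}
```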