A good Elasticsearch engineer needs to know the official Elastic documentation and its example templates very well, because the Elasticsearch API is inherently complex and lacks the simple regularity of SQL.
Index Operations (Create / Delete / Update)
ES provides dedicated Index APIs for creating, updating, and deleting indices and their configuration:
https://www.elastic.co/guide/en/elasticsearch/reference/8.1/indices.html
Create an index

```
# list existing indices
GET _cat/indices

# create an index
PUT /test_index
```
Delete an index
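A minimal example, removing the test_index created above with the Delete Index API:

```
DELETE /test_index
```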
Update index settings
```
# create an index with explicit settings
PUT twitter
{
  "settings": {
    "index": {
      "number_of_shards": 3,
      "number_of_replicas": 2
    }
  }
}

# update a dynamic setting on a live index
# (number_of_shards is static and can only be set at creation time)
PUT /twitter/_settings
{
  "index": {
    "number_of_replicas": 2
  }
}

# alternative creation example, tuning refresh and translog behavior
PUT twitter
{
  "settings": {
    "number_of_shards": "1",
    "number_of_replicas": "1",
    "refresh_interval": "30s",
    "translog": {
      "sync_interval": "30s",
      "durability": "async"
    }
  }
}
```
Document Operations (CRUD)
https://www.elastic.co/guide/en/elasticsearch/reference/8.1/docs.html
Create a document
```
PUT /test_index/_doc/1
{
  "username": "alfred",
  "age": 1
}
```
When creating a document, if the target index does not exist, ES automatically creates the corresponding index.
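A quick way to observe this (the index name auto_created_index is just an illustration):

```
PUT /auto_created_index/_doc/1
{
  "msg": "hello"
}

# the index now exists, created with default settings
GET _cat/indices/auto_created_index
```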
Get a document (by id)
Without search (a direct get):

```
GET /test_index/_doc/1
```
Via search (covered in more detail later):
```
GET /test_index/_search
{
  "query": {
    "term": {
      "_id": "1"
    }
  }
}
```
Bulk document operations
ES can apply multiple document operations in a single request, which reduces network overhead and improves write throughput. The body is NDJSON: each action line, and its source line when one is required, must sit on its own line.
```
POST _bulk
{"index":{"_index":"test_index","_id":"3"}}
{"username":"alfred","age":10}
{"delete":{"_index":"test_index","_id":"1"}}
{"update":{"_id":"2","_index":"test_index"}}
{"doc":{"age":"20"}}
```
Index Aliases
- A soft link that can point to one or more indices
- A simple but important feature that underpins advanced functionality such as rollover and ILM
- Hides the real indices from consumers, reducing their cognitive load
https://www.elastic.co/guide/en/elasticsearch/reference/8.1/indices-aliases.html
Separating reads and writes
To keep any single index from growing too large, the data is split across two indices. New data is written only to the new index nginx-logs-2023-01-02, while queries can span both.
```mermaid
graph TB
A[nginx-logs]--is_write_index:false-->B[nginx-logs-2023-01-01]
A--is_write_index:true-->C[nginx-logs-2023-01-02]
```
Example
```
PUT /nginx-logs-2023-01-01
{
  "mappings": {
    "properties": { ...... }
  },
  "aliases": {
    "nginx_logs": {}
  }
}

POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "nginx-logs-2023-01-02",
        "alias": "nginx_logs",
        "is_write_index": true
      }
    },
    {
      "add": {
        "index": "nginx-logs-2023-01-01",
        "alias": "nginx_logs",
        "is_write_index": false
      }
    }
  ]
}
```
Filtering data
An alias can carry a filter that restricts which documents it returns.
Example
```
PUT /nginx-logs-2023-01-01
{
  "mappings": {
    "properties": {
      "method": {
        "type": "keyword"
      }
    }
  },
  "aliases": {
    "nginx-logs": {},
    "nginx-logs-POST": {
      "filter": {
        "term": {
          "method": "POST"
        }
      }
    }
  }
}
```
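With the definition above, searching through the filtered alias should return only documents whose method is POST; a quick check:

```
GET /nginx-logs-POST/_search
```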
Index Template
https://www.elastic.co/guide/en/elasticsearch/reference/8.1/index-templates.html
- Mainly used to automatically apply predefined configuration when a new index is created, simplifying the steps of index creation
- Can define both index settings and mappings
- When multiple templates match, only the template with the highest priority is applied
The index template API uses the _index_template endpoint, as shown below:
```
PUT _index_template/test_template
{
  "index_patterns": ["te*", "bar*"],
  "priority": 500,
  "template": {
    "settings": {
      "number_of_shards": 1
    },
    "mappings": { ...... }
  }
}

PUT test_index
GET test_index
```
Component Template
- Component templates are reusable configuration blocks
- Like building blocks, the same configuration can be quickly added to different index templates, improving maintainability
- In effect, a template for templates
Create a component template
The component template endpoint is PUT /_component_template/name:
```
PUT /_component_template/ct1
{
  "template": {
    "settings": {
      "index.number_of_shards": 2
    }
  }
}

PUT /_component_template/ct2
{
  "template": {
    "settings": {
      "index.number_of_replicas": 0
    },
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date"
        }
      }
    }
  }
}

POST /_index_template/template1
{
  "index_patterns": ["te*"],
  "template": {
    "settings": {
      "index.number_of_shards": 3
    }
  },
  "composed_of": ["ct1", "ct2"]
}

POST /_index_template/template2
{
  "index_patterns": ["bark*"],
  "template": {
    "settings": {
      "index.number_of_shards": 3
    }
  },
  "composed_of": ["ct2"]
}
```
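Note the overlap above: template1 composes ct1 (which sets 2 shards) but also sets index.number_of_shards: 3 directly, and settings declared in the index template itself take precedence over its component templates. This can be confirmed with the simulate API covered next:

```
POST /_index_template/_simulate/template1
```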
Inspecting templates (simulation)
```
# preview the configuration an index name would receive
POST /_index_template/_simulate_index/my-index-000001

# preview an existing index template
POST /_index_template/_simulate/template1

# simulate an ad-hoc template definition
POST /_index_template/_simulate
{
  "index_patterns": ["my*"],
  "template": {
    "settings": {
      "index.number_of_shards": 3
    }
  },
  "composed_of": ["ct1", "ct2"]
}
```
Dynamic Template
https://www.elastic.co/guide/en/elasticsearch/reference/8.1/dynamic-templates.html
ES can detect document field types automatically, which lowers the effort required from users, as shown below:
```
PUT /test_index/_doc/1
{
  "username": "alfred",
  "age": 1
}

GET /test_index/_mapping
```

The mapping response shows the detected types:

```
{
  "test_index": {
    "mappings": {
      "properties": {
        "age": {
          "type": "long"
        },
        "username": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        }
      }
    }
  }
}
```
Default dynamic mapping rules
| JSON data type | Elasticsearch data type |
| --- | --- |
| null | ignored |
| double | float |
| long | long |
| string recognizable as a date | date |
| string recognizable as a number | float or long |
| other string | text with a .keyword sub-field |
| array | type of the first non-null element |
| object | object |
The rules are quite regular.
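A quick illustration of the string rules, assuming a fresh index with the illustrative name dyn_test: created_at should be detected as date (ISO-8601 date detection is on by default) and note as text with a .keyword sub-field:

```
PUT /dyn_test/_doc/1
{
  "created_at": "2023-01-02T10:00:00Z",
  "note": "hello world"
}

GET /dyn_test/_mapping
```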
Custom dynamic templates
Field types can be set dynamically based on the type ES detects, the field name, and so on. For example, suppose we want:

- all string fields mapped to keyword, i.e. not analyzed by default
- all fields whose names start with message mapped to text, i.e. analyzed
- all fields whose names start with int_ mapped to integer
Example (note the distinction from index templates; dynamic templates are evaluated in order and the first matching rule wins, so the specific message* and int_* rules must come before the catch-all string rule):
```
PUT _index_template/my_custom_template
{
  "index_patterns": ["*"],
  "template": {
    "mappings": {
      "dynamic_templates": [
        {
          "message_text_fields": {
            "match": "message*",
            "mapping": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            }
          }
        },
        {
          "int_prefix_fields": {
            "match": "int_*",
            "mapping": {
              "type": "integer"
            }
          }
        },
        {
          "strings_as_keywords": {
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        }
      ]
    }
  }
}

PUT test_index
{
  "mappings": {
    "dynamic_templates": [
      {
        "strings": {
          "match_mapping_type": "string",
          "mapping": {
            "type": "keyword"
          }
        }
      }
    ]
  }
}
```
Matching parameters
- match_mapping_type: matches the field type detected by ES, e.g. boolean, long, string
- match / unmatch: match on the field name
- path_match / path_unmatch: match on the full dotted field path (see the sketch after this list)
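A minimal sketch of the path parameters (the index and field names are illustrative): every sub-field under name except name.middle is mapped to keyword:

```
PUT /person_index
{
  "mappings": {
    "dynamic_templates": [
      {
        "names_as_keywords": {
          "path_match": "name.*",
          "path_unmatch": "*.middle",
          "mapping": {
            "type": "keyword"
          }
        }
      }
    ]
  }
}
```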
Rebuilding data
- Refers to rebuilding all documents, generally needed when:
  - the mapping changes, e.g. a field type changes or an analyzer dictionary is updated
  - the index settings change, e.g. the number of primary shards
  - data is migrated to another index
- ES provides ready-made APIs for this work
Reindex - _update_by_query
- API, shown below:

```
POST blog_index/_update_by_query?conflicts=proceed
```

- blog_index: the target index; the documents in it are rebuilt in place
- conflicts=proceed: on version conflicts, continue instead of aborting
- Some additional options are shown below:
```
POST blog_index/_update_by_query
{
  "script": {
    "source": "ctx._source.likes++",
    "lang": "painless"
  },
  "query": {
    "term": {
      "user": "tom"
    }
  }
}
```
Reindex - _reindex
Migrates documents from index A to index B:
```
POST _reindex
{
  "source": {
    "index": "blog_index"
  },
  "dest": {
    "index": "blog_new_index"
  }
}

POST _reindex
{
  "conflicts": "proceed",
  "source": {
    "index": "blog_index",
    "query": {
      "term": {
        "user": "tom"
      }
    }
  },
  "dest": {
    "index": "blog_new_index"
  }
}
```
Reindex: asynchronous execution
- Rebuild time grows with the number of documents in the source index: the more documents, the longer it takes
- Setting the URL parameter wait_for_completion=false makes the call asynchronous; ES describes such jobs as tasks:

```
POST blog_index/_update_by_query?conflicts=proceed&wait_for_completion=false
```

- ES provides the Task API to check a task's progress and related data, as sketched below
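A minimal sketch of using the Task API here; the task id below is a made-up placeholder for the id the asynchronous call returns:

```
# the async call returns a task id, e.g. {"task": "oTUltX4IQMOUUVeiohTt8A:12345"}
GET _tasks/oTUltX4IQMOUUVeiohTt8A:12345

# list all running update-by-query / reindex tasks
GET _tasks?detailed=true&actions=*byquery,*reindex
```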
Ingest Pipeline
https://www.elastic.co/guide/en/elasticsearch/reference/8.1/ingest.html
An ingest node is an ES node with the preprocessing role:

- Functionally similar to the logstash filter stage, offering grok, add field, drop field and other ETL capabilities
- The last processing step before data lands on the ES data nodes
- Unlike logstash, the input and output cannot be freely customized: it provides roughly the logstash filter capability alone, with ES itself serving as both input and output
Example use cases
- Log data from which parts of the message should be extracted into dedicated fields via grok, e.g. ip and status_code
- A field in a live index was misnamed and should be fixed to the correct name
- An ingest_timestamp field should record when each document entered ES (see the sketch after this list)
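A minimal sketch of the third use case (the pipeline name add_ingest_timestamp is illustrative; the set processor and {{_ingest.timestamp}} are covered below):

```
PUT _ingest/pipeline/add_ingest_timestamp
{
  "description": "record when a document entered es",
  "processors": [
    {
      "set": {
        "field": "ingest_timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
```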
An ingest pipeline defines a processing chain, as shown below:
```
PUT _ingest/pipeline/my-pipeline-id
{
  "description": "describe pipeline",
  "processors": [
    {
      "set": {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}
```
Note: an ingest pipeline takes effect at document write time:
```
PUT test_pipeline/_doc/1?pipeline=my-pipeline-id
{
  "username": "alfred"
}

POST /_bulk?pipeline=my-pipeline-id
```
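Fetching the document back should show the field the pipeline added (foo: bar, from the set processor defined above):

```
GET test_pipeline/_doc/1
```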
It can also be used when rebuilding an index:
```
PUT /my_index/_settings
{
  "index": {
    "default_pipeline": "my-pipeline-id"
  }
}

POST my_index/_update_by_query?pipeline=my-pipeline-id
```
Debugging a pipeline
```
POST _ingest/pipeline/my_pipeline/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    }
  ]
}

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "_description",
    "processors": [
      {
        "set": {
          "field": "field2",
          "value": "_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    }
  ]
}
```
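To inspect each processor's output individually, _simulate also accepts a verbose flag:

```
POST _ingest/pipeline/my_pipeline/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "foo": "bar"
      }
    }
  ]
}
```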
Ingest Pipeline Processors
https://www.elastic.co/guide/en/elasticsearch/reference/8.1/ingest-processors.html
The core building blocks of an ingest pipeline are its many processors:
Date
Converts a date string into a date value (here written to a timestamp field):
```
{
  "description": "...",
  "processors": [
    {
      "date": {
        "field": "initial_date",
        "target_field": "timestamp",
        "formats": ["dd/MM/yyyy hh:mm:ss"],
        "timezone": "Asia/Shanghai"
      }
    }
  ]
}
```
Drop
Drops the document (it is not indexed) when the condition evaluates to true:
```
{
  "description": "...",
  "processors": [
    {
      "drop": {
        "if": "ctx.network_name == 'Guest'"
      }
    },
    {
      "drop": {
        "if": """
          Collection tags = ctx.tags;
          if (tags != null) {
            for (String tag : tags) {
              if (tag.toLowerCase().contains('prod')) {
                return false;
              }
            }
          }
          return true;
        """
      }
    }
  ]
}
```
Foreach
Iterates over every element of an array field and applies a processor to each:
```
{
  "description": "...",
  "processors": [
    {
      "foreach": {
        "field": "values",
        "processor": {
          "uppercase": {
            "field": "_ingest._value"
          }
        }
      }
    }
  ]
}
```
Grok
Extracts parts of the raw content into dedicated fields using grok patterns:
```
{
  "description": "...",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
        ]
      }
    }
  ]
}
```
JSON
Parses a JSON string into a JSON object:
```
{
  "description": "...",
  "processors": [
    {
      "json": {
        "field": "string_source",
        "target_field": "json_target"
      }
    }
  ]
}
```
Remove
Removes the specified fields:
```
{
  "description": "...",
  "processors": [
    {
      "remove": {
        "field": ["user_agent", "url"]
      }
    }
  ]
}
```
Rename
Renames a field:
```
{
  "description": "...",
  "processors": [
    {
      "rename": {
        "field": "provider",
        "target_field": "cloud.provider"
      }
    }
  ]
}
```
Set
Sets the value of a field; existing field values can be referenced via {{...}} templates, enabling concatenation and similar tricks:
```
{
  "description": "...",
  "processors": [
    { "set": { "field": "my_field", "value": 582.1 } },
    { "set": { "field": "host.os.name", "value": "{{os}}" } },
    { "set": { "field": "received", "value": "{{_ingest.timestamp}}" } },
    { "set": { "field": "field_c", "value": "{{field_a}} {{field_b}}" } },
    { "set": { "field": "_index", "value": "{{field_index_name}}" } },
    { "set": { "field": "{{service}}", "value": "{{code}}" } }
  ]
}
```
Script
Flexibly changes field names and values with a custom painless script:
```
{
  "description": "...",
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": "ctx.field_a_plus_b_times_c = (ctx.field_a + ctx.field_b) * params.param_c",
        "params": {
          "param_c": 10
        }
      }
    }
  ]
}
```
Split
Splits a string into an array using a separator regular expression (here \s+, i.e. whitespace):
```
{
  "description": "...",
  "processors": [
    {
      "split": {
        "field": "my_field",
        "separator": "\\s+"
      }
    }
  ]
}
```
Ingest Pipeline error handling
By default, when a processor fails, the pipeline stops and the remaining processors are skipped.

- Custom handling can be added with on_failure error-catching logic
- For example, writing the failing document to a different index, or tagging it
```
{
  "description": "my first pipeline with handled exceptions",
  "processors": [
    {
      "rename": {
        "field": "foo",
        "target_field": "bar",
        "on_failure": [
          {
            "set": {
              "field": "error",
              "value": "field \"foo\" does not exist, cannot rename to \"bar\""
            }
          }
        ]
      }
    }
  ]
}

{
  "description": "my first pipeline with handled exceptions",
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }
  ]
}
```