Elasticsearch搜索引擎(数据操作)

一个优秀的elasticsearch工程师对elastic官网内容和案例模板要非常清楚,因为elasticsearch的api本就复杂规律性不像sql那么简单易用。

索引操作(增-删-改)

es 有专门的 Index API,用于创建、更新、删除索引配置等

https://www.elastic.co/guide/en/elasticsearch/reference/8.1/indices.html

  • 创建索引

    • GET _cat/indices 查看现有索引
    • PUT /test_index 创建
      • 索引配置可以不着急更新(后续update更新)
  • 删除索引

    • DELETE /test_index
  • 更新索引

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    //更新索引配置,这里选择了索引的分片数和副本数量
    PUT twitter
    {
    "settings": {
    "index": {
    "number_of_shards": 3,
    "number_of_replicas": 2
    }
    }
    }
    //也可以将uri指向具体的配置setting列
    PUT /twitter/_settings
    {
    "index": {
    "number_of_replicas": 2
    }
    }
    //常见配置
    PUT twitter
    {
    "settings": {
    "number_of_shards": "1",
    "number_of_replicas": "1",
    //索引刷新频率
    "refresh_interval": "30s",
    //事务日志
    "translog": {
    "sync_interval": "30s",
    "durability": "async"
    }
    }
    }

文档操作(增-删-改-查)

[https://www.elastic.co/guide/en/elasticsearch/reference/8.1/d ocs.html](https://www.elastic.co/guide/en/elasticsearch/reference/8.1/d ocs.html)

创建

1
2
3
4
5
PUT /test_index/_doc/1
{
"username": "alfred",
"age": 1
}

创建文档时,如果 索引不存在,es 会 自动创建对应的 index

查询文档(根据id查询)

非搜索

GET /test_index/_doc/1

搜索(后续细说)

1
2
3
4
5
6
7
8
GET /test_index/_search
{
"query": {
"term": {
"_id": "1"
}
}
}

批量操作文档

es 允许一次创建多个文档,从而减少网络传输开销,提升写入速 率

1
2
3
4
5
6
7
8
POST _bulk
//指定索引操作,索引新增id可以不给
{"index":{"_index":"test_index","_id":"3"}}
//插入索引值
{"username":"alfred","age":10}
{"delete":{"_index":"test_index","_id":"1"}}
{"update":{"_id":"2","_index":"test_index"}}
{"doc":{"age":"20"}}

索引别名别名(alias)

  • 可以指向多个索引的软链
  • 简单而重要的功能,为高级功能,如 rollover、ILM 等提供了 实现基础
  • 对使用者屏蔽真实索引,降低心智负担

https://www.elastic.co/guide/en/elasticsearch/reference/8.1/indices-aliases.htm

读写分离数据

为避免数据量过大,我们会分为两个索引。新的数据将只会写道新索引nginx-logs-2023-01-02中去,查询则可以往两边查询。

graph TB
A[nginx-logs]--is_write_index:false-->B[nginx-logs-2021-01-01]
A--is_write_index:true-->C[nginx-logs-2023-01-02]

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//基于索引配置实现(适合对单个索引添加别名)
PUT /nginx-logs-2023-01-01
{
"mappings": {
"properties": {
......
}
},
"aliases": {
"nginx_logs": {}
}
}
//也可以通过aliases专有api实现(适合需要为多个索引创建别名的场景)
POST /_aliases
{
"actions": [
{
"add": {
"index": "nginx-logs-2023-01-02",
"alias": "nginx_logs",
"is_write_index": true
}
},
{
"add": {
"index": "nginx-logs-2023-01-01",
"alias": "nginx_logs",
"is_write_index": false
}
}
]
}

过滤条件数据

别名可以提供filter过滤响应数据

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
PUT /nginx-logs-2023-01-01
{
"mappings": {
"properties": {
"method": {
"type": "keyword"
}
}
},
"aliases": {
//这个别名会查全部数据
"nginx-logs": {},
//这个别名只会查method=post的数据
"nginx-logs-POST": {
//可以看出实际的过滤就是封装了额外的查询条件
"filter": {
"term": {
"method": "POST"
}
}
}
}
}

Index Template

[https://www.elastic.co/guide/en/elasticsearch/reference/8.1/in dex-templates.html](https://www.elastic.co/guide/en/elasticsearch/reference/8.1/in dex-templates.html)

  • 主要用于在新建索引时自动应用预先设定的配置,简化索引创 建的操作步骤
  • 可以设定索引的配置和 mapping
  • 当匹配多个模板时,只采用 priority 最大的模板

索引模板 API,endpoint 为 _index_template,如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
PUT _index_template/test_template
{
//当索引名称以te或bar开头时应用template下面的settings属性和mappings属性
"index_patterns": [
"te*",
"bar*"
],
"priority": 500,
"template": {
"settings": {
"number_of_shards": 1
},
"mappings": {
......
}
}
}
PUT test_index
GET test_index

Component Template

  • 组件模板,英文为 Component Template
    • 类似积木,可以将相同的配置快速添加到不同的索引模板中, 提升维护的效率
    • 模板的模板

创建模板

组件模板API为:PUT /_component_template/name

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 模板2
PUT /_component_template/ct1
{
"template": {
"settings": {
"index.number_of_shards": 2
}
}
}
// 模板1
PUT /_component_template/ct2
{
"template": {
"settings": {
"index.number_of_replicas": 0
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
}
}
}
}
}

// 创建模板,并应用上述组件模板(通过composed_of)
POST /_index_template/template1
POST /_index_template/template1
{
"index_patterns": [
"te*"
],
"template": {
"settings": {
//本模板的优先级更高,不会被覆盖
"index.number_of_shards": 3
}
},
"composed_of": [
"ct1",
"ct2"
]
}

//只应用一个
POST /_index_template/template2
{
"index_patterns": [
"bark*"
],
"template": {
"settings": {
"index.number_of_shards": 3
}
},
"composed_of": [
"ct2"
]
}

查看模板(测试模板)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//有专门的测试API测试组件模板效果
POST /_index_template/_simulate_index/my-index-000001 //通过模拟创建索引(实际不会创建)查看模板在索引的应用情况
POST /_index_template/_simulate/template_1 //查看模板
POST /_index_template/_simulate //模拟创建模板,查看模板创建情况
{
"index_patterns": [
"my*"
],
"template": {
"settings": {
"index.number_of_shards": 3
}
},
"composed_of": [
"ct1",
"ct2"
]
}

Dynamic Template(动态模版)

[https://www.elastic.co/guide/en/elasticsearch/reference/8.1/d ynamic-templates.html](https://www.elastic.co/guide/en/elasticsearch/reference/8.1/d ynamic-templates.html)

es 可以自动识别文档字段类型,从而降低用户使用成本,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//在没有test_index索引的情况下直接写入数据
PUT /test_index/doc/1
{
"username":"alfred",
"age":1
}
//查询发现会自动创建索引文件,被按默认类型去映射json请求中的类型
GET /test_index/_mapping
{
"test_index": {
"mappings": {
"properties": {
"age": {
"type": "long"
},
"username": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
}

默认动态模板

JSON data type Elasticsearch data type
null 忽略
double float
long long
可识别为日期的 string date
可识别为数字的 string float or long
其他 string text with a .keyword sub-field
array 选择第一个非 null 元素类型
object object

还是有规律的

自定义动态模板

假设:

允许根据 es 自动识别的数据类型、字段名等来动态设定字段类型 ,可以实现如下效果:

  • 所有字符串类型都设定为 keyword 类型,即默认不分词
  • 所有以 message 开头的字段都设定为 text 类型,即分词
  • 所有以 int_ 开头的字段都设定为 int 类型

示例:(注意和index template区分)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

// 可以看出动态模板api和index template一样
PUT _index_template/my_custom_template
{
"index_patterns": ["*"], // 应用到所有索引
"template": {
"mappings": {
"dynamic_templates": [
/* 规则1:所有字符串默认设为keyword */
{
"strings_as_keywords": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
},
/* 规则2:message开头的字段设为text */
{
"message_text_fields": {
"match": "message*",
"mapping": {
"type": "text",
"fields": {
"keyword": { // 同时保留keyword子字段
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
/* 规则3:int_开头的字段设为integer */
{
"int_prefix_fields": {
"match": "int_*",
"mapping": {
"type": "integer"
}
}
}
]
}
}
}
// 也可以直接应用到索引上
PUT test_index
{
"mappings": {
"dynamic_templates": [
{
"strings": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}
]
}
}

一些匹配参数

  • match_mapping_type 匹配 es 自动识别的字段类型,如 boolean,long,string等
  • match,unmatch 匹配字段名
  • path_match,path_unmatch 匹配路径

数据重做

Reindex - _update_by_query

  • api:POST blog_index/_update_by_query?conflicts=proceed
    • blog_index:文档名称,对索引中的文档原地重建
    • confilicts=proceed:遇到版本冲突覆盖执行
  • 一些额外操作如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
POST blog_index/_update_by_query
{
//给定脚本用于对新数据修改
"script": {
//source.likes字段+1
"source": "ctx._source.likes++",
"lang": "painless"
},
//给定查询条件执行部分重建
"query": {
"term": {
"user": "tom"
}
}
}

Reindex - _reindex

从A索引迁移到B索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//简单的迁移
POST _reindex
{
"source": {
"index": "blog_index"
},
"dest": {
"index": "blog_new_index"
}
}
//带个性化条件
POST _reindex
{
//冲突覆盖
"conflicts": "proceed",
"source": {
"index": "blog_index",
//同样可以给定查询条件
"query": {
"term": {
"user": "tom"
}
}
},
"dest": {
"index": "blog_new_index"
}
}

Reindex异步重建

  • 数据重建的时间受源索引文档数量的影响,当数量越多时,所需时间 越长
  • 此时可以通过设定 url 参数 wait_for_completion 为 false 来异步执行 ,ES 以 task 来描述此类执行任务
    • POST blog_index/_update_by_query?conflicts=proceed&wait_for_completion=false
  • ES 提供了 Task API 来查看任务的执行进度和相关数据
    • GET _tasks/_xxxxxx

Ingest Pipeline(数据管道)

https://www.elastic.co/guide/en/elasticsearch/reference/8.1/ingest.html

Ingest Node 是 ES 的预处理角色的节点

  • 功能上类似 logstash 的 filter,提供 grok、add field、drop field 等 ETL 的能力
  • 是在数据落入 ES 数据节点前的最后一道处理流程
  • 不能像 logstash 一般可以自由定制 input 和 output,相当于只 有 logstash filter 的能力,input 和 output 都是 es 自身

使用场景举例

  • 日志数据希望通过 grok 提取部分信息到专用字段里,如 ip statu_code 等
  • 线上索引数据某个字段名设置错了,希望修复为正确的字段名
  • 希望增加一个 ingest_timestamp 记录数据进入 es 的时间

Ingest Pipeline 是定义一个处理管道,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
PUT _ingest/pipeline/my-pipeline-id 
{
"description": "describe pipeline", // 管道的描述信息
"processors": [ // 处理器数组(定义数据处理步骤)
{ // 第一个处理器
"set": { // 使用set处理器
"field": "foo", // 操作的目标字段
"value": "bar" // 要设置的值
}
}
]
}

注意:Ingest Pipeline 的使用时机是在写入文档的时候:

使用场景

1
2
3
PUT test_pipeline/_doc/1?pipeline=my-pipeline-id //开始写入时添加pipeline

POST /_bulk?pipeline=my-pipeline-id // 批量处理时添加pipeline

索引重建时也能使用

1
2
3
4
5
6
7
8
PUT /my_index/_settings //直接在索引上绑定pipeline
{
"index": {
"default_pipeline": "my-pipeline-id"
}
}

POST my_index/_update_by_query?pipeline=mypipeline-id

调试pipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//在已有的pipeline上测试数据
POST _ingest/pipeline/my_pipeline/_simulate
{
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
}
}
]
}
//先不创建pipeline测试
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "_description",
"processors": [
{
"set": {
"field": "field2",
"value": "_value"
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}

Ingest Pipeline Processors

[https://www.elastic.co/guide/en/elasticsearch/reference/8.1/inge st-processors.html](https://www.elastic.co/guide/en/elasticsearch/reference/8.1/inge st-processors.html)

Ingest Pipeline 的核心组成是众多的处理器 Processors:

Date:日期处理器

负责将日期字符串转换为时间戳(date)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"description": "...",
"processors": [
{
"date": {
"field": "initial_date",
"target_field": "timestamp",
"formats": [
"dd/MM/yyyy hh:mm:ss"
],
"timezone": "Asia/Shanghai"
}
}
]
}

Drop:条件过滤处理器

符合条件时丢弃该文档,不入库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
"description": "...",
"processors": [
{
"drop": {
"if": "ctx.network_name == 'Guest'"
}
},
{
"drop": {
"if": """
Collection tags = ctx.tags;
if(tags != null){
for (String tag : tags) {
if
(tag.toLowerCase().contains('prod')) {
return false;
}
}
}
return true;
"""
}
}
]
}

Foreach:遍历

遍历数组字段的所有值,并做相应的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"description": "...",
"processors": [
{
"foreach": {
"field": "values",
"processor": {
"uppercase": {
"field": "_ingest._value"
}
}
}
}
]
}

Grok

基于 Grok 从原始内容中匹配部分内容到独立的字段

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"description": "...",
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes}%{NUMBER:duration}"
]
}
}
]
}

JSON

将 json 字符串转成 json 对象

1
2
3
4
5
6
7
8
9
10
11
{
"description": "...",
"processors": [
{
"json": {
"field": "string_source",
"target_field": "json_target"
}
}
]
}

Remove

删除指定字段

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"description": "...",
"processors": [
{
"remove": {
"field": [
"user_agent",
"url"
]
}
}
]
}

Rename

字段重命名

1
2
3
4
5
6
7
8
9
10
11
{
"description": "...",
"processors": [
{
"rename": {
"field": "provider",
"target_field": "cloud.provider"
}
}
]
}

Set

设置某个字段值

  • 设置某字段的值,此处可以获取现有字段的值来实现拼接等该功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
{
"description": "...",
"processors": [
{
"set": {
"field": "my_field",
"value": 582.1
}
},
{
"set": {
"field": "host.os.name",
"value": "{{os}}"
}
},
{
"set": {
"field": "received",
"value": "{{_ingest.timestamp}}"
}
},
{
"set": {
"field": "field_c",
"value": "{{field_a}} {{field_b}}"
}
},
{
"set": {
"field": "_index",
"value": "{{field_index_name}}"
}
},
{
"set": {
"field": "{{service}}",//可以用某个字段的值来做key
"value": "{{code}}"
}
}
]
}

Script

通过自定义脚本的方式来灵活更改相关字段名和字段值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"description": "...",
"processors": [
{
"script": {
"lang": "painless",
"source": "ctx.field_a_plus_b_times_c = (ctx.field_a + ctx.field_b) * params.param_c",
"params": {
"param_c": 10
}
}
}
]
}

Split

将字符串分割为数组

1
2
3
4
5
6
7
8
9
10
11
{
"description": "...",
"processors": [
{
"split": {
"field": "my_field",
"separator": "\\s+"
}
}
]
}

Ingest Pipeline 异常处理

处理异常时,默认会停止并退出后续的处理流程

  • 可以通过增加异常捕捉逻辑,增加自定义的处理流程
  • 比如报错的数据写入另一个索引,或者打上另外的标签
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
{
"description": "my first pipeline with handled exceptions",
"processors": [
{
"rename": {
"field": "foo",
"target_field": "bar",
//往某个字段中写入失败标识
"on_failure": [
{
"set": {
"field": "error",
"value": "field \"foo\" does not exist, cannot rename to \"bar\""
}
}
]
}
}
]
}

//往某个索引中写入当前失败的document的索引index
{
"description": "my first pipeline with handled exceptions",
"processors": [ ... ],
"on_failure": [
{
"set": {
"field": "_index",
"value": "failed-{{ _index }}"
}
}
]
}

Elasticsearch搜索引擎(数据操作)
https://andrewjiao.github.io/2022/09/01/elasticsearch/ElasticSearch数据操作/
作者
Andrew_Jiao
发布于
2022年9月1日
许可协议