【Elasticsearch7.0】文档接口之查询update接口

  |   0 评论   |   43 浏览

基本语法

查询update是对每个文档进行更新,而不是改变source,这对于获取新属性或在线映射更改非常有用,如下示例:

curl -XPOST "http://127.0.0.1:9200/test/_update_by_query?conflicts=proceed&pretty"
{
  "took" : 460,
  "timed_out" : false,
  "total" : 9,
  "updated" : 9,
  "deleted" : 0,
  "batches" : 1,
  "version_conflicts" : 0,
  "noops" : 0,
  "retries" : {
    "bulk" : 0,
    "search" : 0
  },
  "throttled_millis" : 0,
  "requests_per_second" : -1.0,
  "throttled_until_millis" : 0,
  "failures" : [ ]
}

_update_by_query获取索引启动时的快照,并使用内部版本控制对找到的内容进行索引,如果在获取完快照之后,有其他操作对文档进行了修改,那么会出现版本冲突问题,如果版本号匹配,那么文档会进行更新,版本号会自增。注意:因为版本号不支持0,所有如果接口中定义了版本号为0,那么请求将失败。
所有查询或者更新失败,查询update都会中止,并返回错误响应,已经执行的更新仍然有效,换句话说就是进程不会滚回,只是中止当前操作。如果一个失败中止发生了之后,之前的失败结果也会在failures中返回,因此会出现很多失败实体。
如果你只想简单的计算版本号,而不让进程中止,那么可以在请求url或者请求方法体中加入conflicts=proceed或"conflicts": “proceed”。
我们也可以使用请求体来执行查询update接口:

curl -XPOST "http://127.0.0.1:9200/test/_update_by_query?conflicts=proceed&pretty" -H "Content-Type:application/json" -d'
{
  "query": { 
    "term": {
      "user": "kimchy"
    }
  }
}'

返回值为:

{
  "took" : 61,
  "timed_out" : false,
  "total" : 4,
  "updated" : 4,
  "deleted" : 0,
  "batches" : 1,
  "version_conflicts" : 0,
  "noops" : 0,
  "retries" : {
    "bulk" : 0,
    "search" : 0
  },
  "throttled_millis" : 0,
  "requests_per_second" : -1.0,
  "throttled_until_millis" : 0,
  "failures" : [ ]
}

查询update还支持使用脚本,如示例:

curl -XPOST "http://127.0.0.1:9200/test/_update_by_query?pretty" -H "Content-Type:application/json" -d'
{
  "script": {
    "source": "ctx._source.likes",
    "lang": "painless"
  },
  "query": {
    "term": {
      "user": "kimchy"
    }
  }
}'

返回值为:

{
  "took" : 38,
  "timed_out" : false,
  "total" : 4,
  "updated" : 4,
  "deleted" : 0,
  "batches" : 1,
  "version_conflicts" : 0,
  "noops" : 0,
  "retries" : {
    "bulk" : 0,
    "search" : 0
  },
  "throttled_millis" : 0,
  "requests_per_second" : -1.0,
  "throttled_until_millis" : 0,
  "failures" : [ ]
}

可以通过脚本来设置操作,通过ctx.op来设置,值为noop(空操作)、delete(删除)。
查询update也支持对多个索引进行操作,如:

curl -XPOST "http://127.0.0.1:9200/test,test2/_update_by_query?pretty"
{
  "took" : 56,
  "timed_out" : false,
  "total" : 11,
  "updated" : 11,
  "deleted" : 0,
  "batches" : 1,
  "version_conflicts" : 0,
  "noops" : 0,
  "retries" : {
    "bulk" : 0,
    "search" : 0
  },
  "throttled_millis" : 0,
  "requests_per_second" : -1.0,
  "throttled_until_millis" : 0,
  "failures" : [ ]
}

如果使用了路由,那么查询update的时候也需要带上路由,如:

curl -XPOST "http://127.0.0.1:9200/test/_update_by_query?pretty&routing=1"
{
  "took" : 28,
  "timed_out" : false,
  "total" : 9,
  "updated" : 9,
  "deleted" : 0,
  "batches" : 1,
  "version_conflicts" : 0,
  "noops" : 0,
  "retries" : {
    "bulk" : 0,
    "search" : 0
  },
  "throttled_millis" : 0,
  "requests_per_second" : -1.0,
  "throttled_until_millis" : 0,
  "failures" : [ ]
}

查询update的默认滚动大小是1000,可以通过scroll_size来改变该值,如:

curl -XPOST "http://127.0.0.1:9200/test/_update_by_query?pretty&scroll_size=100"
{
  "took" : 29,
  "timed_out" : false,
  "total" : 9,
  "updated" : 9,
  "deleted" : 0,
  "batches" : 1,
  "version_conflicts" : 0,
  "noops" : 0,
  "retries" : {
    "bulk" : 0,
    "search" : 0
  },
  "throttled_millis" : 0,
  "requests_per_second" : -1.0,
  "throttled_until_millis" : 0,
  "failures" : [ ]
}

还可以使用pipeline属性来使用Ingest节点功能,如:

curl -XPUT "http://127.0.0.1:9200/_ingest/pipeline/set-foo" -H "Content-Type:application/json" -d'
{
  "description" : "sets foo",
  "processors" : [ {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
  } ]
}'
curl -XPOST "http://127.0.0.1:9200/test/_update_by_query?pipeline=set-foo&pretty"

请求参数

除了想标准的pretty参数外,查询update还支持以下参数:refreshwait_for_completionwait_for_active_shardstimeoutscroll
refresh:更新完之后会把数据刷到所有分片,更update接口有所不同,update接口只刷新当前接收的分片,查询update不支持wait_for。
wait_for_completion:如果请求包含wait_for_completion=false,则Elasticsearch将执行一些操作前检查,启动请求,然后返回一个任务,该任务可与Tasks api一起用于取消或获取任务状态。Elasticsearch还将创建此任务的记录,作为.tasks/task/${taskId}的文档。这些任务的是否删除都是取决于你。当你完成它时,删除它,以便Elasticsearch可以回收它使用的空间。
wait_for_active_shards:控制必须多少副本分片是活跃的。
timeout:控制每个写请求等待不可用碎片变为可用碎片的时间。
scroll:控制数据是多久的,如?scroll=10m,默认是5分钟。
requests_per_second:可以设置任何正数,在每个查询update操作中设置一个超时时间来控制速率,设置成-1表示禁用。批次大小为1000,requests_per_second为500,

target_time = 1000 / 500 per second = 2 seconds 
wait_time = target_time - write_time = 2 seconds - 0.5 seconds = 1.5 seconds

响应体

{
  "took": 147,
  "timed_out": false,
  "total": 5,
  "updated": 5,
  "deleted": 0,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
	"bulk": 0,
	"search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "failures": []
}

took:从整个操作开始到结束的毫秒数。
timed_out:如果请求超时,该值返回true。
total:成功执行的总数。
updated:成功执行更新的数量。
deleted:成功执行删除的数量。
batches:回滚响应数量。
version_conflicts:版本冲突的数量 。
noops:因为脚本使用ctx.op忽略了noop的文档数量 。
retries:尝试重试的次数。
throttled_millis:请求休眠的毫秒数 。
requests_per_second:每秒有效执行的请求数 。
throttled_until_millis:在查询update响应中,该字段应该始终等于零。它只有在使用Task API时才有意义,在Task API中,它指示下一次(毫秒)再次执行节流请求的毫秒,以符合requests_per_second的配置。
failures:失败结果,会有多个错误结果,以数组的形式输出。

使用task api

可以使用task api来处理查询update,示例如:

curl -XGET "http://127.0.0.1:9200/_tasks?detailed=true&actions=*byquery&pretty"
{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "r1A2WoR",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1A2WoRbTwKZ516z6NEs5A:36619" : {
          "node" : "r1A2WoRbTwKZ516z6NEs5A",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/update/byquery",
          "status" : {    
            "total" : 6154,
            "updated" : 3500,
            "created" : 0,
            "deleted" : 0,
            "batches" : 4,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries": {
              "bulk": 0,
              "search": 0
            },
            "throttled_millis": 0
          },
          "description" : ""
        }
      }
    }
  }
}

查看任务详情

curl -XGET "http://127.0.0.1:9200/_tasks/r1A2WoRbTwKZ516z6NEs5A:36619"

取消任务

curl -XPOST "http://127.0.0.1:9200/_tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel"

切片

查询update支持滚动切分,通过并行可以提升处理性能。

手动切片

通过为每个请求提供一个切片id和总片数,手工切片查询更新:

curl -XPOST "http://127.0.0.1:9200/test/_update_by_query?pretty" -H "Content-Type:application/json" -d'
{
  "slice": {
    "id": 0,
    "max": 2
  },
  "script": {
    "source": "ctx._source[\"extra\"] = \"test\""
  }
}'
curl -XPOST "http://127.0.0.1:9200/test/_update_by_query?pretty" -H "Content-Type:application/json" -d'
{
  "slice": {
    "id": 1,
    "max": 2
  },
  "script": {
    "source": "ctx._source[\"extra\"] = \"test\""
  }
}'

自动切片

使用slices参数来设置需要多少slices。

curl -XPOST "http://127.0.0.1:9200/test/_update_by_query?refresh&slices=5&pretty" -H "Content-Type:application/json" -d'
{
  "script": {
    "source": "ctx._source[\"extra\"] = \"test\""
  }
}'

部分返回值为:
image.png
设置slices为auto让es决定使用多少切片。

也可以关注我的公众号:程序之声
图片
关注公众号,领取更多资源

本文为博主原创文章,未经博主允许不得转载。

评论

发表评论