Elasticsearch 并发修改乐观锁

版本控制的一个例子

curl -XPOST http://localhost:9200/test/test/1 -d '{"msg": "test"}'
{
    "_index": "test",
    "_type": "test",
    "_id": "1",
    "_version": 1,
    "created": true
}
curl -XPOST http://localhost:9200/test/test/1 -d '{"msg": "test"}'
{
    "_index": "test",
    "_type": "test",
    "_id": "1",
    "_version": 2,
    "created": true
}
curl -XPOST http://localhost:9200/test/test/1?version=1 -d '{"msg": "test"}'
{
    "error": "RemoteTransportException[[elasticsearch_226][inet[/172.16.18.226:9300]][indices:data/write/index]]; nested: VersionConflictEngineException[[test][0] [test][1]: version conflict, current [4], provided [1]]; ",
    "status": 409
}

提示:版本冲突

使用外部系统提供的版本号

Elasticsearch 的乐观锁,可以使用外部系统提供的版本号:

curl -XPOST http://localhost:9200/test/test/1?version=10&version_type=external -d '{"msg": "test"}'
{
    "_index": "test",
    "_type": "test",
    "_id": "1",
    "_version": 10,
    "created": true
}

curl -XPOST http://localhost:9200/test/test/1?version=9&version_type=external -d '{"msg": "test"}'
{
    "error": "VersionConflictEngineException[[test][0] [test][1]: version conflict, current [10], provided [9]]",
    "status": 409
}

这时Elasticsearch将只检查提供的版本是否比当前保存在索引中的版本大(大多少不重要),如果是成功,否则失败。

Elasticsearch内部版本号

在Elasticsearch中,更新请求实际上是分为两个阶段,获取文档,修改文档,然后保存文档。
那么当两个更新请求同时要修改文档的时候,系统乐观的认为不会有两个并发请求对一个系统操作。
文档原本的版本为1,请求A获取了version为1的文档,请求B也获取了version为1的文档,然后请求A修改完文档后,并且先执行了保存操作,这个时候,系统中的文档version变为了2。
这个时候,B再执行保存操作的时候,告诉系统我要修改version为1的文档。系统就会抛出一个错误,说文档版本不匹配。然后这个错误由应用程序自己来进行控制。
这种机制在请求量大的时候会比悲观锁机制好。但是缺点是需要程序处理版本冲突错误,可能一般的方法是封装更新操作,并且设置重复重试次数。

注意这里的更新指的是:

curl -XPOST http://localhost:9200/test/test/1/_update -d '{"msg": "test"}'

而不是:

curl -XPOST http://localhost:9200/test/test/1 -d '{"msg": "test"}'

这个是create,这种在ES内部没有乐观锁的概念,是直接覆盖,这种想解决数据被覆盖,就要显示的带着version

下面的程序,并发6个进程同时批量更新一个文档,在结果能看到有的请求出错了,返回版本冲突。

<?php

require_once dirname(__FILE__).'/../shjf/vendor/autoload.php';

$client = new Elasticsearch\Client(['hosts' => ['172.16.18.226:9200']]);

for($i=0;$i<5;$i++){    
    if (pcntl_fork() == 0) break;
}

$params=[
    'index' => 'test',
    'type'  => 'test',
    'body' => [],
];  

for($i=0;$i<1;$i++){
    $params['body'][] = ['update' => ['_id' => 111]];
    $params['body'][] = ["doc" => ["id" => $i]]; 
}
try 
{ 
    $result = $client->bulk($params);
    var_dump($result) . "\n";
} 
catch(Exception $e) 
{ 
    echo $e->getMessage() . "\n";
    $info = $client->transport->getLastConnection()->getLastRequestInfo();
    print_r($info);
    exit;
}

echo "end\n";

?>

{
  "took":1,
  "errors":true,
  "items":[
    {"update": {
        "_index":"test",
        "_type":"test",
        "_id":"111",
        "status":409,
        "error":"VersionConflictEngineException[[test][0] [test][111]: version conflict, current [3], provided [2]]"}
    }
  ]
}

参考资料:

  • 0
    点赞
  • 1
    收藏
    觉得还不错? 一键收藏
  • 0
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值