-
Notifications
You must be signed in to change notification settings - Fork 4.1k
deadloop between InternalRangeLookup and InternalPushTxn request #435
Description
If a transaction modify the metadata of a range, it may lead a deadloop between InternalRangeLookup and InternalPushTxn request.When a request was send to a range which metadata was modified in the transaction, the dist_sender may call a InternalRangeLookup request to get the metadata of range, when the InternalRangeLookup execute on the range, it will genete a WriteIntend error because a read-write conflict,the store will send a InternalPushTxn request to push the txn, the dist_sender will send a same InternalRangeLookup to route the InternalPushTxn request,this is a deadloop.
Use the engine.MVCCIterateCommitted instead of the engine.MVCCScan in the InternalRangeLookup method of range can solve the problem,but if a transaction modified the metadata of range committed but forget clean the intent of the range metadata,engine.MVCCIterateCommitted will never get the new updated data.so InternalRangeLookup can use the engine.MVCCScan to get the data, if error occur,try engine.MVCCIterateCommitted.
func(r_Range)InternalRangeLookup(batchengine.Engine,args_proto.InternalRangeLookupRequest,reply_proto.InternalRangeLookupResponse){
iferr:=engine.ValidateRangeMetaKey(args.Key);err!=nil{
reply.SetGoError(err)
return
}
rangeCount:=int64(args.MaxRanges)
ifrangeCount<1{
reply.SetGoError(util.Errorf(
"Rangelookupspecifiedinvalidmaximumrangecount%d:mustbe>0",rangeCount))
return
}
//Wewanttosearchforthemetadatakeyjustgreaterthanargs.Key.Scan
//forboththerequestedkeyandthekeysimmediatelyafterwards,upto
//MaxRanges.
metaPrefix:=proto.Key(args.Key[:len(engine.KeyMeta1Prefix)])
nextKey:=proto.Key(args.Key).Next()
kvs,err:=engine.MVCCScan(batch,nextKey,metaPrefix.PrefixEnd(),rangeCount,args.Timestamp,args.Txn)
iferr!=nil{
kvs=[]proto.KeyValue{}
if_,ok:=err.(_proto.WriteIntentError);ok{
kvs:=[]proto.KeyValue(nil)
iferr:=engine.MVCCIterateCommitted(batch,nextKey,metaPrefix.PrefixEnd(),func(kvproto.KeyValue)(bool,error){
kvs=append(kvs,kv)
ifrangeCount!=0&&rangeCount==int64(len(kvs)){
returntrue,nil
}
returnfalse,nil
});err!=nil{
reply.SetGoError(err)
return
}
}
}