在线不卡日本ⅴ一区v二区_精品一区二区中文字幕_天堂v在线视频_亚洲五月天婷婷中文网站

  • <menu id="lky3g"></menu>
  • <style id="lky3g"></style>
    <pre id="lky3g"><tt id="lky3g"></tt></pre>

    深入淺出的Redis分布式鎖|得物技術(shù)

    深入淺出的Redis分布式鎖|得物技術(shù)

    1. 分布式

    1.1 分布式鎖介紹

    分布式鎖是控制不同系統(tǒng)之間訪問共享資源的一種鎖實(shí)現(xiàn),如果不同的系統(tǒng)或同一個(gè)系統(tǒng)的不同主機(jī)之間共享了某個(gè)資源時(shí),往往需要互斥來防止彼此干擾來保證一致性。

    1.2 為什么需要分布式鎖

    在單機(jī)部署的系統(tǒng)中,使用線程鎖來解決高并發(fā)的問題,多線程訪問共享變量的問題達(dá)到數(shù)據(jù)一致性,如使用synchornized、ReentrantLock等。但是在后端集群部署的系統(tǒng)中,程序在不同的JVM虛擬機(jī)中運(yùn)行,且因?yàn)閟ynchronized或ReentrantLock都只能保證同一個(gè)JVM進(jìn)程中保證有效,所以這時(shí)就需要使用分布式鎖了。這里就不再贅述synchornized鎖的原理,想了解可以讀這篇文章《深入理解synchronzied底層原理》。

    1.3 分布式鎖需要具備的條件

    分布式鎖需要具備互斥性、不會(huì)死鎖和容錯(cuò)等。互斥性,在于不管任何時(shí)候,應(yīng)該只能有一個(gè)線程持有一把鎖;不會(huì)死鎖在于即使是持有鎖的客戶端意外宕機(jī)或發(fā)生進(jìn)程被kill等情況時(shí)也能釋放鎖,不至于導(dǎo)致整個(gè)服務(wù)死鎖。容錯(cuò)性指的是只要大多數(shù)節(jié)點(diǎn)正常工作,客戶端應(yīng)該都能獲取和釋放鎖。

    2. 分布式鎖的實(shí)現(xiàn)方式

    目前主流的分布式鎖的實(shí)現(xiàn)方式,基于數(shù)據(jù)庫實(shí)現(xiàn)分布式鎖、基于Redis實(shí)現(xiàn)分布式鎖、基于ZooKeeper實(shí)現(xiàn)分布式鎖,本篇文章主要介紹了Redis實(shí)現(xiàn)的分布式鎖。

    2.1 由單機(jī)部署到集群部署鎖的演變

    一開始在redis設(shè)置一個(gè)默認(rèn)值key:ticket 對(duì)應(yīng)的值為20,并搭建一個(gè)Spring Boot服務(wù),用來模擬多窗口賣票現(xiàn)象,配置類的代碼就不一一列出了。

    2.1.1 單機(jī)模式解決并發(fā)問題

    一開始的時(shí)候在redis預(yù)設(shè)置的門票值ticket=20,那么當(dāng)一個(gè)請(qǐng)求進(jìn)來之后,會(huì)判斷是否余票是否是大于0,若大于0那么就將余票減一,再重新寫入Redis中,倘若庫存小于0,那么就會(huì)打印錯(cuò)誤日志。

    @RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping(“/lock”) public String deductTicket() throws InterruptedException { String lockKey = “ticket”; int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount – 1; log.info(“扣減成功,剩余票數(shù):” + realTicketCount + “”); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + “”); } else { log.error(“扣減失敗,余票不足”); } return “end”; } }

    代碼運(yùn)行分析:這里明顯有一個(gè)問題,就是當(dāng)前若有兩個(gè)線程同時(shí)請(qǐng)求進(jìn)來,那么兩個(gè)線程同時(shí)請(qǐng)求這段代碼時(shí),如圖thread 1 和thread 2同時(shí),兩個(gè)線程從Redis拿到的數(shù)據(jù)都是20,那么執(zhí)行完成后thread 1 和thread 2又將減完后的庫存ticket=19重新寫入Redis,那么數(shù)據(jù)就會(huì)產(chǎn)生問題,實(shí)際上兩個(gè)線程各減去了一張票數(shù),然而實(shí)際寫進(jìn)就減了一次票數(shù),就出現(xiàn)了數(shù)據(jù)不一致的現(xiàn)象。

    這種問題很好解決,上述問題的產(chǎn)生其實(shí)就是從Redis中拿數(shù)據(jù)和減余票不是原子操作,那么此時(shí)只需要將按下圖代碼給這倆操作加上synchronized同步代碼快就能解決這個(gè)問題。

    @RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping(“/lock”) public String deductTicket() throws InterruptedException { String lockKey = “ticket”; synchronized (this) { int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount – 1; log.info(“扣減成功,剩余票數(shù):” + realTicketCount + “”); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + “”); } else { log.error(“扣減失敗,余票不足”); } } return “end”; }}

    代碼運(yùn)行分析:此時(shí)當(dāng)多個(gè)線程執(zhí)行到第14行的位置時(shí),只會(huì)有一個(gè)線程能夠獲取鎖,進(jìn)入synchronized代碼塊中執(zhí)行,當(dāng)該線程執(zhí)行完成后才會(huì)釋放鎖,等下個(gè)線程進(jìn)來之后就會(huì)重新給這段代碼上鎖再執(zhí)行。說簡單些就是讓每個(gè)線程排隊(duì)執(zhí)行代碼塊中的代碼,從而保證了線程的安全。

    上述的這種做法如果后端服務(wù)只有一臺(tái)機(jī)器,那毫無疑問是沒問題的,但是現(xiàn)在互聯(lián)網(wǎng)公司或者是一般軟件公司,后端服務(wù)都不可能只用一臺(tái)機(jī)器,最少都是2臺(tái)服務(wù)器組成的后端服務(wù)集群架構(gòu),那么synchronized加鎖就顯然沒有任何作用了。

    如下圖所示,若后端是兩個(gè)微服務(wù)構(gòu)成的服務(wù)集群,由nginx將多個(gè)的請(qǐng)求負(fù)載均衡轉(zhuǎn)發(fā)到不同的后端服務(wù)上,由于synchronize代碼塊只能在同一個(gè)JVM進(jìn)程中生效,兩個(gè)請(qǐng)求能夠同時(shí)進(jìn)兩個(gè)服務(wù),所以上面代碼中的synchronized就一點(diǎn)作用沒有了。

    用JMeter工具隨便測試一下,就很簡單能發(fā)現(xiàn)上述代碼的bug。實(shí)際上synchronized和juc包下個(gè)那些鎖都是只能用于JVM進(jìn)程維度的鎖,并不能運(yùn)用在集群或分布式部署的環(huán)境中。

    2.1.2 集群模式解決并發(fā)問題

    通過上面的實(shí)驗(yàn)很容易就發(fā)現(xiàn)了synchronized等JVM進(jìn)程級(jí)別的鎖并不能解決分布式場景中的并發(fā)問題,就是為了應(yīng)對(duì)這種場景產(chǎn)生了分布式鎖。

    本篇文章介紹了Redis實(shí)現(xiàn)的分布式鎖,可以通過Redis的setnx(只在鍵key不存在的情況下, 將鍵key的值設(shè)置為value。若鍵key已經(jīng)存在, 則SETNX命令不做任何動(dòng)作。)的指令來解決的,這樣就可以解決上面集群環(huán)境的鎖不唯一的情況。

    @RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping(“/lock”) public String deductTicket() throws InterruptedException { String lockKey = “ticket”; // redis setnx 操作 Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, “dewu”); if (Boolean.FALSE.equals(result)) { return “error”; } int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount – 1; log.info(“扣減成功,剩余票數(shù):” + realTicketCount + “”); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + “”); } else { log.error(“扣減失敗,余票不足”); } stringRedisTemplate.delete(lockKey); return “end”; }}

    代碼運(yùn)行分析:代碼是有問題的,就是當(dāng)執(zhí)行扣減余票操作時(shí),若業(yè)務(wù)代碼報(bào)了異常,那么就會(huì)導(dǎo)致后面的刪除Redis的key代碼沒有執(zhí)行到,就會(huì)使Redis的key沒有刪掉的情況,那么Redis的這個(gè)key就會(huì)一直存在Redis中,后面的線程再進(jìn)來執(zhí)行下面這行代碼都是執(zhí)行不成功的,就會(huì)導(dǎo)致線程死鎖,那么問題就會(huì)很嚴(yán)重了。

    為了解決上述問題其實(shí)很簡單,只要加上一個(gè)try…finally即可,這樣業(yè)務(wù)代碼即使拋了異常也可以正常的釋放鎖。setnx + try … finally解決,具體代碼如下:

    @RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping(“/lock”) public String deductTicket() throws InterruptedException { String lockKey = “ticket”; // redis setnx 操作 try { Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, “dewu”); if (Boolean.FALSE.equals(result)) { return “error”; } int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount – 1; log.info(“扣減成功,剩余票數(shù):” + realTicketCount + “”); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + “”); } else { log.error(“扣減失敗,余票不足”); } } finally { stringRedisTemplate.delete(lockKey); } return “end”; }}

    代碼運(yùn)行分析:上述問題解決了,但是又會(huì)有新的問題,當(dāng)程序執(zhí)行到try代碼塊中某個(gè)位置服務(wù)宕機(jī)或者服務(wù)重新發(fā)布,這樣就還是會(huì)有上述的Redis的key沒有刪掉導(dǎo)致死鎖的情況。這樣可以使用Redis的過期時(shí)間來進(jìn)行設(shè)置key,setnx + 過期時(shí)間解決,如下代碼所示:

    @RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping(“/lock”) public String deductTicket() throws InterruptedException { String lockKey = “ticket”; // redis setnx 操作 try { Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, “dewu”); //程序執(zhí)行到這 stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS); if (Boolean.FALSE.equals(result)) { return “error”; } int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount – 1; log.info(“扣減成功,剩余票數(shù):” + realTicketCount + “”); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + “”); } else { log.error(“扣減失敗,余票不足”); } } finally { stringRedisTemplate.delete(lockKey); } return “end”; }}

    代碼運(yùn)行分析:上述代碼解決了因?yàn)槌绦驁?zhí)行過程中宕機(jī)導(dǎo)致的鎖沒有釋放導(dǎo)致的死鎖問題,但是如果代碼像上述的這種寫法仍然還是會(huì)有問題,當(dāng)程序執(zhí)行到第18行時(shí),程序宕機(jī)了,此時(shí)Redis的過期時(shí)間并沒有設(shè)置,也會(huì)導(dǎo)致線程死鎖的現(xiàn)象。可以用了Redis設(shè)置的原子命設(shè)置過期時(shí)間的命令,原子性過期時(shí)間的setnx命令,如下代碼所示:

    @RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping(“/lock”) public String deductTicket() throws InterruptedException { String lockKey = “ticket”; // redis setnx 操作 try { Boolean result = stringRedisTemplate.opsForValue().setIfPresent(lockKey, “dewu”, 10, TimeUnit.SECONDS); if (Boolean.FALSE.equals(result)) { return “error”; } int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount – 1; log.info(“扣減成功,剩余票數(shù):” + realTicketCount + “”); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + “”); } else { log.error(“扣減失敗,余票不足”); } } finally { stringRedisTemplate.delete(lockKey); } return “end”; }}

    代碼運(yùn)行分析:通過設(shè)置原子性過期時(shí)間命令可以很好的解決上述這種程序執(zhí)行過程中突然宕機(jī)的情況。這種Redis分布式鎖的實(shí)現(xiàn)看似已經(jīng)沒有問題了,但在高并發(fā)場景下任會(huì)存在問題,一般軟件公司并發(fā)量不是很高的情況下,這種實(shí)現(xiàn)分布式鎖的方式已經(jīng)夠用了,即使出了些小的數(shù)據(jù)不一致的問題,也是能夠接受的,但是如果是在高并發(fā)的場景下,上述的這種實(shí)現(xiàn)方式還是會(huì)存在很大問題。

    如上面代碼所示,該分布式鎖的過期時(shí)間是10s,假如thread 1執(zhí)行完成時(shí)間需要15s,且當(dāng)thread 1線程執(zhí)行到10s時(shí),Redis的key恰好就是過期就直接釋放鎖了,此時(shí)thread 2就可以獲得鎖執(zhí)行代碼了,假如thread 2線程執(zhí)行完成時(shí)間需要8s,那么當(dāng)thread 2線程執(zhí)行到第5s時(shí),恰好thread 1線程執(zhí)行了釋放鎖的代碼————stringRedisTemplate.delete(lockKey); 此時(shí),就會(huì)發(fā)現(xiàn)thread 1線程刪除的鎖并不是其自己的加鎖,而是thread 2加的鎖;那么thread 3就又可以進(jìn)來了,那么假如一共執(zhí)行5s,那么當(dāng)thread 3執(zhí)行到第3s時(shí),thread 2又會(huì)恰好執(zhí)行到釋放鎖的代碼,那么thread 2又刪除了thread 3 加的鎖。

    在高并發(fā)場景下,倘若遇到上述問題,那將是災(zāi)難性的bug,只要高并發(fā)存在,那么這個(gè)分布式鎖就會(huì)時(shí)而加鎖成功時(shí)而加鎖失敗。

    解決上述問題其實(shí)也很簡單,讓每個(gè)線程加的鎖時(shí)給Redis設(shè)置一個(gè)唯一id的value,每次釋放鎖的時(shí)候先判斷一下線程的唯一id與Redis 存的值是否相同,若相同即可釋放鎖。設(shè)置線程id的原子性過期時(shí)間的setnx命令,具體代碼如下:

    @RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping(“/lock”) public String deductTicket() throws InterruptedException { String lockKey = “ticket”; String threadUniqueKey = UUID.randomUUID().toString(); // redis setnx 操作 try { Boolean result = stringRedisTemplate.opsForValue().setIfPresent(lockKey, threadUniqueKey, 10, TimeUnit.SECONDS); if (Boolean.FALSE.equals(result)) { return “error”; } int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount – 1; log.info(“扣減成功,剩余票數(shù):” + realTicketCount + “”); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + “”); } else { log.error(“扣減失敗,余票不足”); } } finally { if (Objects.equals(stringRedisTemplate.opsForValue().get(lockKey), threadUniqueKey)) { stringRedisTemplate.delete(lockKey); } } return “end”; }}

    代碼運(yùn)行分析:上述實(shí)現(xiàn)的Redis分布式鎖已經(jīng)能夠滿足大部分應(yīng)用場景了,但是還是略有不足,比如當(dāng)線程進(jìn)來需要的執(zhí)行時(shí)間超過了Redis key的過期時(shí)間,那么此時(shí)已經(jīng)釋放了,你其他線程就可以立馬獲得鎖執(zhí)行代碼,就又會(huì)產(chǎn)生bug了。

    分布式鎖Redis key的過期時(shí)間不管設(shè)置成多少都不合適,比如將過期時(shí)間設(shè)置為30s,那么如果業(yè)務(wù)代碼出現(xiàn)了類似慢SQL、查詢數(shù)據(jù)量很大那么過期時(shí)間就不好設(shè)置了。那么這里有沒有什么更好的方案呢?答案是有的——鎖續(xù)命。

    那么鎖續(xù)命方案的原來就在于當(dāng)線程加鎖成功時(shí),會(huì)開一個(gè)分線程,取鎖過期時(shí)間的1/3時(shí)間點(diǎn)定時(shí)執(zhí)行任務(wù),如上圖的鎖為例,每10s判斷一次鎖是否存在(即Redis的key),若鎖還存在那么就直接重新設(shè)置鎖的過期時(shí)間,若鎖已經(jīng)不存在了那么就直接結(jié)束當(dāng)前的分線程。

    2.2 Redison框架實(shí)現(xiàn)Redis分布式鎖

    上述“鎖續(xù)命”方案說起來簡單,但是實(shí)現(xiàn)起來還是挺復(fù)雜的,于是市面上有很多開源框架已經(jīng)幫我們實(shí)現(xiàn)好了,所以就不需要自己再去重復(fù)造輪子再去寫一個(gè)分布式鎖了,所以本次就拿Redison框架來舉例,主要是可以學(xué)習(xí)這種設(shè)計(jì)分布式鎖的思想。

    2.2.1 Redison分布式鎖的使用

    Redison實(shí)現(xiàn)的分布式鎖,使用起來還是非常簡單的,具體代碼如下:

    @RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping(“/lock”) public String deductTicket() throws InterruptedException { //傳入Redis的key String lockKey = “ticket”; // redis setnx 操作 RLock lock = redisson.getLock(lockKey); try { //加鎖并且實(shí)現(xiàn)鎖續(xù)命 lock.lock(); int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount – 1; log.info(“扣減成功,剩余票數(shù):” + realTicketCount + “”); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + “”); } else { log.error(“扣減失敗,余票不足”); } } finally { //釋放鎖 lock.unlock(); } return “end”; }}

    2.2.2 Redison分布式鎖的原理

    Redison實(shí)現(xiàn)分布式鎖的原理流程如下圖所示,當(dāng)線程1加鎖成功,并開始執(zhí)行業(yè)務(wù)代碼時(shí),Redison框架會(huì)開啟一個(gè)后臺(tái)線程,每隔鎖過期時(shí)間的1/3時(shí)間定時(shí)判斷一次是否還持有鎖(Redis中的key是否還存在),若不持有那么就直接結(jié)束當(dāng)前的后臺(tái)線程,若還持有鎖,那么就重新設(shè)置鎖的過期時(shí)間。當(dāng)線程1加鎖成功后,那么線程2就會(huì)加鎖失敗,此時(shí)線程2就會(huì)就會(huì)做類似于CAS的自旋操作,一直等待線程1釋放了之后線程2才能加鎖成功。

    2.2.3 Redison分布式鎖的源碼分析

    Redison底層實(shí)現(xiàn)分布式鎖時(shí)使用了大量的lua腳本保證了其加鎖操作的各種原子性。Redison實(shí)現(xiàn)分布式鎖使用lua腳本的好處主要是能保證Redis的操作是原子性的,Redis會(huì)將整個(gè)腳本作為一個(gè)整體執(zhí)行,中間不會(huì)被其他命令插入。

    Redisson核心使用lua腳本加鎖源碼分析:

    方法名為tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command):

    //使用lua腳本加鎖方法 RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, //當(dāng)?shù)谝粋€(gè)線程進(jìn)來會(huì)直接執(zhí)行這段邏輯 //判斷傳入的Redis的key是否存在,即String lockKey = “ticket”; “if (redis.call(‘exists’, KEYS[1]) == 0) then ” + //如果不存在那么就設(shè)置這個(gè)key為傳入值、當(dāng)前線程id 即參數(shù)ARGV[2]值(即getLockName(threadId)),并且將線程id的value值設(shè)置為1 “redis.call(‘hset’, KEYS[1], ARGV[2], 1); ” + //再給這個(gè)key設(shè)置超時(shí)時(shí)間,超時(shí)時(shí)間即參數(shù)ARGV[1](即internalLockLeaseTime的值)的時(shí)間 “redis.call(‘pexpire’, KEYS[1], ARGV[1]); ” + “return nil; ” + “end; ” + //當(dāng)?shù)诙€(gè)線程進(jìn)來,Redis中的key已經(jīng)存在(鎖已經(jīng)存在),那么直接進(jìn)這段邏輯 //判斷這個(gè)Redis key是否存在且當(dāng)前的這個(gè)key是否是當(dāng)前線程設(shè)置的 “if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) then ” + //如果是的話,那么就進(jìn)入重入鎖的邏輯,利用hincrby指令將第一個(gè)線程進(jìn)來將線程id的value值設(shè)置為1再加1 //然后每次釋放鎖的時(shí)候就會(huì)減1,直到這個(gè)值為0,這把鎖就釋放了,這點(diǎn)與juc的可重鎖類似 //“hincrby”指令為Redis hash結(jié)構(gòu)的加法 “redis.call(‘hincrby’, KEYS[1], ARGV[2], 1); ” + “redis.call(‘pexpire’, KEYS[1], ARGV[1]); ” + “return nil; ” + “end; ” + //倘若不是本線程加的鎖,而是其他線程加的鎖,由于上述lua腳本都是有線程id的校驗(yàn),那么上面的兩段lua腳本都不會(huì)執(zhí)行 //那么此時(shí)這里就會(huì)將當(dāng)前這個(gè)key的過期時(shí)間返回 “return redis.call(‘pttl’, KEYS[1]);”, Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); // KEYS[1]) ARGV[1] ARGV[2]}// getName()傳入KEYS[1],表示傳入解鎖的keyName,這里是 String lockKey = “ticket”;// internalLockLeaseTime傳入ARGV[1],表示鎖的超時(shí)時(shí)間,默認(rèn)是30秒// getLockName(threadId)傳入ARGV[2],表示鎖的唯一標(biāo)識(shí)線程id

    設(shè)置監(jiān)聽器方法:方法名tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId)。

    //設(shè)置監(jiān)聽器方法: private RFuture tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } //加鎖成功這里會(huì)返回一個(gè)null值,即ttlRemainingFuture為null //若線程沒有加鎖成功,那么這里返回的就是這個(gè)別的線程加過的鎖的剩余的過期時(shí)間,即ttlRemainingFuture為過期時(shí)間 RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); //如果還持有這個(gè)鎖,則開啟定時(shí)任務(wù)不斷刷新該鎖的過期時(shí)間 //這里給當(dāng)前業(yè)務(wù)加了個(gè)監(jiān)聽器 ttlRemainingFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { return; } Boolean ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining) { //定時(shí)任務(wù)執(zhí)行方法 scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }

    定時(shí)任務(wù)執(zhí)行方法: 方法名scheduleExpirationRenewal(final long threadId):

    //定時(shí)任務(wù)執(zhí)行方法 private void scheduleExpirationRenewal(final long threadId) { if (expirationRenewalMap.containsKey(getEntryName())) { return; } //這里new了一個(gè)TimerTask()定時(shí)任務(wù)器 //這里定時(shí)任務(wù)會(huì)推遲執(zhí)行,推遲的時(shí)間是設(shè)置的鎖過期時(shí)間的1/3, //很容易就能發(fā)現(xiàn)是一開始鎖的過期時(shí)間默認(rèn)值30s,具體可見private long lockWatchdogTimeout = 30 * 1000; //過期時(shí)間單位是秒 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { RFuture future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, //這里又是一個(gè)lua腳本 //這里lua腳本先判斷了一下,Redis的key是否存在且設(shè)置key的線程id是否是參數(shù)ARGV[2]值 “if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) then ” + //如果這個(gè)線程創(chuàng)建的Redis的key即鎖仍然存在,那么久給鎖的過期時(shí)間重新設(shè)值為internalLockLeaseTime,也就是初始值30s “redis.call(‘pexpire’, KEYS[1], ARGV[1]); ” + //Redis的key過期時(shí)間重新設(shè)置成功后,這里的lua腳本返回的就是1 “return 1; ” + “end; ” + //如果主線程已經(jīng)釋放了這個(gè)鎖,那么這里的lua腳本就會(huì)返回0,直接結(jié)束“看門狗”的程序 “return 0;”, Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { expirationRenewalMap.remove(getEntryName()); if (!future.isSuccess()) { log.error(“Can’t update lock ” + getName() + ” expiration”, future.cause()); return; } if (future.getNow()) { // reschedule itself scheduleExpirationRenewal(threadId); } } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) { task.cancel(); } }

    //上面源碼分析過了,當(dāng)加鎖成功后tryAcquireAsync()返回的值為null, 那么這個(gè)方法的返回值也為nullprivate Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId));}

    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { //獲得當(dāng)前線程id long threadId = Thread.currentThread().getId(); //由上面的源碼分析可以得出,當(dāng)加鎖成功后,這個(gè)ttl就是null //若線程沒有加鎖成功,那么這里返回的就是這個(gè)別的線程加過的鎖的剩余的過期時(shí)間 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired //如果加鎖成功后,這個(gè)ttl就是null,那么這個(gè)方法后續(xù)就不需要做任何邏輯 //若沒有加鎖成功這里ttl的值不為null,為別的線程加過鎖的剩余的過期時(shí)間,就會(huì)繼續(xù)往下執(zhí)行 if (ttl == null) { return; } RFuture future = subscribe(threadId); commandExecutor.syncSubscription(future); try { //若沒有加鎖成功的線程,會(huì)在這里做一個(gè)死循環(huán),即自旋 while (true) { //一直死循環(huán)嘗試加鎖,這里又是上面的加鎖邏輯了 ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } //這里不會(huì)瘋狂自旋,這里會(huì)判斷鎖失效之后才會(huì)繼續(xù)進(jìn)行自旋,這樣可以節(jié)省一點(diǎn)CPU資源 // waiting for message if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { unsubscribe(future, threadId); } // get(lockAsync(leaseTime, unit)); }

    Redison底層解鎖源碼分析:

    @Override public void unlock() { // 調(diào)用異步解鎖方法 Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); //當(dāng)釋放鎖的線程和已存在鎖的線程不是同一個(gè)線程,返回null if (opStatus == null) { throw new IllegalMonitorStateException(“attempt to unlock lock, not locked by current thread by node id: ” + id + ” thread-id: ” + Thread.currentThread().getId()); } //根據(jù)執(zhí)行l(wèi)ua腳本返回值判斷是否取消續(xù)命訂閱 if (opStatus) { // 取消續(xù)命訂閱 cancelExpirationRenewal(); } }

    protected RFuture unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, //如果鎖已經(jīng)不存在, 發(fā)布鎖釋放的消息,返回1 “if (redis.call(‘exists’, KEYS[1]) == 0) then ” + “redis.call(‘publish’, KEYS[2], ARGV[1]); ” + “return 1; ” + “end;” + //如果釋放鎖的線程和已存在鎖的線程不是同一個(gè)線程,返回null “if (redis.call(‘hexists’, KEYS[1], ARGV[3]) == 0) then ” + “return nil;” + “end; ” + //當(dāng)前線程持有鎖,用hincrby命令將鎖的可重入次數(shù)-1,即線程id的value值-1 “local counter = redis.call(‘hincrby’, KEYS[1], ARGV[3], -1); ” + //若線程id的value值即可重入鎖的次數(shù)大于0 ,就更新過期時(shí)間,返回0 “if (counter > 0) then ” + “redis.call(‘pexpire’, KEYS[1], ARGV[2]); ” + “return 0; ” + //否則證明鎖已經(jīng)釋放,刪除key并發(fā)布鎖釋放的消息,返回1 “else ” + “redis.call(‘del’, KEYS[1]); ” + “redis.call(‘publish’, KEYS[2], ARGV[1]); ” + “return 1; “+ “end; ” + “return nil;”, Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); } // getName()傳入KEYS[1],表示傳入解鎖的keyName // getChannelName()傳入KEYS[2],表示redis內(nèi)部的消息訂閱channel // LockPubSub.unlockMessage傳入ARGV[1],表示向其他redis客戶端線程發(fā)送解鎖消息 // internalLockLeaseTime傳入ARGV[2],表示鎖的超時(shí)時(shí)間,默認(rèn)是30秒 // getLockName(threadId)傳入ARGV[3],表示鎖的唯一標(biāo)識(shí)線程id

    void cancelExpirationRenewal() { // 將該線程從定時(shí)任務(wù)中刪除 Timeout task = expirationRenewalMap.remove(getEntryName()); if (task != null) { task.cancel(); } }

    上述情況如果是單臺(tái)Redis,那么利用Redison開源框架實(shí)現(xiàn)Redis的分布式鎖已經(jīng)很完美了,但是往往生產(chǎn)環(huán)境的的Redis一般都是哨兵主從架構(gòu),Redis的主從架構(gòu)有別與Zookeeper的主從,客戶端只能請(qǐng)求Redis主從架構(gòu)的Master節(jié)點(diǎn),Slave節(jié)點(diǎn)只能做數(shù)據(jù)備份,Redis從Master同步數(shù)據(jù)到Slave并不需要同步完成后才能繼續(xù)接收新的請(qǐng)求,那么就會(huì)存在一個(gè)主從同步的問題。

    當(dāng)Redis的鎖設(shè)置成功,正在執(zhí)行業(yè)務(wù)代碼,當(dāng)Redis向從服務(wù)器同步時(shí),Redis的Maste節(jié)點(diǎn)宕機(jī)了,Redis剛剛設(shè)置成功的鎖還沒來得及同步到Slave節(jié)點(diǎn),那么此時(shí)Redis的主從哨兵模式就會(huì)重新選舉出新的Master節(jié)點(diǎn),那么這個(gè)新的Master節(jié)點(diǎn)其實(shí)就是原來的Slave節(jié)點(diǎn),此時(shí)后面請(qǐng)求進(jìn)來的線程都會(huì)請(qǐng)求這個(gè)新的Master節(jié)點(diǎn),然而選舉后產(chǎn)生的新Master節(jié)點(diǎn)實(shí)際上是沒有那把鎖的,那么從而導(dǎo)致了鎖的失效。

    上述問題用Redis主從哨兵架構(gòu)實(shí)現(xiàn)的分布式鎖在這種極端情況下是無法避免的,但是一般情況下生產(chǎn)上這種故障的概率極低,即使偶爾有問題也是可以接受的。

    如果想使分布式鎖變的百分百可靠,那可以選用Zookeeper作為分布式鎖,就能完美的解決這個(gè)問題。由于zk的主從數(shù)據(jù)同步有別與Redis主從同步,zk的強(qiáng)一致性使得當(dāng)客戶端請(qǐng)求zk的Leader節(jié)點(diǎn)加鎖時(shí),當(dāng)Leader將這個(gè)鎖同步到了zk集群的大部分節(jié)點(diǎn)時(shí),Leader節(jié)點(diǎn)才會(huì)返回客戶端加鎖成功,此時(shí)當(dāng)Leader節(jié)點(diǎn)宕機(jī)之后,zk內(nèi)部選舉產(chǎn)生新的Leader節(jié)點(diǎn),那么新的客戶款訪問新的Leader節(jié)點(diǎn)時(shí),這個(gè)鎖也會(huì)存在,所以zk集群能夠完美解決上述Redis集群的問題。

    由于Redis和Zookeeper的設(shè)計(jì)思路不一樣,任何分布式架構(gòu)都需要滿足CAP理論,“魚和熊掌不可兼得”,要么選擇AP要么選擇CP,很顯然Redis是AP結(jié)構(gòu),而zk是屬于CP架構(gòu),也導(dǎo)致了兩者的數(shù)據(jù)同步本質(zhì)上的區(qū)別。

    其實(shí)設(shè)計(jì)Redis分布式鎖有種RedLock的思想就是借鑒zk實(shí)現(xiàn)分布式鎖的這個(gè)特點(diǎn),這種Redis的加鎖方式在Redison框架中也有提供api,具體使用也很簡單,這里就不一一贅述了。其主要思想如下圖所示:

    這種實(shí)現(xiàn)方式,我認(rèn)為生產(chǎn)上并不推薦使用。很簡單原本只需要對(duì)一個(gè)Redis加鎖,設(shè)置成功返回即可,但是現(xiàn)在需要對(duì)多個(gè)Redis進(jìn)行加鎖,無形之中增加了好幾次網(wǎng)絡(luò)IO,萬一第一個(gè)Redis加鎖成功后,后面幾個(gè)Redis在加鎖過程中出現(xiàn)了類似網(wǎng)絡(luò)異常的這種情況,那第一個(gè)Redis的數(shù)據(jù)可能就需要做數(shù)據(jù)回滾操作了,那為了解決一個(gè)極低概率發(fā)生的問題又引入了多個(gè)可能產(chǎn)生的新問題,很顯然得不償失。并且這里還有可能出現(xiàn)更多亂七八糟的問題,所以我認(rèn)為這種Redis分布式鎖的實(shí)現(xiàn)方式極其不推薦生產(chǎn)使用。

    退一萬說如果真的需要這種強(qiáng)一致性的分布式鎖的話,那為什么不直接用zk實(shí)現(xiàn)的分布式鎖呢,性能肯定也比這個(gè)RedLock的性能要好。

    3. 分布式鎖使用場景

    這里著重講一下分布式鎖的兩種以下使用場景:

    3.1 熱點(diǎn)緩存key重建優(yōu)化

    一般情況下互聯(lián)網(wǎng)公司基本都是使用“緩存”加過期時(shí)間的策略,這樣不僅加快數(shù)據(jù)讀寫, 而且還能保證數(shù)據(jù)的定期更新,這種策略能夠滿足大部分需求,但是也會(huì)有一種特殊情況會(huì)有問題:原本就存在一個(gè)冷門的key,因?yàn)槟硞€(gè)熱點(diǎn)新聞的出現(xiàn),突然這個(gè)冷門的key請(qǐng)求量暴增成了使其稱為了一個(gè)熱點(diǎn)key,此時(shí)緩存失效,并且又無法在很短時(shí)間內(nèi)重新設(shè)置緩存,那么緩存失效的瞬間,就會(huì)有大量線程來訪問到后端,造成數(shù)據(jù)庫負(fù)載加大,從而可能會(huì)讓應(yīng)用崩潰。

    例如:“Air Force one”原本就是一個(gè)冷門的key存在于緩存中,微博突然有個(gè)明星穿著“Air Force one”上了熱搜,那么就會(huì)有很多明星的粉絲來得物app購買“Air Force one”,此時(shí)的“Air Force one”就直接成為了一個(gè)熱點(diǎn)key,那么此時(shí)“Air Force one”這個(gè)key如果緩存恰好失效了之后,就會(huì)有大量的請(qǐng)求同時(shí)訪問到db,會(huì)給后端造成很大的壓力,甚至?xí)屜到y(tǒng)宕機(jī)。

    要解決這個(gè)問題只需要用一個(gè)簡單的分布式鎖即可解決這個(gè)問題,只允許一個(gè)線程去重建緩存,其他線程等待重建緩存的線程執(zhí)行完, 重新從緩存獲取數(shù)據(jù)即可。可見下面的實(shí)例偽代碼:

    //分布式鎖解決熱點(diǎn)緩存,代碼如下:

    public String getCache(String key) { //從緩存獲取數(shù)據(jù) String value = stringRedisTemplate.opsForValue().get(key); //傳入Redis的key try { if (Objects.isNull(value)) { //這里只允許一個(gè)線程進(jìn)入,重新設(shè)置緩存 String mutexKey = key; //從db 獲取數(shù)據(jù) value = mysql.getDataFromMySQL(); //寫回緩存 stringRedisTemplate.opsForValue().setIfPresent(mutexKey, “poizon”, 60, TimeUnit.SECONDS); //刪除key stringRedisTemplate.delete(mutexKey); } else { Thread.sleep(100); getCache(key); } } catch (InterruptedException e) { log.error(“getCache is error”, e); } return value; }

    3.2 解決緩存與數(shù)據(jù)庫數(shù)據(jù)不一致問題

    如果業(yè)務(wù)對(duì)數(shù)據(jù)的緩存與數(shù)據(jù)庫需要強(qiáng)一致時(shí),且并發(fā)量不是很高的情況下的情況下時(shí),就可以直接加一個(gè)分布式讀寫鎖就可以直接解決這個(gè)問題了??梢灾苯永每梢约臃植际阶x寫鎖保證并發(fā)讀寫或?qū)憣懙臅r(shí)候按順序排好隊(duì),讀讀的時(shí)候相當(dāng)于無鎖。

    并發(fā)量不是很高且業(yè)務(wù)對(duì)緩存與數(shù)據(jù)庫有著強(qiáng)一致對(duì)要求時(shí),通過這種方式實(shí)現(xiàn)最簡單,且效果立竿見影。倘若在這種場景下,如果還監(jiān)聽binlog通過消息的方式延遲雙刪的方式去保證數(shù)據(jù)一致性的話,引入了新的中間件增加了系統(tǒng)的復(fù)雜度,得不償失。

    3.3超高并發(fā)場景下的分布式鎖設(shè)計(jì)理論

    與ConcurrentHashMap的設(shè)計(jì)思想有點(diǎn)類似,用分段鎖來實(shí)現(xiàn),這個(gè)是之前在網(wǎng)上看到的實(shí)現(xiàn)思路,本人并沒有實(shí)際使用過,不知道水深不深,但是可以學(xué)習(xí)一下實(shí)現(xiàn)思路。

    假如A商品的庫存是2000個(gè),現(xiàn)在可以將該A商品的2000個(gè)庫存利用類似ConcurrentHashMap的原理將不同數(shù)量段位的庫存的利用取?;蛘呤莌ash算法讓其擴(kuò)容到不同的節(jié)點(diǎn)上去,這樣這2000的庫存就水平擴(kuò)容到了多個(gè)Redis節(jié)點(diǎn)上,然后請(qǐng)求Redis拿庫存的時(shí)候請(qǐng)求原本只能從一個(gè)Redis上取數(shù)據(jù),現(xiàn)在可以從五個(gè)Redis上取數(shù)據(jù),從而可以大大提高并發(fā)效率。

    4. 總結(jié)與思考

    綜上可知,Redis分布式鎖并不是絕對(duì)安全,Redis分布式鎖在某種極端情況下是無法避免的,但是一般情況下生產(chǎn)上這種故障的概率極低,即使偶爾有問題也是可以接受。

    CAP 原則指的是在一個(gè)分布式系統(tǒng)中,一致性(Consistency)、可用性(Availability)、分區(qū)容錯(cuò)性(Partition tolerance)這三個(gè)要素最多只能同時(shí)實(shí)現(xiàn)兩點(diǎn),不可能三者兼顧。魚和熊掌不可兼得”,要么選擇AP要么選擇CP,選擇Redis作為分布式鎖的組件在于其單線程內(nèi)存操作效率很高,且在高并發(fā)場景下也可以保持很好的性能。

    如果一定要要求分布式鎖百分百可靠,那可以選用Zookeeper或者M(jìn)ySQL作為分布式鎖,就能完美的解決鎖安全的問題,但是選擇了一致性那就要失去可用性,所以Zookeeper或者M(jìn)ySQL實(shí)現(xiàn)的分布式鎖的性能遠(yuǎn)不如Redis實(shí)現(xiàn)的分布式鎖。

    感謝閱讀,更多的java課程學(xué)習(xí)路線,筆記,面試等架構(gòu)資料,點(diǎn)贊收藏+評(píng)論轉(zhuǎn)發(fā)+關(guān)注我之后私信我【資料】即可獲取免費(fèi)資料!

    鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場,版權(quán)歸原作者所有,如有侵權(quán)請(qǐng)聯(lián)系管理員(admin#wlmqw.com)刪除。
    上一篇 2022年6月16日 12:04
    下一篇 2022年6月16日 12:04

    相關(guān)推薦

    聯(lián)系我們

    聯(lián)系郵箱:admin#wlmqw.com
    工作時(shí)間:周一至周五,10:30-18:30,節(jié)假日休息