一、商品上架流程
product
1、根據(jù)spuid查詢所有sku信息
2、將sku信息封裝成skuModel(向es中存儲的是skuModel VO)
不一致信息:
庫存信息
發(fā)送遠程調(diào)用 查詢系統(tǒng)中是否存在庫存
熱度評分
品牌和分類的名字
檢索屬性
查出當(dāng)前sku中所有可以被檢索的屬性
3、將數(shù)據(jù)發(fā)給es進行保存,調(diào)用gulimall-search微服務(wù)進行保存
(1)在es中建立索引,建立號映射關(guān)系(doc/json/product-mapping.json)
(2)在es中保存這些數(shù)據(jù)
(3)設(shè)置成功失敗狀態(tài)
二、Nginx反向代理配置
1、什么是nginx反向代理
正向代理:我們要訪問谷歌,但是被墻,所以將請求發(fā)送給代理服務(wù)器,由代理服務(wù)器來訪問,這叫正向代理
反向代理:服務(wù)器部署在內(nèi)網(wǎng)中,客戶端訪問服務(wù)器不能直接訪問服務(wù)器的內(nèi)網(wǎng)地址,通常需要根據(jù)域名來訪問(www.baidu.com)。所以需要反向代理服務(wù)器來將外網(wǎng)的ip轉(zhuǎn)換成內(nèi)網(wǎng)ip來訪問。
2、反向代理如何實現(xiàn)
只代理到商品服務(wù):客戶端發(fā)送gulimall.com請求,在windows中映射gulimall.com是虛擬機ip(192.168.56.10)所以會訪問虛擬機。虛擬機中的nginx監(jiān)聽80端口。在配置文件中,會設(shè)置nginx代理到的路徑http://192.168.56.1:10000。這樣就訪問到了商品服務(wù)里的首頁(首頁在商品服務(wù)中)
location/{proxy_pass http://192.168.56.1:10000}
代理到網(wǎng)關(guān): 客戶端發(fā)送gulimall.com請求,在windows中映射gulimall.com是虛擬機ip(192.168.56.10)所以會訪問虛擬機。虛擬機中的nginx監(jiān)聽80端口。在配置文件中nginx.config中配置上游服務(wù)器gulimall http://192.168.56.1:88。然后繼續(xù)在配置文件gulimall.config中nginx代理到的路徑http://gulimall。
需要注意的是,nginx代理給網(wǎng)關(guān)會丟失一部分信息。解決如下
三、壓力測試
1、JVM堆棧垃圾回收
2、優(yōu)化方式
加內(nèi)存(提升JVM堆中eden區(qū)和old區(qū)的內(nèi)存空間,避免頻繁的gc)
數(shù)據(jù)庫優(yōu)化,加索引
打開模板緩存,設(shè)置日志為error時記錄
靜態(tài)資源快速返回 動靜分離
優(yōu)化代碼 減少循環(huán)查表
3、Nginx實現(xiàn)動靜分離
(1)將所有的靜態(tài)資源移動到nginx中
(2)/static/**下的所有內(nèi)容由nginx返回,在nginx中進行相關(guān)配置,gulimall-conf中設(shè)置
location /static/{ root /usr/share/nginx/html }
4、優(yōu)化三級分類數(shù)據(jù)獲取
直接查詢所有分類的結(jié)果,只查一次表,將其他循環(huán)查表的過程抽取為一個函數(shù)
四、緩存與分布式鎖
為什么使用緩存
如何使用redis
引入data-redis-starter
org.springframework.boot spring-boot-starter-data-redis
簡單配置redis的host
使用自動配置好的String Redis Template redis
String Redis Template redis中 存儲鍵值對 鍵與值都是String類型
//將數(shù)據(jù)放入緩存@Overridepublic Map getCatelogJson() { String catelogJson = redisTemplate.opsForValue().get(“catelogJson”); if (StringUtils.isEmpty(catelogJson)){ //緩存中沒有 Map catelogJsonFromDB = getCatelogJsonFromDB(); redisTemplate.opsForValue().set(“catelogJson”,JSON.toJSONString(catelogJsonFromDB)); return catelogJsonFromDB; } //因為轉(zhuǎn)化的對象是復(fù)雜對象,所以通過TypeReference Map catelogJsonFromDB = JSON.parseObject(catelogJson,new TypeReference<Map>(){ }); return catelogJsonFromDB;}//從數(shù)據(jù)庫查詢數(shù)據(jù)public Map getCatelogJsonFromDB() { //將數(shù)據(jù)庫的多次交互,轉(zhuǎn)為一次,一次性查詢所有數(shù)據(jù) List allList = baseMapper.selectList(null); //查出所有分類 List level1Categorys = getParent_cid(allList,0L); //分裝數(shù)據(jù) Map resultMap = level1Categorys.stream().collect(Collectors.toMap(CategoryEntity::getCatId, v -> { //每一個的一級分類,查到這個一級分類的二級分類 List list = getParent_cid(allList,v.getCatId()); List catelog2VoList = null; if (!StringUtils.isEmpty(list)) { catelog2VoList = list.stream().map(item -> { Catelog2Vo catelog2Vo = new Catelog2Vo(v.getCatId().toString(), null, item.getCatId().toString(), item.getName()); //封裝二級分類的三級分類 List entityList = getParent_cid(allList,item.getCatId()); if (!StringUtils.isEmpty(entityList)){ List catelog3Vos = entityList.stream().map(m -> { Catelog2Vo.Catelog3Vo catelog3Vo = new Catelog2Vo.Catelog3Vo(item.getCatId().toString(),m.getCatId().toString(),m.getName()); return catelog3Vo; }).collect(Collectors.toList()); catelog2Vo.setCatalog3List(catelog3Vos); } return catelog2Vo; }).collect(Collectors.toList()); return catelog2VoList; } return catelog2VoList; })); return resultMap;}private List getParent_cid(List allList,Long parent_cid) { List collect = allList.stream().filter(item -> { return item.getParentCid().equals(parent_cid); }).collect(Collectors.toList()); return collect; // return baseMapper.selectList(new QueryWrapper().eq(“parent_cid”, v.getCatId()));}
內(nèi)存溢出以及解決
TODO 產(chǎn)生堆外內(nèi)存溢出OutOfDirectMemoryError:
產(chǎn)生原因
- springboot2.0以后默認使用lettuce作為操作redis的客戶端,它使用netty進行網(wǎng)絡(luò)通信
- lettuce的bug導(dǎo)致netty堆外內(nèi)存溢出。netty如果沒有指定堆外內(nèi)存,默認使用Xms的值,可以使用-Dio.netty.maxDirectMemory進行設(shè)置
解決方案:由于是lettuce的bug造成,不要直接使用-Dio.netty.maxDirectMemory 去調(diào)大虛擬機堆外內(nèi)存,治標(biāo)不治本。
- 升級lettuce客戶端。
- 切換使用jedis
lettuce和jedis是操作redis的底層客戶端,RedisTemplate是再次封裝
緩存失效 穿透、雪崩、擊穿
**緩存穿透:**是指大量的并發(fā)請求訪問一個數(shù)據(jù)庫中從未出現(xiàn)的內(nèi)容,由于緩存中沒有這條內(nèi)容,所以會并發(fā)查表導(dǎo)致數(shù)據(jù)庫崩潰。
**解決辦法:**將null也寫入緩存中
**緩存雪崩:**是指設(shè)置緩存時,key使用了相同的過期時間,大量并發(fā)此時同時訪問這些過期緩存,導(dǎo)致同時查表導(dǎo)致數(shù)據(jù)庫崩潰。
**解決辦法:**緩存過期時間在原來的基礎(chǔ)上設(shè)置隨機值。
**緩存擊穿:**是指大量并發(fā)請求同時訪問一個熱點key,如果這個熱點key剛好在緩存中過期,導(dǎo)致這個key的數(shù)據(jù)查詢都落到數(shù)據(jù)庫導(dǎo)致數(shù)據(jù)庫崩潰。
**解決辦法:**大量的并發(fā)只讓一個人去查,其他人等待,查到之后釋放鎖,其他人回去到鎖,查找緩存不會查找數(shù)據(jù)庫。加鎖方法使用synchronized(this),this表示當(dāng)前對象。
注意:緩存時許問題,確認緩存、查數(shù)據(jù)庫和結(jié)果放入緩存是一個原子操作,要在鎖內(nèi)實現(xiàn)。
@Overridepublic Map getCatelogJson() { String catelogJson = redisTemplate.opsForValue().get(“catelogJson”); //緩存中沒有 if (StringUtils.isEmpty(catelogJson)) { //獲取數(shù)據(jù)返回 Map catelogJsonFromDB = getCatelogJsonFromDB(); return catelogJsonFromDB; } //因為轉(zhuǎn)化的對象是復(fù)雜對象,所以通過TypeReference Map catelogJsonFromDB = JSON.parseObject(catelogJson, new TypeReference<Map>() { }); return catelogJsonFromDB;}//從數(shù)據(jù)庫查詢數(shù)據(jù)public Map getCatelogJsonFromDB() { synchronized (this) { //判斷緩存是否已經(jīng)有數(shù)據(jù),防止之前的線程已經(jīng)放好數(shù)據(jù) String catelogJson = redisTemplate.opsForValue().get(“catelogJsonFromDB”); if (!StringUtils.isEmpty(catelogJson)) { //因為轉(zhuǎn)化的對象是復(fù)雜對象,所以通過TypeReference Map resultMap = JSON.parseObject(catelogJson, new TypeReference<Map>() { }); return resultMap; } //將數(shù)據(jù)庫的多次交互,轉(zhuǎn)為一次,一次性查詢所有數(shù)據(jù) List allList = baseMapper.selectList(null); //查出所有分類 List level1Categorys = getParent_cid(allList, 0L); //分裝數(shù)據(jù) Map resultMap = level1Categorys.stream().collect(Collectors.toMap(CategoryEntity::getCatId, v -> { //每一個的一級分類,查到這個一級分類的二級分類 List list = getParent_cid(allList, v.getCatId()); List catelog2VoList = null; if (!StringUtils.isEmpty(list)) { catelog2VoList = list.stream().map(item -> { Catelog2Vo catelog2Vo = new Catelog2Vo(v.getCatId().toString(), null, item.getCatId().toString(), item.getName()); //封裝二級分類的三級分類 List entityList = getParent_cid(allList, item.getCatId()); if (!StringUtils.isEmpty(entityList)) { List catelog3Vos = entityList.stream().map(m -> { Catelog2Vo.Catelog3Vo catelog3Vo = new Catelog2Vo.Catelog3Vo(item.getCatId().toString(), m.getCatId().toString(), m.getName()); return catelog3Vo; }).collect(Collectors.toList()); catelog2Vo.setCatalog3List(catelog3Vos); } return catelog2Vo; }).collect(Collectors.toList()); return catelog2VoList; } return catelog2VoList; })); //放入緩存 redisTemplate.opsForValue().set(“catelogJson”, JSON.toJSONString(resultMap),1L, TimeUnit.DAYS); return resultMap; }}private List getParent_cid(List allList, Long parent_cid) { List collect = allList.stream().filter(item -> { return item.getParentCid().equals(parent_cid); }).collect(Collectors.toList()); return collect; // return baseMapper.selectList(new QueryWrapper().eq(“parent_cid”, v.getCatId()));}
上面的本地鎖只能鎖住當(dāng)前進程,在分布式的情況下無法保證鎖住整個集群服務(wù)
分布式鎖
可以讓商品服務(wù)來占坑,當(dāng)一個商品服務(wù)拿到鎖后,其他商品服務(wù)必須等待
設(shè)置分布式鎖需要考慮的問題
一二階段為設(shè)置鎖需要考慮的問題,二三階段為刪除鎖需要考慮的問題
- 第一階段
- setnx會獲取鎖,然后執(zhí)行業(yè)務(wù),最后刪除鎖
- 出現(xiàn)問題:當(dāng)執(zhí)行業(yè)務(wù)時發(fā)生宕機,無法執(zhí)行刪除鎖的操作,其他線程無法拿到鎖。造成死鎖。
- 解決問題:將鎖設(shè)置一個自動過期時間,來保證鎖肯定會被釋放
方法名:getCateLogJsonFromDbWithRedisLock//占分布式鎖Boolean lock=redisTemplate.opsForValue().setIfAbsent(“lock”,”111″);if(lock){ //如果獲得鎖,執(zhí)行業(yè)務(wù)Map dataFromDb=getDataFromDb();//執(zhí)行完刪除鎖redisTemplate.delete(“lock”)return dataFromDb;}else{ return getCateLogJsonFromDbWithRedisLock();}
改進后為:
方法名:getCateLogJsonFromDbWithRedisLock//占分布式鎖Boolean lock=redisTemplate.opsForValue().setIfAbsent(“lock”,”111″);if(lock){ //如果獲得鎖,執(zhí)行業(yè)務(wù)Map dataFromDb=getDataFromDb();//設(shè)置過期時間redisTemplate.expire(“key”,30,TimeUnit.SECONDS)//執(zhí)行完刪除鎖redisTemplate.delete(“lock”)return dataFromDb;}else{ return getCateLogJsonFromDbWithRedisLock();}
- 第二階段
- 出現(xiàn)問題:雖然設(shè)置了過期時間,但是如果在設(shè)置過期時間之前發(fā)生了宕機,過期時間沒有設(shè)置上,又會導(dǎo)致死鎖問題。
- 解決問題:設(shè)置過期時間和占位必須是原子操作
//占分布式鎖//setIfAbsent方法的第三個參數(shù)是設(shè)置過期時間Boolean lock=redisTemplate.opsForValue().setIfAbsent(“lock”,”111″,300);if(lock){ //如果獲得鎖,執(zhí)行業(yè)務(wù)Map dataFromDb=getDataFromDb();//設(shè)置過期時間//redisTemplate.expire(“key”,30,TimeUnit.SECONDS)//執(zhí)行完刪除鎖redisTemplate.delete(“lock”)return dataFromDb;}else{ return getCateLogJsonFromDbWithRedisLock();}
- 第三階段出現(xiàn)問題:由于設(shè)置了自動過期時間,如果當(dāng)前執(zhí)行的業(yè)務(wù)很長,刪除鎖的時候,鎖已經(jīng)自動過期了,此時刪除的可能是別人的鎖。解決問題:給每個鎖添加唯一的uuid,刪除的時候判斷一下,匹配的是自己的鎖才能刪除
- 第四階段出現(xiàn)問題:判斷完成之后,需要刪除鎖前,鎖剛好過期,別人設(shè)置新的值,那么我們會刪除別人的鎖解決問題:刪除鎖必須保證原子操作(判斷與刪除同時進行)采用redis+lua腳本實現(xiàn)
String uuid = UUID.randomUUID().toString(); //設(shè)置redis分布式鎖,30s自動刪除鎖 Boolean isLock = redisTemplate.opsForValue().setIfAbsent(“lock”, uuid,300L,TimeUnit.SECONDS); if (isLock){ //搶鎖成功。。。執(zhí)行業(yè)務(wù) Map resultMap = null; try { resultMap = getLongListMap(); }finally { //lua腳本解鎖:讓獲取數(shù)據(jù)+對比數(shù)據(jù)成為原子操作 String script = “if redis.call(“get”,KEYS[1]) == ARGV[1] then” + ” return redis.call(“del”,KEYS[1])” + “else” + ” return 0″ + “end”; Long lock = redisTemplate.execute(new DefaultRedisScript(script, Long.class),Arrays.asList(“lock”),uuid); } return resultMap; }else { //搶鎖失敗。。。重試 try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } // 睡眠0.2s后,重新調(diào)用 //自旋 return getCatelogJsonFromDBWithRedisLock(); }
Redisson
Redisson使用
- pom導(dǎo)入
org.redisson redisson 3.13.4
- 開啟配置,寫一個配置類
@Configurationpublic class RedissonConfig { @Bean(destroyMethod = “shutdown”) public RedissonClient redissonClient(){ Config config = new Config(); config.setTransportMode(TransportMode.EPOLL); config.useClusterServers() //可以用”rediss://”來啟用SSL連接 .addNodeAddress(“redis://192.168.109.101:6379”); return Redisson.create(config); } }
可重入鎖
基于Redis的Redisson分布式可重入鎖RLock Java對象實現(xiàn)了java.util.concurrent.locks.Lock接口。同時還提供了異步(Async)、反射式(Reactive)和RxJava2標(biāo)準(zhǔn)的接口。
- 什么是可重入鎖:A中調(diào)用B,A拿到鎖后,該鎖可以由B直接使用,這個鎖叫可重入鎖,不可重入鎖是 A拿到鎖后,A調(diào)用B,B需要等待A鎖的釋放,而A鎖需要等待B執(zhí)行完才釋放,造成死鎖。
- 如何使用Redisson分布式可重入鎖
@RequestBody@GetMapping(/”hello”)public String hello(){ //1、獲取鎖 鎖名字一樣就是同一把鎖RLock lock=redission.getLock(“lock”);//指定解鎖時間的鎖//2、枷鎖lock.lock();//阻塞式等待//指定解鎖時間的鎖//lock.lock(10,TimeUnit.SECONDS)try{ //3、執(zhí)行業(yè)務(wù)System.out.println(“執(zhí)行業(yè)務(wù)”);Thread.sleep(30000);}catch(Exception e){ }finally{ //解鎖lock.unlock();}}
Question1:這與上節(jié)redis中自己設(shè)置的鎖有什么區(qū)別
這里是對自己設(shè)置的鎖進行了封裝:上節(jié)我們需要自己設(shè)置加鎖和解鎖時的原子操作。而Redisson直接封裝了這些原子操作。由于這里加鎖會自動設(shè)置過期時間,所以不會出現(xiàn)業(yè)務(wù)崩潰,鎖釋放不了導(dǎo)致死鎖問題。
Question2:Redisson可重入鎖,如何保證業(yè)務(wù)時間長而導(dǎo)致鎖被自動刪除。
Question3:看門狗如何進行鎖的續(xù)期
只要占鎖成功,就會啟動一個定時任務(wù),重新給鎖設(shè)置過期時間,新的過期時間就是看門狗的默認時間。每隔(一個看門狗時間/3)=10s,會自動再次續(xù)期,續(xù)到30s
Question4:如果指定自動解鎖時間會發(fā)生什么
如果指定自動過期時間,看門狗會失效,不會再有鎖的續(xù)期。自動解鎖時間一定要大于業(yè)務(wù)執(zhí)行時間。(如果小于會出現(xiàn)業(yè)務(wù)沒執(zhí)行完,鎖已經(jīng)被釋放,此時別的進程搶占鎖,當(dāng)前進程刪除鎖是報錯)。
分布式可重入讀寫鎖允許同時有多個讀鎖和一個寫鎖處于加鎖狀態(tài)。
讀鎖被稱為共享鎖,寫鎖被稱為互斥鎖
讀讀共享 讀寫/寫寫 都是互斥
RReadWriteLock rwlock = redisson.getReadWriteLock(“anyRWLock”);// 最常見的使用方法rwlock.readLock().lock();// 或rwlock.writeLock().lock();// 10秒鐘以后自動解鎖// 無需調(diào)用unlock方法手動解鎖rwlock.readLock().lock(10, TimeUnit.SECONDS);// 或rwlock.writeLock().lock(10, TimeUnit.SECONDS);// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);// 或boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);//…lock.unlock();
生產(chǎn)者消費者問題中出現(xiàn)。例如停車與離開
信號量為存儲在redis中的一個數(shù)字,當(dāng)這個數(shù)字大于0時,即可以調(diào)用acquire()方法增加數(shù)量,也可以調(diào)用release()方法減少數(shù)量,但是當(dāng)調(diào)用release()之后小于0的話方法就會阻塞,直到數(shù)字大于0
tryAcquire()返回一個布爾型量,表示當(dāng)前是否可以獲取數(shù)量,如果數(shù)量為0,返回false。但是不會阻塞。常用于分布式限流
@GetMapping(“/park”)@ResponseBodypublic String park() { RSemaphore park = redissonClient.getSemaphore(“park”); try { park.acquire(2); } catch (InterruptedException e) { e.printStackTrace(); } return “停進2”;}@GetMapping(“/go”)@ResponseBodypublic String go() { RSemaphore park = redissonClient.getSemaphore(“park”); park.release(2); return “開走2”;}
緩存與數(shù)據(jù)一致性
在對表進行修改時,緩存中的內(nèi)容也需要修改,可以采用兩種模式來進行修改
- 雙寫模式:在寫完數(shù)據(jù)庫之后,再寫緩存。
- 存在問題:會出現(xiàn)臟數(shù)據(jù)的問題
- 失效模式:再寫完數(shù)據(jù)庫,刪除緩存中的數(shù)據(jù),下次再讀數(shù)據(jù)庫
- 存在問題:也會產(chǎn)生臟數(shù)據(jù)
- 解決臟數(shù)據(jù)問題:
- 設(shè)置過期時間,數(shù)據(jù)過期之后下一次重新查找數(shù)據(jù)庫
- 讀寫數(shù)據(jù)時,加上分布式鎖的讀寫鎖,經(jīng)常寫經(jīng)常讀會對性能有影響
如果想要保證強一致性可以使用Canal
Canal相當(dāng)于一個數(shù)據(jù)庫的從庫,業(yè)務(wù)更新數(shù)據(jù)庫后,它會及時的修改緩存。
注意:實時更新的數(shù)據(jù)、一致性要求高的數(shù)據(jù)本就不應(yīng)該放到緩存中,緩存中加上過期時間保證每天拿到的數(shù)據(jù)是當(dāng)前最新數(shù)據(jù)就可。遇到實時更新的數(shù)據(jù)、一致性要求高的數(shù)據(jù)就應(yīng)該查找數(shù)據(jù)庫。