缓存
什么是缓存
在需要频繁的读取数据时,受限于数据库的访问效率,系统整体性能偏低。
为了改善上述现象,我们在应用程序与数据库之间建立一种临时的数据存储机制,该区域中的数据保存在内存中,读写速度较快,可以有效解决数据库访问效率低下的问题。
这一块临时存储数据的区域就是缓存。
在使用缓存后,应用程序与缓存打交道,缓存与数据库打交道。
内置的缓存
SpringBoot内置了缓存。
用法
第一步:导入缓存技术对应的Starter。
注意名字"Spring cache abstraction", “abstraction” ,抽象的。在下文我们就能理解为什么是抽象的。 其POM坐标如下:
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-cache</artifactId > </dependency >
第二步:启用缓存,在引导类上方标注注解@EnableCaching
配置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package com.kakawanyifan;import org.mybatis.spring.annotation.MapperScan;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cache.annotation.EnableCaching;@SpringBootApplication @MapperScan ("com.kakawanyifan.dao" )@EnableCaching public class Application { public static void main (String[] args) { SpringApplication.run(Application.class , args ) ; } }
第三步:设置操作的数据使用缓存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package com.kakawanyifan.service.impl;import com.kakawanyifan.dao.BookDao;import com.kakawanyifan.pojo.Book;import com.kakawanyifan.service.BookService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.cache.annotation.Cacheable;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;import java.util.List;@Transactional @Service public class BookServiceImpl implements BookService { @Autowired private BookDao bookDao; 【部分代码略】 @Cacheable (value="cacheSpace" ,key="#id" ) public Book getById (Integer id) { return bookDao.getById(id); } 【部分代码略】 }
解释说明: 我们在业务方法getById
上使用@Cacheable
注解声明当前方法的返回值放入缓存中。 我们还需要指定缓存的存储位置,以及缓存中保存当前方法返回值对应的名称。
value
:描述缓存的存储位置。key
:描述缓存中保存数据的名称,使用#id
读取形参中的id值作为缓存名称。使用@Cacheable
注解后,执行当前操作,如果发现对应名称在缓存中没有数据,就正常读取数据,然后放入缓存;如果对应名称在缓存中有数据,就终止当前业务方法执行,直接返回缓存中的数据。
我们可以配置MyBatis打印内容到控制台,然后多请求几次看看,会发现第二次就没有打印SQL日志等,即使用了缓存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 【部分运行结果略】 2023-01-05 21:27:53.014 INFO 2100 --- [nio-8080-exec-1] c.k.controller.BookController : getById 2023-01-05 21:27:53.050 INFO 2100 --- [nio-8080-exec-1] com.alibaba.druid.pool.DruidDataSource : {dataSource-1} inited Creating a new SqlSession Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6420dfc8] JDBC Connection [com.mysql.cj.jdbc.ConnectionImpl@496b9b67] will be managed by Spring ==> Preparing: select * from book where id = ? ==> Parameters: 10(Integer) <== Columns: id, type, name, description <== Row: 10, 市场营销, 直播就这么做:主播高效沟通实战指南, 李子柒、李佳奇、薇娅成长为网红的秘密都在书中 <== Total: 1 Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6420dfc8] Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6420dfc8] Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6420dfc8] Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6420dfc8] 2023-01-05 21:27:59.386 INFO 2100 --- [nio-8080-exec-3] c.k.controller.BookController : getById 2023-01-05 21:28:16.200 INFO 2100 --- [nio-8080-exec-4] c.k.controller.BookController : getById 【部分运行结果略】
案例(手机验证码)
我们再举一个手机验证码相关的例子,需求如下:
因此,我们设计两个表现层接口:一个用来模拟发送短信的过程,根据用户提供的手机号生成一个验证码,然后放入缓存;另一个用来模拟验证码校验的过程,使用传入的手机号和验证码进行匹配,并返回最终匹配结果。
验证码实体类,封装手机号与验证码两个属性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package com.kakawanyifan.pojo;import lombok.Data;import java.io.Serializable;@Data public class SMSCode implements Serializable { private String tel; private String code; public SMSCode (String tel, String code) { this .tel = tel; this .code = code; } }
验证码功能的业务层接口与实现类。
1 2 3 4 5 6 7 8 package com.kakawanyifan.service;import com.kakawanyifan.pojo.SMSCode;public interface SMSCodeService { public SMSCode sendCodeToSMS (String tel) ; public boolean checkCode (SMSCode smsCode) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package com.kakawanyifan.service.impl;import com.kakawanyifan.pojo.SMSCode;import com.kakawanyifan.service.SMSCodeService;import com.kakawanyifan.util.CodeUtil;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Service public class SMSCodeServiceImpl implements SMSCodeService { @Autowired private CodeUtil codeUtil; @Override public SMSCode sendCodeToSMS (String tel) { return codeUtil.generator(tel); } @Override public boolean checkCode (SMSCode smsCode) { String code = smsCode.getCode(); String cacheCode = codeUtil.get(smsCode.getTel()).getCode(); return code.equals(cacheCode); } }
验证码工具类,验证码的生成策略以及根据手机号读取验证码的功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package com.kakawanyifan.util;import com.kakawanyifan.pojo.SMSCode;import org.springframework.cache.annotation.CachePut;import org.springframework.cache.annotation.Cacheable;import org.springframework.stereotype.Component;import java.text.DecimalFormat;@Component public class CodeUtil { @CachePut (value = "smsCode" , key = "#tel" ) public SMSCode generator (String tel) { int num = (int ) (Math.random() * 1000000 ); DecimalFormat decimalFormat = new DecimalFormat("000000" ); return new SMSCode(tel,decimalFormat.format(num)); } @Cacheable (value = "smsCode" ,key="#tel" ) public SMSCode get (String tel) { return null ; } }
解释说明:在获取验证码的功能上不能使用@Cacheable
注解,@Cacheable
注解是缓存中没有值则放入值,缓存中有值则取值,此处的功能仅仅是生成验证码并放入缓存,并不具有从缓存中取值的功能,因此不能使用@Cacheable
注解,应该使用仅具有向缓存中保存数据的功能,使用@CachePut
注解。
验证码功能的表现层接口,一个方法用于获取验证码,一个方法用于进行校验。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package com.kakawanyifan.controller;import com.kakawanyifan.pojo.SMSCode;import com.kakawanyifan.service.SMSCodeService;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController @RequestMapping ("/sms" )@Slf 4jpublic class SMSCodeController { @Autowired private SMSCodeService smsCodeService; @GetMapping public SMSCode getCode (String tel) { return smsCodeService.sendCodeToSMS(tel); } @PostMapping public boolean checkCode (SMSCode smsCode) { return smsCodeService.checkCode(smsCode); } }
我们可以按照如下的方式请求,进行测试。
获取验证码:
1 curl --location --request GET 'http://localhost:8080/sms?tel=123'
校验验证码:
1 2 3 4 curl --location --request POST 'http://localhost:8080/sms' \ --header 'Content-Type: application/x-www-form-urlencoded' \ --data-urlencode 'tel=123' \ --data-urlencode 'code=846140'
整合Ehcache
Ehcache是一种缓存技术,使用SpringBoot整合Ehcache其实就是变更一下缓存技术的实现方式。
导入Ehcache的坐标
1 2 3 4 <dependency > <groupId > net.sf.ehcache</groupId > <artifactId > ehcache</artifactId > </dependency >
为什么不是导入Ehcache的starter,而是导入技术坐标呢? 其实SpringBoot整合缓存技术做的是通用格式,不管你整合哪种缓存技术,只是实现变化了,操作方式一样。 这也是为什么我们在上文导入缓存相关的启动器的时候,发现其名字带有"abstraction"。
配置缓存技术实现使用Ehcache
1 2 3 4 5 spring: cache: type: ehcache ehcache: config: ehcache.xml
配置缓存的类型type
为ehcache
。 由于ehcache
的配置有独立的配置文件格式,因此还需要指定ehcache
的配置文件,以便于读取相应配置。
ehcache.xml
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 <?xml version="1.0" encoding="UTF-8"?> <ehcache xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation ="http://ehcache.org/ehcache.xsd" updateCheck ="false" > <diskStore path ="ehcache" /> <defaultCache eternal ="false" diskPersistent ="true" maxElementsInMemory ="1000" overflowToDisk ="false" timeToIdleSeconds ="300" timeToLiveSeconds ="300" memoryStoreEvictionPolicy ="LRU" /> <cache name ="smsCode" eternal ="false" diskPersistent ="true" maxElementsInMemory ="1000" overflowToDisk ="false" timeToIdleSeconds ="300" timeToLiveSeconds ="300" memoryStoreEvictionPolicy ="LRU" /> </ehcache >
解释说明:在上文的案例中,我们设置了缓存保存的位置是smsCode
,在这里需要ehcache
中有一个缓存空间名称叫做smsCode
。
至此,SpringBoot整合Ehcache就已经完成了。 我们可以看到,原始代码没有任何修改,仅仅是加了一组配置:
导入Ehcache的坐标。 修改设置,配置缓存供应商为ehcache,并提供对应的缓存配置文件。
整合Redis
与上文使用Ehcache作为缓存的过程类似,加坐标,改缓存实现类型为ehcache。
使用redis做缓存,也是加坐标,改缓存实现类型为redis;差别之处只有一点,redis的配置可以在yml文件中直接进行配置,无需独立的配置文件。
导入redis
的坐标
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-redis</artifactId > </dependency >
配置缓存技术实现使用redis
1 2 3 4 5 6 7 spring: redis: host: 10.211 .55 .14 port: 6379 password: Redis@2023 cache: type: Redis
如果需要对redis
作为缓存进行配置,注意不是对原始的redis
进行配置,而是配置redis
作为缓存使用相关的配置,隶属于spring.cache.redis
节点下。
1 2 3 4 5 6 7 8 9 10 11 12 spring: redis: host: 10.211 .55 .14 port: 6379 password: Redis@2023 cache: type: Redis redis: use-key-prefix: false key-prefix: sms_ cache-null-values: false time-to-live: 300s
整合JetCache
JetCache是阿里推出的一套缓存方案,实现了多级缓存、缓存统计、自动刷新、异步调用、数据报表等功能。
对于其多级缓存,有本地缓存和远程缓存两个层级:
只用远程缓存(Redis)
jetcache对应的starter,当前坐标默认使用的远程方案是redis
1 2 3 4 5 6 <dependency > <groupId > com.alicp.jetcache</groupId > <artifactId > jetcache-starter-redis</artifactId > <version > 2.6.7</version > </dependency >
上文的spring-boot-starter-cache
,在本文暂时不需要。
注意JetCache的版本,如果版本过高,可能会有如下的报错:
1 2 3 4 5 6 7 8 9 10 11 12 13 Caused by: java.lang.NoClassDefFoundError: redis/clients/jedis/UnifiedJedis at com.alicp.jetcache.autoconfigure.RedisAutoConfiguration$RedisAutoInit.initCache(RedisAutoConfiguration.java:101) ~[jetcache-autoconfigure-2.7.3.jar:na] at com.alicp.jetcache.autoconfigure.AbstractCacheAutoInit.process(AbstractCacheAutoInit.java:69) ~[jetcache-autoconfigure-2.7.3.jar:na] at com.alicp.jetcache.autoconfigure.AbstractCacheAutoInit.afterPropertiesSet(AbstractCacheAutoInit.java:50) ~[jetcache-autoconfigure-2.7.3.jar:na] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1863) ~[spring-beans-5.3.24.jar:5.3.24] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800) ~[spring-beans-5.3.24.jar:5.3.24] ... 68 common frames omitted Caused by: java.lang.ClassNotFoundException: redis.clients.jedis.UnifiedJedis at java.net.URLClassLoader.findClass(URLClassLoader.java:387) ~[na:1.8.0_333] at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_333] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) ~[na:1.8.0_333] at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_333] ... 73 common frames omitted
远程(Redis)方案基本配置
1 2 3 4 5 6 7 8 9 jetcache: remote: default: type: redis host: 10.211 .55 .14 port: 6379 password: Redis@2023 poolConfig: maxTotal: 50
jetcache:
与spring:
位于同一层级。poolConfig
必须配置,否则会报错。
启用缓存,在引导类上方使用注解@EnableCreateCacheAnnotation
开启缓存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package com.kakawanyifan;import com.alicp.jetcache.anno.config.EnableCreateCacheAnnotation;import org.mybatis.spring.annotation.MapperScan;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication @MapperScan ("com.kakawanyifan.dao" )@EnableCreateCacheAnnotation public class Application { public static void main (String[] args) { SpringApplication.run(Application.class , args ) ; } }
创建缓存对象Cache,并使用注解@CreateCache
标记当前缓存的信息,然后使用Cache对象的API操作缓存,put写缓存,get读缓存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package com.kakawanyifan.util;import com.alicp.jetcache.Cache;import com.alicp.jetcache.anno.CreateCache;import com.kakawanyifan.pojo.SMSCode;import org.springframework.stereotype.Component;import java.text.DecimalFormat;import java.util.concurrent.TimeUnit;@Component public class CodeUtil { @CreateCache (name = "jetCache_" ,expire = 300 ,timeUnit = TimeUnit.SECONDS) private Cache<String, SMSCode> jetCache; public SMSCode generator (String tel) { int num = (int ) (Math.random() * 1000000 ); DecimalFormat decimalFormat = new DecimalFormat("000000" ); String code = decimalFormat.format(num); SMSCode smsCode = new SMSCode(tel,code); jetCache.put(tel,smsCode); return smsCode; } public SMSCode get (String tel) { return jetCache.get(tel); } }
在上文,我们使用的是配置中定义的"default"缓存,"default"只是一个名字,例如我们可以再添加一种缓存,参照如下配置进行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 jetcache: remote: default: type: redis host: 10.211 .55 .14 port: 6379 password: Redis@2023 poolConfig: maxTotal: 50 sms: type: redis host: 10.211 .55 .14 port: 6379 password: Redis@2023 poolConfig: maxTotal: 50
如果想使用名称是sms的缓存,只需要在创建缓存时指定参数area,声明使用对应缓存。
1 2 @CreateCache (area="sms" ,name="jetCache_" ,expire = 10 ,timeUnit = TimeUnit.SECONDS)private Cache<String ,String> smsJetCache;
只用本地缓存(LinkedHashMap)
第一步同样是导包,在这里我们同样是导入jetcache-starter-redis
。
在远程方案中,配置中使用remote
表示远程,换成local
就是本地,只不过类型不一样而已。
1 2 3 4 5 jetcache: local: default: type: linkedhashmap keyConvertor: fastjson
解释说明:为了加速数据获取时key的匹配速度,jetcache要求指定key的类型转换器。简单说就是,如果你给了一个Object作为key的话,要先用key的类型转换器给转换成字符串,然后再保存。等到获取数据时,先使用给定的Object转换成字符串,然后根据字符串匹配。由于jetcache是阿里的技术,这里推荐key的类型转换器使用阿里的fastjson。
@EnableCreateCacheAnnotation
,启用缓存
创建缓存对象Cache时,标注当前使用本地缓存。
cacheType
控制当前缓存使用本地缓存还是远程缓存:
cacheType=CacheType.LOCAL
,使用本地缓存。cacheType
如果不进行配置,默认值是REMOTE
,使用远程缓存。1 @CreateCache (name = "jetCache_" ,expire = 300 ,timeUnit = TimeUnit.SECONDS,cacheType = CacheType.LOCAL)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package com.kakawanyifan.util;import com.alicp.jetcache.Cache;import com.alicp.jetcache.anno.CacheType;import com.alicp.jetcache.anno.CreateCache;import com.kakawanyifan.pojo.SMSCode;import org.springframework.stereotype.Component;import java.text.DecimalFormat;import java.util.concurrent.TimeUnit;@Component public class CodeUtil { @CreateCache (name = "jetCache_" ,expire = 300 ,timeUnit = TimeUnit.SECONDS,cacheType = CacheType.LOCAL) private Cache<String, SMSCode> jetCache; public SMSCode generator (String tel) { int num = (int ) (Math.random() * 1000000 ); DecimalFormat decimalFormat = new DecimalFormat("000000" ); String code = decimalFormat.format(num); SMSCode smsCode = new SMSCode(tel,code); jetCache.put(tel,smsCode); return smsCode; } public SMSCode get (String tel) { return jetCache.get(tel); } }
使用本地缓存和远程缓存
本地和远程方法都有了,将两种配置合并到一起,就是本地+远程的方案。
1 2 3 4 5 6 7 8 9 10 11 12 13 jetcache: local: default: type: linkedhashmap keyConvertor: fastjson remote: default: type: redis host: 10.211 .55 .14 port: 6379 password: Redis@2023 poolConfig: maxTotal: 50
在创建缓存的时候,配置cacheType为BOTH即则本地缓存与远程缓存同时使用。
1 2 @CreateCache (name = "jetCache_" ,expire = 300 ,timeUnit = TimeUnit.SECONDS,cacheType = CacheType.BOTH)private Cache<String, SMSCode> jetCache;
方法缓存
在上文讨论Spring中的缓存的代码中,我们只需要在相关的方法上添加缓存,这种被称为方法缓存。在JetCache中也有方法缓存。
第一步同样是导包,在这里我们同样是导入jetcache-starter-redis
。
配置缓存
本地和远程方法都有了,将两种配置合并到一起,就是本地+远程的方案。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 jetcache: local: default: type: linkedhashmap keyConvertor: fastjson remote: default: type: redis host: 10.211 .55 .14 port: 6379 password: Redis@2023 keyConvertor: fastjson valueEncode: java valueDecode: java poolConfig: maxTotal: 50
解释说明: 由于Redis缓存中不支持保存对象,因此需要对Redis设置当Object类型数据进入到Redis中时如何进行类型转换。
配置keyConvertor表示key的类型转换方式 配置value的转换类型方式,值进入redis时是java类型,标注valueEncode为java,值从redis中读取时转换成java,标注valueDecode为java。 为了实现Object类型的值进出Redis,需要保障进出Redis的Object类型的数据必须实现序列化接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 package com.kakawanyifan.pojo;import java.io.Serializable;public class Book implements Serializable { private Integer id; private String type; private String name; private String description; 【部分代码略】 }
启用缓存,开启方法缓存功能,并配置basePackages,说明在哪些包中开启方法缓存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 package com.kakawanyifan;import com.alicp.jetcache.anno.config.EnableCreateCacheAnnotation;import com.alicp.jetcache.anno.config.EnableMethodCache;import org.mybatis.spring.annotation.MapperScan;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication @MapperScan ("com.kakawanyifan.dao" )@EnableCreateCacheAnnotation @EnableMethodCache (basePackages = "com.kakawanyifan" )public class Application { public static void main (String[] args) { SpringApplication.run(Application.class , args ) ; } }
使用注解@Cached
标注当前方法使用缓存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package com.kakawanyifan.service.impl;import com.alicp.jetcache.anno.CacheType;import com.alicp.jetcache.anno.Cached;import com.kakawanyifan.dao.BookDao;import com.kakawanyifan.pojo.Book;import com.kakawanyifan.service.BookService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;import java.util.List;@Transactional @Service public class BookServiceImpl implements BookService { @Autowired private BookDao bookDao; 【部分代码略】 @Cached (name="book_" ,key="#id" ,expire = 3600 ,cacheType = CacheType.REMOTE) public Book getById (Integer id) { return bookDao.getById(id); } 【部分代码略】 }
远程方案的数据同步
由于远程方案中Redis保存的数据可以被多个客户端共享,这就存在了数据同步问题。
JetCache提供了三个注解解决此问题,分别:“更新、删除操作时同步缓存数据”、“读取缓存时定时刷新数据”。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package com.kakawanyifan.service.impl;import com.alicp.jetcache.anno.*;import com.kakawanyifan.dao.BookDao;import com.kakawanyifan.pojo.Book;import com.kakawanyifan.service.BookService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;import java.util.List;@Transactional @Service public class BookServiceImpl implements BookService { @Autowired private BookDao bookDao; public boolean save (Book book) { bookDao.save(book); return true ; } @CacheUpdate (name="book_" ,key="#book.id" ,value="#book" ) public boolean update (Book book) { bookDao.update(book); return true ; } @CacheInvalidate (name="book_" ,key = "#id" ) public boolean delete (Integer id) { bookDao.delete(id); return true ; } @Cached (name="book_" ,key="#id" ,expire = 3600 ,cacheType = CacheType.REMOTE) @CacheRefresh (refresh = 5 ) public Book getById (Integer id) { return bookDao.getById(id); } public List<Book> getAll () { return bookDao.getAll(); } }
解释说明:
@CacheUpdate(name="book_",key="#book.id",value="#book")
,更新缓存。
@CacheInvalidate(name="book_",key = "#id")
,删除缓存。
@CacheRefresh(refresh = 5)
,定时刷新缓存。
数据报表
JetCache还提供有简单的数据报表功能,帮助开发者快速查看缓存命中信息,只需要添加一个配置即可:
1 2 jetcache: statIntervalMinutes: 1
设置后,每1分钟在控制台输出缓存数据命中信息
1 2 3 4 5 6 7 8 2023-01-22 16:16:00.007 INFO 14468 --- [DefaultExecutor] c.alicp.jetcache.support.StatInfoLogger : jetcache stat from 2023-01-22 16:15:12,528 to 2023-01-22 16:16:00,002 cache | qps| rate| get| hit| fail| expire|avgLoadTime|maxLoadTime ------------------------+----------+-------+--------------+--------------+--------------+--------------+-----------+----------- default_book_ | 0.75| 96.77%| 31| 30| 0| 0| 63.3| 356 default_jetCache_ | 0.00| 0.00%| 0| 0| 0| 0| 0.0| 0 default_jetCache__local | 0.00| 0.00%| 0| 0| 0| 0| 0.0| 0 default_jetCache__remote| 0.00| 0.00%| 0| 0| 0| 0| 0.0| 0 ------------------------+----------+-------+--------------+--------------+--------------+--------------+-----------+-----------
JetCache的配置
在application.yml
中,JetCache的配置有:
属性
默认值
说明
jetcache.statIntervalMinutes
0
统计间隔,0表示不统计
jetcache.hiddenPackages
false
自动生成name时,隐藏指定的包名前缀。false
,不隐藏。
jetcache.[local|remote].${area}.type
无
缓存类型,本地支持linkedhashmap、caffeine,远程支持redis、tair
jetcache.[local|remote].${area}.keyConvertor
无
key转换器,当前仅支持fastjson
jetcache.[local|remote].${area}.valueEncoder
Java
仅remote类型的缓存需要指定,可选java和kryo
jetcache.[local|remote].${area}.valueDecoder
Java
仅remote类型的缓存需要指定,可选java和kryo
jetcache.[local|remote].${area}.limit
100
仅local类型的缓存需要指定,缓存实例最大元素数
jetcache.[local|remote].${area}.expireAfterWriteInMillis
无穷大
默认过期时间,毫秒单位
定时任务
整合Quartz
关于Quartz,可以参考我们在《19.Quartz和APScheduler》 的讨论。
本文主要讨论SpringBoot整合Quartz。
导入SpringBoot整合Quartz的starter。
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-quartz</artifactId > </dependency >
定义任务Bean,继承QuartzJobBean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 package com.kakawanyifan.job;import org.quartz.JobExecutionContext;import org.quartz.JobExecutionException;import org.springframework.scheduling.quartz.QuartzJobBean;import java.time.LocalDateTime;public class MyQuartzJob extends QuartzJobBean { @Override protected void executeInternal (JobExecutionContext context) throws JobExecutionException { System.out.println(LocalDateTime.now()); } }
创建Quartz配置类,定义工作明细(JobDetail)与触发器的(Trigger)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package com.kakawanyifan.config;import com.kakawanyifan.job.MyQuartzJob;import org.quartz.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration public class QuartzConfig { @Bean public JobDetail printJobDetail () { return JobBuilder.newJob(MyQuartzJob.class ).storeDurably ().build () ; } @Bean public Trigger printJobTrigger () { ScheduleBuilder schedBuilder = CronScheduleBuilder.cronSchedule("0/5 * * * * ?" ); return TriggerBuilder.newTrigger().forJob(printJobDetail()).withSchedule(schedBuilder).build(); } }
解释说明: SpringBoot整合Quartz就是将Quartz对应的核心对象交给Spring容器管理,包含两个对象,JobDetail和Trigger对象。
JobDetail工作明细中要设置对应的具体工作,使用newJob()操作传入对应的工作任务类型即可。 Trigger触发器需要绑定任务,使用forJob()操作传入绑定的工作明细对象。触发器中最核心的规则是执行时间,此处使用调度器定义执行时间,执行时间描述方式使用的是cron表达式。
启动Spring项目,运行结果:
1 2 3 4 5 2023-01-12T14:40:35.025 2023-01-12T14:40:40.004 2023-01-12T14:40:45.004 2023-01-12T14:40:50.015 2023-01-12T14:40:55.003
Task
除了Quartz,Spring中也自带了定时任务。
关于该部分,我们在《19.Quartz和APScheduler》 也有过讨论,本文主要讨论在SpringBoot中的定时任务。主要区别在于
在Spring中,@EnableScheduling
注解在配置类上。
在SpringBoot中,@EnableScheduling
在引导类上。
开启定时任务功能,在引导类上开启定时任务功能的开关,使用注解@EnableScheduling
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package com.kakawanyifan;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication @EnableScheduling public class Application { public static void main (String[] args) { SpringApplication.run(Application.class , args ) ; } }
定义Bean,在对应要定时执行的操作上方,使用注解@Scheduled
定义执行的时间,执行时间的描述方式还是cron表达式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package com.kakawanyifan.job;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import java.time.LocalDateTime;@Component public class MyTaskJob { @Scheduled (cron = "0/5 * * * * ?" ) public void print () { System.out.println(LocalDateTime.now()); } }
运行结果:
1 2 3 4 5 2023-01-12T14:46:15.011 2023-01-12T14:46:20.035 2023-01-12T14:46:25.015 2023-01-12T14:46:30.002 2023-01-12T14:46:35.005
如何想对定时任务进行相关配置,可以通过配置文件进行。
1 2 3 4 5 6 7 8 9 spring: task: scheduling: pool: size: 1 thread-name-prefix: ssm_ shutdown: await-termination: false await-termination-period: 10s
邮件
标准规范
邮件协议
常见的邮件协议有:
SMTP(Simple Mail Transfer Protocol),简单邮件传输协议,发邮件的标准。
POP3(Post Office Protocol - Version 3),邮局协议版本3,收邮件的标准。
IMAP(Internet Mail Access Protocol),Internet邮件访问协议,对POP3的升级。
在本文,我们主要讨论如何发邮件。
MIME
MIME(Multipurpose Internet Mail Extensions),多用途因特网邮件扩展标准。这个不是邮件传输协议,是对传输内容、附件等定义了格式。
JavaMail发邮件
邮件发送过程
整个发邮件的过程如下:
创建一个Session
对象。
Session
对象创建一个Transport
对象,用来发送邮件。
Transport
对象连接邮件服务器。
Transport
对象创建一个(Message
对象)。
Transport
对象发送邮件。
javax.mail
导入包javax.mail
1 2 3 4 5 <dependency > <groupId > com.sun.mail</groupId > <artifactId > javax.mail</artifactId > <version > 1.6.2</version > </dependency >
发送文本邮件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 package com.kakawanyifan;import com.sun.mail.util.MailSSLSocketFactory;import javax.mail.Message;import javax.mail.MessagingException;import javax.mail.Session;import javax.mail.Transport;import javax.mail.internet.InternetAddress;import javax.mail.internet.MimeMessage;import java.security.GeneralSecurityException;import java.util.Properties;public class MailSend { public static void main (String[] args) throws GeneralSecurityException, MessagingException { Properties properties = new Properties(); properties.setProperty("mail.debug" , "true" ); properties.setProperty("mail.smtp.auth" , "true" ); properties.setProperty("mail.host" , "smtp.qq.com" ); properties.setProperty("mail.transport.protocol" , "smtp" ); MailSSLSocketFactory mailSSLSocketFactory = new MailSSLSocketFactory(); mailSSLSocketFactory.setTrustAllHosts(true ); properties.put("mail.smtp.ssl.enable" , "true" ); properties.put("mail.smtp.ssl.socketFactory" , mailSSLSocketFactory); Session session = Session.getInstance(properties); Transport transport = session.getTransport(); transport.connect("smtp.qq.com" , "【发件人邮箱】" , "【发件人密码】" ); MimeMessage message = new MimeMessage(session); message.setFrom(new InternetAddress("【发件人邮箱】" )); message.setRecipient(Message.RecipientType.TO, new InternetAddress("i@m.kakawanyifan.com" )); message.setRecipient(Message.RecipientType.CC, new InternetAddress("i@m.kakawanyifan.com" )); message.setRecipient(Message.RecipientType.BCC, new InternetAddress("i@m.kakawanyifan.com" )); message.setSubject("测试文本邮件-标题" ); message.setText("测试文本邮件-内容" ); transport.sendMessage(message, message.getAllRecipients()); transport.close(); } }
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 DEBUG: JavaMail version 1.6.2 DEBUG: successfully loaded resource: /META-INF/javamail.default.address.map DEBUG: getProvider() returning javax.mail.Provider[TRANSPORT,smtp,com.sun.mail.smtp.SMTPTransport,Oracle] DEBUG SMTP: useEhlo true, useAuth true DEBUG SMTP: trying to connect to host "smtp.qq.com", port 465, isSSL true 【部分运行结果略】 . 250 OK: queued as. DEBUG SMTP: message successfully delivered to mail server QUIT 221 Bye. Process finished with exit code 0
解释说明:
java.util.Properties
,这个类我们在《5.IO流》 有过讨论。
在本文,我们用于设置发邮件相关的属性有:
mail.host
:发送、接收邮件的默认邮箱服务器。
mail.store.protocol
:接收邮件的协议。
mail.transport.protocol
:发送邮件的协议。
mail.debug.auth
:是否开启DEBUG调试。
mail.smtp.auth
:SMPT是否需要身份验证。
javax.mail.Session
:邮件会话。
javax.mail.Transport
:邮件传输(发送)。
javax.mail.Store
:邮件存储(接收)。
javax.mail.Message
,这是一个抽象类,一般我们用的是其实现类javax.mail.internet.MimeMessage
。相关方法有:
setFrom
:设置邮件的发件人。
setRecipient
:设置邮件的收件人(Message.RecipientType.TO
)、抄送人(Message.RecipientType.CC
)和密送人(Message.RecipientType.BCC
)。
setSubject
:设置邮件的主题。
setContent
:设置邮件内容。
setText
:设置邮件的文本内容(纯文本)。
javax.mail.Address
:也是一个抽象类,一般我们用的是其实现类javax.mail.internet.InternetAddress
。
发送HTML邮件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 package com.kakawanyifan;import com.sun.mail.util.MailSSLSocketFactory;import javax.activation.DataHandler;import javax.activation.FileDataSource;import javax.mail.Message;import javax.mail.MessagingException;import javax.mail.Session;import javax.mail.Transport;import javax.mail.internet.InternetAddress;import javax.mail.internet.MimeBodyPart;import javax.mail.internet.MimeMessage;import javax.mail.internet.MimeMultipart;import java.security.GeneralSecurityException;import java.util.Properties;public class MailSend { public static void main (String[] args) throws GeneralSecurityException, MessagingException { Properties properties = new Properties(); properties.setProperty("mail.debug" , "true" ); properties.setProperty("mail.smtp.auth" , "true" ); properties.setProperty("mail.host" , "smtp.qq.com" ); properties.setProperty("mail.transport.protocol" , "smtp" ); MailSSLSocketFactory mailSSLSocketFactory = new MailSSLSocketFactory(); mailSSLSocketFactory.setTrustAllHosts(true ); properties.put("mail.smtp.ssl.enable" , "true" ); properties.put("mail.smtp.ssl.socketFactory" , mailSSLSocketFactory); Session session = Session.getInstance(properties); Transport transport = session.getTransport(); transport.connect("smtp.qq.com" , "【发件人邮箱】" , "【发件人密码】" ); MimeMessage message = new MimeMessage(session); message.setFrom(new InternetAddress("【发件人邮箱】" )); message.setRecipient(Message.RecipientType.TO, new InternetAddress("i@m.kakawanyifan.com" )); message.setRecipient(Message.RecipientType.CC, new InternetAddress("i@m.kakawanyifan.com" )); message.setRecipient(Message.RecipientType.BCC, new InternetAddress("i@m.kakawanyifan.com" )); message.setSubject("测试HTML邮件-标题" ); String htmlContent = "<h1>Hello</h1>" + "<p>显示图片<img src='cid:abc.jpg'>1.jpg</p>" ; MimeBodyPart text = new MimeBodyPart(); text.setContent(htmlContent, "text/html;charset=UTF-8" ); MimeBodyPart image = new MimeBodyPart(); DataHandler dh = new DataHandler(new FileDataSource("/Users/kaka/Desktop/001.jpg" )); image.setDataHandler(dh); image.setContentID("abc.jpg" ); MimeMultipart mm = new MimeMultipart(); mm.addBodyPart(text); mm.addBodyPart(image); mm.setSubType("related" ); message.setContent(mm); message.saveChanges(); transport.sendMessage(message, message.getAllRecipients()); transport.close(); } }
解释说明:
MimeMessage:代表整封邮件。
MimeBodyPart:代表一个MIME信息。
MimeMultipart:代表一个由多个MIME信息组合成的组合MIME信息。
如果没有mm.setSubType("related");
,在某些邮件客户端,图片可能会同时以HTML和附件的形式存在。
发送带附件的邮件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 package com.kakawanyifan;import com.sun.mail.util.MailSSLSocketFactory;import javax.mail.Message;import javax.mail.MessagingException;import javax.mail.Session;import javax.mail.Transport;import javax.mail.internet.InternetAddress;import javax.mail.internet.MimeBodyPart;import javax.mail.internet.MimeMessage;import javax.mail.internet.MimeMultipart;import java.io.IOException;import java.security.GeneralSecurityException;import java.util.Properties;public class MailSend { public static void main (String[] args) throws GeneralSecurityException, MessagingException, IOException { Properties properties = new Properties(); properties.setProperty("mail.debug" , "true" ); properties.setProperty("mail.smtp.auth" , "true" ); properties.setProperty("mail.host" , "smtp.qq.com" ); properties.setProperty("mail.transport.protocol" , "smtp" ); MailSSLSocketFactory mailSSLSocketFactory = new MailSSLSocketFactory(); mailSSLSocketFactory.setTrustAllHosts(true ); properties.put("mail.smtp.ssl.enable" , "true" ); properties.put("mail.smtp.ssl.socketFactory" , mailSSLSocketFactory); Session session = Session.getInstance(properties); Transport transport = session.getTransport(); transport.connect("smtp.qq.com" , "【发件人邮箱】" , "【发件人密码】" ); MimeMessage message = new MimeMessage(session); message.setFrom(new InternetAddress("【发件人邮箱】" )); message.setRecipient(Message.RecipientType.TO, new InternetAddress("i@m.kakawanyifan.com" )); message.setRecipient(Message.RecipientType.CC, new InternetAddress("i@m.kakawanyifan.com" )); message.setRecipient(Message.RecipientType.BCC, new InternetAddress("i@m.kakawanyifan.com" )); message.setSubject("测试带附件的邮件-标题" ); MimeBodyPart text = new MimeBodyPart(); text.setContent("邮件中有两个附件。" , "text/html;charset=UTF-8" ); MimeMultipart mm = new MimeMultipart(); mm.setSubType("related" ); mm.addBodyPart(text); MimeBodyPart attachPart1 = new MimeBodyPart(); attachPart1.attachFile("/Users/kaka/Desktop/001.jpg" ); mm.addBodyPart(attachPart1); MimeBodyPart attachPart2 = new MimeBodyPart(); attachPart2.attachFile("/Users/kaka/Desktop/002.png" ); mm.addBodyPart(attachPart2); message.setContent(mm); message.saveChanges(); transport.sendMessage(message, message.getAllRecipients()); transport.close(); } }
整合JavaMail
配置
导入SpringBoot
整合JavaMail的Starter
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-mail</artifactId > </dependency >
配置邮箱的登录信息
1 2 3 4 5 spring: mail: host: smtp.qq.com username: 【发件人邮箱】 password: 【发件人密码】
发送简单邮件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package com.kakawanyifan.email;import com.kakawanyifan.Application;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.mail.SimpleMailMessage;import org.springframework.mail.javamail.JavaMailSender;@SpringBootTest (classes = Application.class ) public class SendMailTest { @Autowired private JavaMailSender javaMailSender; private String from = "【发件人邮箱】" ; private String to = "i@m.kakawanyifan.com" ; private String subject = "测试邮件" ; private String context = "测试邮件正文内容" ; @Test public void sendMail () { SimpleMailMessage message = new SimpleMailMessage(); message.setFrom(from); message.setTo(to); message.setSubject(subject); message.setText(context); javaMailSender.send(message); } }
发送HTML邮件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 package com.kakawanyifan.email;import com.kakawanyifan.Application;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.mail.javamail.JavaMailSender;import org.springframework.mail.javamail.MimeMessageHelper;import javax.activation.DataHandler;import javax.activation.FileDataSource;import javax.mail.internet.MimeBodyPart;import javax.mail.internet.MimeMessage;import javax.mail.internet.MimeMultipart;import java.util.HashMap;import java.util.Map;import java.util.Set;@SpringBootTest (classes = Application.class ) public class SendMailTest { @Autowired private JavaMailSender javaMailSender; private String from = "【发件人邮箱】" ; private String to = "i@m.kakawanyifan.com" ; private String subject = "测试邮件" ; @Test public void sendMail () throws Exception { MimeMessage mimeMailMessage = javaMailSender.createMimeMessage(); MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMailMessage, true ); mimeMessageHelper.setFrom(from); mimeMessageHelper.setTo(to); mimeMessageHelper.setSubject(subject); MimeMultipart allMultipart = new MimeMultipart(); StringBuilder sb = new StringBuilder(); sb.append("<h2>SpirngBoot测试邮件HTML</h2>" ) .append("<p style='text-align:left'>这是一封HTML邮件...</p>" ) .append("<a href=http://www.baidu.com>点击进入百度</a><br/>" ) .append("<img src=\"cid:a00000001\"><br/><br/>" ) .append("<img src=\"cid:a00000002\">" ); String content = sb.toString(); Map<String,String> image = new HashMap<>(); image.put("a00000001" , "/Users/kaka/Desktop/001.jpg" ); image.put("a00000002" , "/Users/kaka/Desktop/002.png" ); MimeBodyPart contentPart = createContent(content, image); allMultipart.addBodyPart(contentPart); mimeMailMessage.setContent(allMultipart); mimeMailMessage.saveChanges(); javaMailSender.send(mimeMailMessage); } public MimeBodyPart createContent (String body, Map<String, String> map) throws Exception { MimeBodyPart contentPart = new MimeBodyPart(); MimeMultipart contentMultipart = new MimeMultipart("related" ); MimeBodyPart htmlBodyPart = new MimeBodyPart(); htmlBodyPart.setContent(body, "text/html;charset=UTF-8" ); contentMultipart.addBodyPart(htmlBodyPart); if (map != null && map.size() > 0 ) { Set<Map.Entry<String, String>> set = map.entrySet(); for (Map.Entry<String, String> entry : set) { MimeBodyPart gifBodyPart = new MimeBodyPart(); FileDataSource fds = new FileDataSource(entry.getValue()); gifBodyPart.setDataHandler(new DataHandler(fds)); gifBodyPart.setContentID(entry.getKey()); contentMultipart.addBodyPart(gifBodyPart); } } contentPart.setContent(contentMultipart); return contentPart; } }
发送带附件的邮件
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 package com.kakawanyifan.email;import com.kakawanyifan.Application;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.mail.javamail.JavaMailSender;import org.springframework.mail.javamail.MimeMessageHelper;import javax.activation.DataHandler;import javax.activation.FileDataSource;import javax.mail.internet.MimeBodyPart;import javax.mail.internet.MimeMessage;import javax.mail.internet.MimeMultipart;@SpringBootTest (classes = Application.class ) public class SendMailTest { @Autowired private JavaMailSender javaMailSender; private String from = "【发件人邮箱】" ; private String to = "i@m.kakawanyifan.com" ; private String subject = "测试邮件" ; @Test public void sendMail () throws Exception { MimeMessage mimeMailMessage = javaMailSender.createMimeMessage(); MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMailMessage, true ); mimeMessageHelper.setFrom(from); mimeMessageHelper.setTo(to); mimeMessageHelper.setSubject(subject); MimeMultipart allMultipart = new MimeMultipart(); MimeBodyPart contentPart = new MimeBodyPart(); contentPart.setText("附件" ); allMultipart.addBodyPart(contentPart); allMultipart.addBodyPart(createAttachment("/Users/kaka/Desktop/001.jpg" )); allMultipart.addBodyPart(createAttachment("/Users/kaka/Desktop/002.png" )); mimeMailMessage.setContent(allMultipart); mimeMailMessage.saveChanges(); javaMailSender.send(mimeMailMessage); } public MimeBodyPart createAttachment (String filename) throws Exception { MimeBodyPart attachPart = new MimeBodyPart(); FileDataSource fsd = new FileDataSource(filename); attachPart.setDataHandler(new DataHandler(fsd)); attachPart.setFileName(fsd.getName()); return attachPart; } }
工具类
MailInfo
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package com.kakawanyifan.pojo;import lombok.Data;import java.util.Map;@Data public class MailInfo { private String[] receiver; private String subject; private String content; private String[] cc; private String[] attachFileNames; private Map<String,String> imageMap; }
SendMailService
:
1 2 3 4 5 6 7 8 9 10 11 12 package com.kakawanyifan.service;import com.kakawanyifan.pojo.MailInfo;public interface SendMailService { void sendSimpleTextEmail (MailInfo mailInfo) ; void sendHtmlEmail (MailInfo mailInfo,boolean html) ; void sendEnclosureEmail (MailInfo mailInfo) ; }
SendMailServiceImpl
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 package com.kakawanyifan.service.impl;import com.kakawanyifan.pojo.MailInfo;import com.kakawanyifan.service.SendMailService;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.mail.SimpleMailMessage;import org.springframework.mail.javamail.JavaMailSender;import org.springframework.mail.javamail.MimeMessageHelper;import org.springframework.stereotype.Component;import javax.activation.DataHandler;import javax.activation.FileDataSource;import javax.annotation.Resource;import javax.mail.internet.MimeBodyPart;import javax.mail.internet.MimeMessage;import javax.mail.internet.MimeMultipart;import java.util.*;@Slf 4j@Component public class SendMailServiceImpl implements SendMailService { @Resource private JavaMailSender javaMailSender; @Value (value = "${spring.mail.username}" ) private String emailFrom; @Override public void sendSimpleTextEmail (MailInfo mailInfo) { try { SimpleMailMessage mailMessage = new SimpleMailMessage(); mailMessage.setFrom(emailFrom); mailMessage.setTo(mailInfo.getReceiver()); mailMessage.setSubject(mailInfo.getSubject()); mailMessage.setCc(mailInfo.getCc()); mailMessage.setText(mailInfo.getContent()); javaMailSender.send(mailMessage); } catch (Exception e) { log.error("邮件发送失败:{}" , e.getMessage()); } } @Override public void sendHtmlEmail (MailInfo mailInfo,boolean html) { try { MimeMessage mimeMailMessage = javaMailSender.createMimeMessage(); MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMailMessage); mimeMessageHelper.setFrom(emailFrom); mimeMessageHelper.setTo(mailInfo.getReceiver()); mimeMessageHelper.setSubject(mailInfo.getSubject()); mimeMessageHelper.setCc(mailInfo.getCc()); mimeMessageHelper.setText(mailInfo.getContent(), html); javaMailSender.send(mimeMailMessage); } catch (Exception e) { log.error("邮件发送失败:{}" , e.getMessage()); } } @Override public void sendEnclosureEmail (MailInfo mailInfo) { try { MimeMessage mimeMailMessage = javaMailSender.createMimeMessage(); MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMailMessage, true ); mimeMessageHelper.setFrom(emailFrom); mimeMessageHelper.setTo(mailInfo.getReceiver()); mimeMessageHelper.setSubject(mailInfo.getSubject()); mimeMessageHelper.setCc(mailInfo.getCc()); mimeMessageHelper.setSentDate(new Date()); MimeMultipart allMultipart = new MimeMultipart(); MimeBodyPart contentPart = createContent(mailInfo.getContent(), mailInfo.getImageMap()); allMultipart.addBodyPart(contentPart); for (int i = 0 ; i < mailInfo.getAttachFileNames().length; i++) { allMultipart.addBodyPart(createAttachment(mailInfo.getAttachFileNames()[i])); } mimeMailMessage.setContent(allMultipart); mimeMailMessage.saveChanges(); javaMailSender.send(mimeMailMessage); } catch (Exception e) { log.error("邮件发送失败:{}" , e.getMessage()); } } public MimeBodyPart createAttachment (String filename) throws Exception { MimeBodyPart attachPart = new MimeBodyPart(); FileDataSource fsd = new FileDataSource(filename); attachPart.setDataHandler(new DataHandler(fsd)); attachPart.setFileName(fsd.getName()); return attachPart; } public MimeBodyPart createContent (String body) throws Exception { MimeBodyPart contentPart = new MimeBodyPart(); MimeMultipart contentMultipart = new MimeMultipart("related" ); MimeBodyPart htmlBodyPart = new MimeBodyPart(); htmlBodyPart.setContent(body, "text/html;charset=UTF-8" ); contentMultipart.addBodyPart(htmlBodyPart); contentPart.setContent(contentMultipart); return contentPart; } public MimeBodyPart createContent (String body, Map<String, String> map) throws Exception { MimeBodyPart contentPart = new MimeBodyPart(); MimeMultipart contentMultipart = new MimeMultipart("related" ); MimeBodyPart htmlBodyPart = new MimeBodyPart(); htmlBodyPart.setContent(body, "text/html;charset=UTF-8" ); contentMultipart.addBodyPart(htmlBodyPart); if (map != null && map.size() > 0 ) { Set<Map.Entry<String, String>> set = map.entrySet(); for (Map.Entry<String, String> entry : set) { MimeBodyPart gifBodyPart = new MimeBodyPart(); FileDataSource fds = new FileDataSource(entry.getValue()); gifBodyPart.setDataHandler(new DataHandler(fds)); gifBodyPart.setContentID(entry.getKey()); contentMultipart.addBodyPart(gifBodyPart); } } contentPart.setContent(contentMultipart); return contentPart; } }
测试类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 package com.kakawanyifan.email;import com.kakawanyifan.Application;import com.kakawanyifan.pojo.MailInfo;import com.kakawanyifan.service.impl.SendMailServiceImpl;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import java.util.Date;import java.util.HashMap;import java.util.Map;@SpringBootTest (classes = Application.class ) public class SendMailServiceTest { @Autowired SendMailServiceImpl emailSendMsgHandle; @Test void sendSimpleTextMail () { MailInfo mailInfo = new MailInfo(); mailInfo.setReceiver(new String[]{"i@m.kakawanyifan.com" }); mailInfo.setSubject("测试主题" ); mailInfo.setContent("邮件内容" ); emailSendMsgHandle.sendSimpleTextEmail(mailInfo); } @Test public void sendHTMLMail () { MailInfo mailBean = new MailInfo(); mailBean.setReceiver(new String[]{"i@m.kakawanyifan.com" }); mailBean.setSubject("SpringBootMailHTML之这是一封HTML格式的邮件" ); mailBean.setCc(new String[]{"i@m.kakawanyifan.com" }); StringBuilder sb = new StringBuilder(); sb.append("<h2>SpirngBoot测试邮件HTML</h2>" ) .append("<p style='text-align:left'>这是一封HTML邮件...</p>" ) .append("<p> 时间为:" + new Date() +"</p>" ); mailBean.setContent(sb.toString()); emailSendMsgHandle.sendHtmlEmail(mailBean,true ); } @Test void sendAttachMail () { MailInfo mailBean = new MailInfo(); mailBean.setReceiver(new String[]{"i@m.kakawanyifan.com" }); mailBean.setSubject("SpringBootMailHTML之这是一封HTML格式的邮件" ); mailBean.setCc(new String[]{"i@m.kakawanyifan.com" }); StringBuilder sb = new StringBuilder(); sb.append("<h2>SpirngBoot测试邮件HTML</h2>" ) .append("<p style='text-align:left'>这是一封HTML邮件...</p>" ) .append("<p> 时间为:" + new Date() +"</p>" ); mailBean.setContent(sb.toString()); mailBean.setAttachFileNames(new String[]{"/Users/kaka/Desktop/001.jpg" ,"/Users/kaka/Desktop/002.png" }); emailSendMsgHandle.sendEnclosureEmail(mailBean); } @Test void sendHTMLMailWithImgAndAttach () { MailInfo mailBean = new MailInfo(); mailBean.setReceiver(new String[]{"i@m.kakawanyifan.com" }); mailBean.setSubject("SpringBootMailHTML之这是一封HTML格式的邮件" ); mailBean.setCc(new String[]{"i@m.kakawanyifan.com" }); StringBuilder sb = new StringBuilder(); sb.append("<h2>SpirngBoot测试邮件HTML</h2>" ) .append("<p style='text-align:left'>这是一封HTML邮件...</p>" ) .append("<a href=http://www.baidu.com>点击进入百度</a><br/>" ) .append("<img src=\"cid:a00000001\"><br/><br/>" ) .append("<img src=\"cid:a00000002\">" ) .append("<p> 时间为:" + new Date() +"</p>" ); mailBean.setContent(sb.toString()); mailBean.setAttachFileNames(new String[]{"/Users/kaka/Desktop/001.jpg" ,"/Users/kaka/Desktop/002.png" }); Map<String,String> image = new HashMap<>(); image.put("a00000001" , "/Users/kaka/Desktop/001.jpg" ); image.put("a00000002" , "/Users/kaka/Desktop/002.png" ); mailBean.setImageMap(image); emailSendMsgHandle.sendEnclosureEmail(mailBean); } }
消息队列
概念
在计算机的语境下,消息除了具有数据的特征之外,还有消息的来源与接收着两个概念。通常发送消息的一方称为消息的生产者,接收消息的一方称为消息的消费者。
对于消息的生产者与消费者的工作模式,还可以将消息划分成两种模式,同步消费与异步消息。所谓同步消息就是生产者发送完消息,等待消费者处理,消费者处理完将结果告知生产者,然后生产者继续向下执行业务。这种模式过于卡生产者的业务执行连续性,在现在的企业级开发中,上述这种业务场景通常不会采用消息的形式进行处理。
所谓异步消息就是生产者发送完消息,无需等待消费者处理完毕,生产者继续向下执行其他动作。比如生产者发送了一个日志信息给日志系统,发送过去以后生产者就向下做其他事情了,无需关注日志系统的执行结果。日志系统根据接收到的日志信息继续进行业务执行,是单纯的记录日志,还是记录日志并报警,这些和生产者无关,这样生产者的业务执行效率就会大幅度提升。并且可以通过添加多个消费者来处理同一个生产者发送的消息来提高系统的高并发性,改善系统工作效率,提高用户体验。一旦某一个消费者由于各种问题宕机了,也不会对业务产生影响,提高了系统的高可用性。
标准规范
目前企业级开发中广泛使用的消息处理技术共三大类,具体如下:
JMS
概述
JMS,Java Message Service。
对JMS规范实现的消息中间件技术有:ActiveMQ、Redis和HornetMQ等。但是也有一些不太规范的实现,参考JMS的标准设计,但是又不完全满足其规范,例如:RabbitMQ、RocketMQ。
有些资料会说,基于JMS规范的消息队列,无法在非Java的语言中用。其实不是这样的,例如ActiveMQ中就有各种跨语言的客户端。具体可以参考:
JMS消息模型
JMS规范了消息有两种模型:
peer-2-peer,点对点模型。生产者会将消息发送到一个保存消息的容器(队列)中;一个队列的消息只能被一个消费者消费,或未被及时消费导致超时。这种模型下,生产者和消费者是一对一绑定的。
publish-subscribe,发布订阅模型。生产者将消息发送到一个保存消息的容器(队列)中;消息可以被多个消费者消费,生产者和消费者完全独立。
JMS消息种类
根据消息中包含的数据种类划分,可以将消息划分成6种消息。
TextMessage
MapMessage
BytesMessage
StreamMessage
ObjectMessage
Message
(只有消息头和属性)
其中,最常用的是BytesMessage
。
AMQP
概述
AMQP,advanced message queuing protocol,高级消息队列协议,规范了网络交换的数据格式,兼容JMS操作。
其优点是跨平台性,服务器供应商,生产者,消费者可以使用不同的语言来实现。
目前实现了AMQP协议的消息中间件技术有:RabbitMQ、RocketMQ和StormMQ。
AMQP消息种类
AMQP消息种类只有一种: byte[]
。
这么设计的原因,是为了跨平台。
AMQP消息模型
AMQP在JMS的消息模型基础上又进行了进一步的扩展,除了点对点和发布订阅的模型,还有了几种全新的消息模型:
direct exchange
fanout exchange
topic exchange
headers exchange
system exchange
MQTT
MQTT,Message Queueing Telemetry Transport,消息队列遥测传输,专为小设备设计,是物联网生态系统中主要成分之一。在本文,我们不作讨论。
KafKa
Kafka,一种高吞吐量的分布式发布订阅消息系统,提供实时消息功能。Kafka技术并不是作为消息中间件为主要功能的产品,但是其拥有发布订阅的工作模式,也可以充当消息中间件来使用,而且目前企业级开发中其身影也不少见。
案例(购物订单发送手机短信)
需求
接下来,我们以购物订单发送手机短信为例,讨论各种各样的消息中间件技术。
需求如下:
执行下单业务时,调用消息服务,将要发送短信的订单ID传递给消息中间件
消息处理服务接收到要发送的订单ID后输出订单ID(模拟发短信)
订单业务
业务层接口:
1 2 3 4 5 package com.kakawanyifan.service;public interface OrderService { void order (String id) ; }
业务层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package com.kakawanyifan.service.impl;import com.kakawanyifan.service.MessageService;import com.kakawanyifan.service.OrderService;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Service @Slf 4jpublic class OrderServiceImpl implements OrderService { @Autowired MessageService messageService; @Override public void order (String id) { log.info("订单处理开始" ); messageService.sendMessage(id); log.info("订单处理结束" ); } }
表现层:
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package com.kakawanyifan.controller;import com.kakawanyifan.service.OrderService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController @RequestMapping ("orders/" )public class OrderController { @Autowired OrderService orderService; @PostMapping ("{id}" ) public void order (@PathVariable String id) { orderService.order(id); } }
短信处理业务
业务层接口:
示例代码:
1 2 3 4 5 6 7 8 9 10 11 package com.kakawanyifan.service;public interface MessageService { void sendMessage (String id) ; String doMessage () ; }
解释说明:短信处理业务层接口提供两个操作,发送要处理的订单ID到消息中间件,另一个操作目前暂且设计成处理消息,实际消息的处理过程不应该是手动执行,应该是自动执行。
业务层实现:
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package com.kakawanyifan.service.impl;import com.kakawanyifan.service.MessageService;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import java.util.ArrayList;import java.util.List;@Service @Slf 4jpublic class MessageServiceImpl implements MessageService { List<String> messageList = new ArrayList<>(); @Override public void sendMessage (String id) { log.info("待发送短信的订单,已经加入到消息队列。id:" + id); messageList.add(id); } @Override public String doMessage () { String id = messageList.remove(0 ); log.info("已完成短信发送。id:" + id); return id; } }
表现层:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package com.kakawanyifan.controller;import com.kakawanyifan.service.MessageService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController @RequestMapping ("message" )public class MessageController { @Autowired MessageService messageService; @GetMapping public String doMessage () { String id = messageService.doMessage(); return id; } }
解释说明:短信处理表现层接口暂且开发出一个处理消息的入口,但是此业务是对应业务层中设计的模拟接口,实际业务不需要设计此接口。
模拟调用
订单:curl --location --request POST 'http://localhost:8080/orders/2'
短信:curl --location --request GET 'http://localhost:8080/message'
整合ActiveMQ
ActiveMQ是MQ产品中的元老级产品,早期标准MQ产品之一,在AMQP协议没有出现之前,占据了消息中间件市场的绝大部分份额,后期因为AMQP系列产品的出现,迅速走弱,目前仅在一些线上运行的产品中出现,新产品开发较少采用。
安装
步骤:
通过官网下载acticemq的安装包
官网:https://activemq.apache.org/
解压1 tar -zxvf apache-activemq-5.16.5-bin.tar.gz
启动
在解压后的bin
目录下找到activemq
,启动命令:
另,停止命令:./activemq stop
启动之后,我们可以访问其管理页面,地址是http://10.211.55.14:8161/admin/
,用户名和密码都是admin
。
问题处理
如果启动后没有任何报错,如下:
1 2 3 4 INFO: Loading '/usr/local/activemq/apache-activemq-5.17.3//bin/env' INFO: Using java '/usr/bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details INFO: pidfile created : '/usr/local/activemq/apache-activemq-5.17.3//data/activemq.pid' (pid '14377')
而且在data/activemq.log
没有任何内容。 可能是因为JDK版本不对。 通过官网的下载页面,可以看到要求的JDK版本。
如果ActiveMQ正常启动,防火墙端口也开放了,但是浏览器发现无法访问其管理页面。
解决方法: 修改conf
目录的jetty.xml
。 找到bean id="jettyPort"
配置下的name="host"
的参数配置,将参数值由127.0.0.1
改为0.0.0.0
,然后重启ActiveMQ。
整合
导入SpringBoot整合ActiveMQ的Starter:
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-activemq</artifactId > </dependency >
配置ActiveMQ的服务器地址:
1 2 3 spring: activemq: broker-url: tcp://10.211.55.14:61616
使用JmsMessagingTemplate操作ActiveMQ:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package com.kakawanyifan.service.impl;import com.kakawanyifan.service.MessageService;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.core.JmsMessagingTemplate;import org.springframework.stereotype.Service;@Service @Slf 4jpublic class MessageServiceActivemqImpl implements MessageService { @Autowired JmsMessagingTemplate jmsMessagingTemplate; @Override public void sendMessage (String id) { log.info("待发送短信的订单已纳入处理队列,id : " + id); jmsMessagingTemplate.convertAndSend("order.queue.id" ,id); } @Override public String doMessage () { String id = jmsMessagingTemplate.receiveAndConvert("order.queue.id" ,String.class ) ; log.info("已完成短信发送业务,id : " + id); return id; } }
解释说明:
发送消息需要先将消息的类型转换成字符串,然后再发送,所以是convertAndSend
。需要定义消息发送的位置和具体的消息内容,此处使用ID作为消息内容。 接收消息需要先将消息接收到,然后再转换成指定的数据类型,所以是receiveAndConvert
。接收消息除了提供读取的位置,还要给出转换后的数据的具体类型。
使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 package com.kakawanyifan.listener;import lombok.extern.slf4j.Slf4j;import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component;@Component @Slf 4jpublic class MessageListener { @JmsListener (destination = "order.queue.id" ) public void receive (String id) { log.info("已完成短信发送业务,id : " + id); } }
解释说明:使用注解@JmsListener
定义当前方法监听ActiveMQ中指定名称的消息队列。
如果当前消息队列处理完还需要继续向下传递当前消息到另一个队列中使用注解@SendTo
即可,这样即可构造连续执行的顺序消息队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package com.kakawanyifan.listener;import lombok.extern.slf4j.Slf4j;import org.springframework.jms.annotation.JmsListener;import org.springframework.messaging.handler.annotation.SendTo;import org.springframework.stereotype.Component;@Component @Slf 4jpublic class MessageListener { @JmsListener (destination = "order.queue.id" ) @SendTo ("order.next.id" ) public String receive (String id) { log.info("已完成短信发送业务,id : " + id); return "next : " + id; } }
切换消息模型由点对点模型到发布订阅模型,修改jms配置pub-sub-domain
即可。
1 2 3 4 5 spring: activemq: broker-url: tcp://10.211.55.14:61616 jms: pub-sub-domain: true
解释说明:pub-sub-domain
默认值为false,即点对点模型,修改为true后就是发布订阅模型。
整合RocketMQ
安装
官网:https://rocketmq.apache.org
下载RocketMQ的二进制包。
之后解压即可,解压后的目录如下:
1 2 3 4 5 6 7 drwxr-xr-x. 2 root root 4096 Jun 21 2022 benchmark drwxr-xr-x. 3 root root 4096 Jan 29 14:47 bin drwxr-xr-x. 6 root root 4096 Jun 20 2022 conf drwxr-xr-x. 2 root root 4096 Jun 21 2022 lib -rw-r--r--. 1 root root 17327 Jun 20 2022 LICENSE -rw-r--r--. 1 root root 1338 Jun 20 2022 NOTICE -rw-r--r--. 1 root root 11219 Jun 20 2022 README.md
启动
工作模式
在RocketMQ中,有命名服务器(NameServer)和业务服务器(broker)的概念。 生产者与消费者并不是直接与业务服务器(broker)联系,而是通过命名服务器(NameServer)进行通信。 业务服务器(broker)启动后会通知命名服务器(NameServer),自己已经上线,这样命名服务器(NameServer)中就保存有所有的业务服务器(broker)信息。当生产者与消费者需要连接业务服务器(broker)时,通过命名服务器(NameServer)找到对应的处理业务的业务服务器(broker)。命名服务器(NameServer)在整套结构中起到一个信息中心的作用。
注意:是通过命名服务器找到业务服务器,之后还是和业务服务器进行通信。
启动服务器
基于我们上文的讨论,我们要先启动命名服务器(NameServer),然后才能启动业务服务器(broker)。
启动命名服务器(NameServer):
启动之后,在用户的home目录,会生成一个logs/rocketmqlogs
目录,相关的日志位于该目录下,日志名是namesrv.log
。 默认对外服务端口9876。
启动业务服务器(broker):
1 nohup sh mqbroker -n localhost:9876 &
测试服务器启动状态
测试生产者,示例代码:
1 2 export NAMESRV_ADDR=localhost:9876 sh tools.sh org.apache.rocketmq.example.quickstart.Producer
运行结果:
1 2 3 4 5 6 【部分运行结果略】 SendResult [sendStatus=SEND_OK, msgId=FDB22C26F4E40000021C42FFFE9DD35842E3266474C293AFBB5003E7, offsetMsgId=0AD3370E00002A9F00000000000695CC, messageQueue=MessageQueue [topic=TopicTest, brokerName=centos-linux.shared, queueId=1], queueOffset=499] 【部分运行结果略】
测试消费者,示例代码:
1 sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
运行结果:
1 2 3 4 5 6 【部分运行结果略】 ConsumeMessageThread_please_rename_unique_group_name_4_16 Receive New Messages: [MessageExt [brokerName=centos-linux.shared, queueId=0, storeSize=216, queueOffset=347, sysFlag=0, bornTimestamp=1674980166678, bornHost=/10.211.55.14:53548, storeTimestamp=1674980166679, storeHost=/10.211.55.14:10911, msgId=0AD3370E00002A9F00000000000493F4, commitLogOffset=300020, bodyCRC=1380090473, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=500, CONSUME_START_TIME=1674980218912, UNIQ_KEY=FDB22C26F4E40000021C42FFFE9DD35842E3266474C293AFB8160186, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51, 57, 48], transactionId='null'}]] 【部分运行结果略】
问题处理
按照本文的启动方式,启动期间的部分日志会打印在bin
目录的nohup.out
文件中。
如果在启动名称服务器期间,有报错如下:
1 java.net.BindException: Address already in use
是因为端口被占用了,默认端口是9876。
如果在启动名称服务器期间,有报错如下:
1 2 3 4 5 6 OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12) # # There is insufficient memory for the Java Runtime Environment to continue. # Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory. # An error report file with more information is saved as: # /usr/local/rocketmq/rocketmq-all-4.9.4-bin-release/bin/hs_err_pid3714.log
是因为内存不够,修改bin
目录下的runserver.sh
。 搜索关键词JAVA_OPT
,可以找到设置JAVA_OPT
的代码,在结尾添加如下的一行:
1 JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=320m"
如果在启动业务服务器期间报内存不够,修改bin
目录下的runserver.sh
。 在设置JAVA_OPT
的代码的结尾,添加如下的一行:
1 JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
整合
导入SpringBoot整合RocketMQ的Starter,该Starter不由SpringBoot维护
1 2 3 4 5 <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-spring-boot-starter</artifactId > <version > 2.2.2</version > </dependency >
配置RocketMQ
1 2 3 4 rocketmq: name-server: 10.211 .55 .14 :9876 producer: group: group_rocketmq
解释说明:
name-server
:名称服务器地址。producer.group
:生产者所属组。
使用RocketMQTemplate操作RocketMQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package com.kakawanyifan.service.impl;import com.kakawanyifan.service.MessageService;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Service @Slf 4jpublic class MessageServiceRocketmqImpl implements MessageService { @Autowired private RocketMQTemplate rocketMQTemplate; @Override public void sendMessage (String id) { log.info("待发送短信的订单已纳入处理队列(rocketmq),id:" +id); SendCallback callback = new SendCallback() { @Override public void onSuccess (SendResult sendResult) { log.info("消息发送成功" ); } @Override public void onException (Throwable e) { log.info("消息发送失败" ); } }; rocketMQTemplate.asyncSend("order_id" ,id,callback); } @Override public String doMessage () { return null ; } }
使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package com.kakawanyifan.listener;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component @Slf 4j@RocketMQMessageListener (topic = "order_id" ,consumerGroup = "group_rocketmq" )public class MessageListener implements RocketMQListener <String > { @Override public void onMessage (String id) { log.info("已完成短信发送业务(rocketmq),id:" +id); } }
解释说明:
RocketMQ的监听器必须按照标准格式开发,实现RocketMQListener接口,泛型为消息类型。 使用注解@RocketMQMessageListener定义当前类监听RabbitMQ中指定组、指定名称的消息队列。
在发送消息的时候,可能会失败,查看异常的话,内容可能如下:
1 org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to 10.211.55.14:10911 failed
原因是当生产者与消费者需要连接业务服务器(broker)时,通过命名服务器(NameServer)找到对应的处理业务的业务服务器(broker)。
,即是通过命名服务器找到业务服务器,之后还是和业务服务器进行通信。
整合Kafka
安装
官网:https://kafka.apache.org
下载Kafka的二进制包。
之后解压即可,解压后的目录如下:
1 2 3 4 5 6 7 drwxr-xr-x. 3 root root 4096 Dec 22 05:21 bin drwxr-xr-x. 3 root root 4096 Dec 22 05:21 config drwxr-xr-x. 2 root root 4096 Jan 29 17:31 libs -rw-r--r--. 1 root root 14844 Dec 22 05:14 LICENSE drwxr-xr-x. 2 root root 4096 Dec 22 05:21 licenses -rw-r--r--. 1 root root 28184 Dec 22 05:14 NOTICE drwxr-xr-x. 2 root root 4096 Dec 22 05:21 site-docs
配置文件server.properties
在bin/config
目录下,我们会找到配置文件server.properties
。
其中部分关键内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 # The id of the broker. This must be set to a unique integer for each broker. broker.id=0 # A comma separated list of directories under which to store log files log.dirs=/tmp/kafka-logs # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the root directory for all kafka znodes. zookeeper.connect=localhost:2181 # The address the socket server listens on. If not configured, the host name will be equal to the value of # java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 # Listener name, hostname and port the broker will advertise to clients. # If not set, it uses the value for "listeners". #advertised.listeners=PLAINTEXT://your.host.name:9092
注意:
listeners
,需要改为listeners=PLAINTEXT://0.0.0.0:9092
。listeners
,监听器,告诉外部连接者要通过什么协议访问指定主机名和端口开放的Kafka服务。advertised.listeners
,需要改为advertised.listeners=PLAINTEXT://【服务器IP】:9092
advertise的含义表示宣称的、公布的,就是组监听器是Broker用于对外发布的。
启动
1 ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
1 ./kafka-server-start.sh -daemon ../config/server.properties
解释说明:-daemon
,表示在后台运行。
使用
创建topic: 示例代码:
1 ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic1
运行结果:
查看Kafka中topic的情况: 示例代码:
1 ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
运行结果:
查看某一个topic的详细信息:
示例代码:
1 ./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic topic1
运行结果:
1 2 Topic: topic1 TopicId: XjiNjy6pShG7TZFSWSHi_Q PartitionCount: 1 ReplicationFactor: 1 Configs: Topic: topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
生产消息:
1 ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1
此时,会进入生产消息的编辑模式,回车键发送消息。
消费消息:
1 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --from-beginning
删除topic:
1 ./bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic topic1
此时我们再查看topic1
,示例代码:
1 ./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic topic1
运行结果:
1 2 3 4 5 6 7 Error while executing topic command : Topic 'topic1' does not exist as expected [2023-01-29 20:20:26,692] ERROR java.lang.IllegalArgumentException: Topic 'topic1' does not exist as expected at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:401) at kafka.admin.TopicCommand$TopicService.describeTopic(TopicCommand.scala:313) at kafka.admin.TopicCommand$.main(TopicCommand.scala:61) at kafka.admin.TopicCommand.main(TopicCommand.scala) (kafka.admin.TopicCommand$)
有些资料说,再次查看的话,并不会被直接删除,而是有"marked for deletion,只是将topic1标记了删除",在这里没有复现。
关闭服务:
关闭Kafka:
1 ./bin/kafka-server-stop.sh
关闭Zookeeper:
1 ./bin/zookeeper-server-stop.sh
整合
导入SpringBoot整合Kafka的Starter,此坐标由SpringBoot维护版本。
1 2 3 4 <dependency > <groupId > org.springframework.kafka</groupId > <artifactId > spring-kafka</artifactId > </dependency >
配置Kafka:
1 2 3 4 5 spring: kafka: bootstrap-servers: 10.211 .55 .14 :9092 consumer: group-id: order
解释说明:
bootstrap-servers
:Kafka的服务地址。consumer.group-id
:设置消费者所属的组ID。
使用KafkaTemplate操作Kafka:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package com.kakawanyifan.service.impl;import com.kakawanyifan.service.MessageService;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Service;@Service @Slf 4jpublic class MessageServiceKafkaImpl implements MessageService { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @Override public void sendMessage (String id) { kafkaTemplate.send("order_id" ,id); log.info("待发送短信的订单已纳入处理队列(kafka),id:" + id); } @Override public String doMessage () { return null ; } }
解释说明:使用send方法,第一个参数是topic的名称。
使用消息监听器,指定监听的topics:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package com.kakawanyifan.listener;import lombok.extern.slf4j.Slf4j;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Component @Slf 4jpublic class MessageListener { @KafkaListener (topics = "order_id" ) public void onMessage (ConsumerRecord<String,String> record) { log.info("已完成短信发送业务(kafka),id:" + record.value()); } }
更详细的配置,可以参考官方文档。
地址:https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html#application-properties.integration.spring.kafka.consumer.auto-commit-interval