avatar


22.SpringBoot [2/3]

缓存

什么是缓存

在需要频繁的读取数据时,受限于数据库的访问效率,系统整体性能偏低。
为了改善上述现象,我们在应用程序与数据库之间建立一种临时的数据存储机制,该区域中的数据保存在内存中,读写速度较快,可以有效解决数据库访问效率低下的问题。
这一块临时存储数据的区域就是缓存。

在使用缓存后,应用程序与缓存打交道,缓存与数据库打交道。

内置的缓存

SpringBoot内置了缓存。

用法

第一步:导入缓存技术对应的Starter。

spring-boot-starter-cache

  • 注意名字"Spring cache abstraction", “abstraction” ,抽象的。在下文我们就能理解为什么是抽象的。

其POM坐标如下:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>

第二步:启用缓存,在引导类上方标注注解@EnableCaching配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.kakawanyifan;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;

@SpringBootApplication
@MapperScan("com.kakawanyifan.dao")
@EnableCaching
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

}

第三步:设置操作的数据使用缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.kakawanyifan.service.impl;

import com.kakawanyifan.dao.BookDao;
import com.kakawanyifan.pojo.Book;
import com.kakawanyifan.service.BookService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
@Transactional
@Service
public class BookServiceImpl implements BookService {
@Autowired
private BookDao bookDao;

【部分代码略】

@Cacheable(value="cacheSpace",key="#id")
public Book getById(Integer id) {
return bookDao.getById(id);
}

【部分代码略】

}

解释说明:
我们​在业务方法getById上使用@Cacheable注解声明当前方法的返回值放入缓存中。
我们还需要指定缓存的存储位置,以及缓存中保存当前方法返回值对应的名称。

  • value:描述缓存的存储位置。
  • key:描述缓存中保存数据的名称,使用#id读取形参中的id值作为缓存名称。

​使用@Cacheable注解后,执行当前操作,如果发现对应名称在缓存中没有数据,就正常读取数据,然后放入缓存;如果对应名称在缓存中有数据,就终止当前业务方法执行,直接返回缓存中的数据。

我们可以配置MyBatis打印内容到控制台,然后多请求几次看看,会发现第二次就没有打印SQL日志等,即使用了缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
【部分运行结果略】

2023-01-05 21:27:53.014 INFO 2100 --- [nio-8080-exec-1] c.k.controller.BookController : getById
2023-01-05 21:27:53.050 INFO 2100 --- [nio-8080-exec-1] com.alibaba.druid.pool.DruidDataSource : {dataSource-1} inited
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6420dfc8]
JDBC Connection [com.mysql.cj.jdbc.ConnectionImpl@496b9b67] will be managed by Spring
==> Preparing: select * from book where id = ?
==> Parameters: 10(Integer)
<== Columns: id, type, name, description
<== Row: 10, 市场营销, 直播就这么做:主播高效沟通实战指南, 李子柒、李佳奇、薇娅成长为网红的秘密都在书中
<== Total: 1
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6420dfc8]
Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6420dfc8]
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6420dfc8]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6420dfc8]
2023-01-05 21:27:59.386 INFO 2100 --- [nio-8080-exec-3] c.k.controller.BookController : getById
2023-01-05 21:28:16.200 INFO 2100 --- [nio-8080-exec-4] c.k.controller.BookController : getById

【部分运行结果略】

案例(手机验证码)

我们再举一个手机验证码相关的例子,需求如下:

  • 输入手机号获取验证码
  • 输入手机号和验证码验证结果

因此,我们设计两个表现层接口:一个用来模拟发送短信的过程,根据用户提供的手机号生成一个验证码,然后放入缓存;另一个用来模拟验证码校验的过程,使用传入的手机号和验证码进行匹配,并返回最终匹配结果。

验证码实体类,封装手机号与验证码两个属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.kakawanyifan.pojo;

import lombok.Data;

import java.io.Serializable;

@Data
public class SMSCode implements Serializable {
// 手机号
private String tel;
// 验证码
private String code;

public SMSCode(String tel, String code) {
this.tel = tel;
this.code = code;
}
}

验证码功能的业务层接口与实现类。

1
2
3
4
5
6
7
8
package com.kakawanyifan.service;

import com.kakawanyifan.pojo.SMSCode;

public interface SMSCodeService {
public SMSCode sendCodeToSMS(String tel);
public boolean checkCode(SMSCode smsCode);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.kakawanyifan.service.impl;

import com.kakawanyifan.pojo.SMSCode;
import com.kakawanyifan.service.SMSCodeService;
import com.kakawanyifan.util.CodeUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class SMSCodeServiceImpl implements SMSCodeService {

@Autowired
private CodeUtil codeUtil;

@Override
public SMSCode sendCodeToSMS(String tel) {
return codeUtil.generator(tel);
}

@Override
public boolean checkCode(SMSCode smsCode) {
//取出内存中的验证码与传递过来的验证码比对,如果相同,返回true
String code = smsCode.getCode();
String cacheCode = codeUtil.get(smsCode.getTel()).getCode();
return code.equals(cacheCode);
}
}

验证码工具类,验证码的生成策略以及根据手机号读取验证码的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.kakawanyifan.util;

import com.kakawanyifan.pojo.SMSCode;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Component;

import java.text.DecimalFormat;

@Component
public class CodeUtil {

@CachePut(value = "smsCode", key = "#tel")
public SMSCode generator(String tel){
int num = (int) (Math.random() * 1000000);
DecimalFormat decimalFormat = new DecimalFormat("000000");
return new SMSCode(tel,decimalFormat.format(num));
}

@Cacheable(value = "smsCode",key="#tel")
public SMSCode get(String tel){
return null;
}
}

解释说明:在获取验证码的功能上不能使用@Cacheable注解,@Cacheable注解是缓存中没有值则放入值,缓存中有值则取值,此处的功能仅仅是生成验证码并放入缓存,并不具有从缓存中取值的功能,因此不能使用@Cacheable注解,应该使用仅具有向缓存中保存数据的功能,使用@CachePut注解。

验证码功能的表现层接口,一个方法用于获取验证码,一个方法用于进行校验。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.kakawanyifan.controller;

import com.kakawanyifan.pojo.SMSCode;
import com.kakawanyifan.service.SMSCodeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/sms")
@Slf4j
public class SMSCodeController {

@Autowired
private SMSCodeService smsCodeService;

@GetMapping
public SMSCode getCode(String tel){
return smsCodeService.sendCodeToSMS(tel);
}

@PostMapping
public boolean checkCode(SMSCode smsCode){
return smsCodeService.checkCode(smsCode);
}
}

我们可以按照如下的方式请求,进行测试。

获取验证码:

1
curl --location --request GET 'http://localhost:8080/sms?tel=123'

校验验证码:

1
2
3
4
curl --location --request POST 'http://localhost:8080/sms' \
--header 'Content-Type: application/x-www-form-urlencoded' \
--data-urlencode 'tel=123' \
--data-urlencode 'code=846140'

整合Ehcache

Ehcache是一种缓存技术,使用SpringBoot整合Ehcache其实就是变更一下缓存技术的实现方式。

导入Ehcache的坐标

1
2
3
4
<dependency>
<groupId>net.sf.ehcache</groupId>
<artifactId>ehcache</artifactId>
</dependency>

为什么不是导入Ehcache的starter,而是导入技术坐标呢?
其实SpringBoot整合缓存技术做的是通用格式,不管你整合哪种缓存技术,只是实现变化了,操作方式一样。
这也是为什么我们在上文导入缓存相关的启动器的时候,发现其名字带有"abstraction"。

配置缓存技术实现使用Ehcache

1
2
3
4
5
spring:
cache:
type: ehcache
ehcache:
config: ehcache.xml
  • ​配置缓存的类型typeehcache
  • ​由于ehcache的配置有独立的配置文件格式,因此还需要指定ehcache的配置文件,以便于读取相应配置。

ehcache.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<?xml version="1.0" encoding="UTF-8"?>
<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://ehcache.org/ehcache.xsd"
updateCheck="false">
<diskStore path="ehcache" />

<!--默认缓存策略 -->
<!-- external:是否永久存在,设置为true则不会被清除,此时与timeout冲突,通常设置为false-->
<!-- diskPersistent:是否启用磁盘持久化-->
<!-- maxElementsInMemory:最大缓存数量-->
<!-- overflowToDisk:超过最大缓存数量是否持久化到磁盘-->
<!-- timeToIdleSeconds:最大不活动间隔,设置过长缓存容易溢出,设置过短无效果,可用于记录时效性数据,例如验证码-->
<!-- timeToLiveSeconds:最大存活时间-->
<!-- memoryStoreEvictionPolicy:缓存清除策略-->

<defaultCache
eternal="false"
diskPersistent="true"
maxElementsInMemory="1000"
overflowToDisk="false"
timeToIdleSeconds="300"
timeToLiveSeconds="300"
memoryStoreEvictionPolicy="LRU" />

<cache
name="smsCode"
eternal="false"
diskPersistent="true"
maxElementsInMemory="1000"
overflowToDisk="false"
timeToIdleSeconds="300"
timeToLiveSeconds="300"
memoryStoreEvictionPolicy="LRU" />
</ehcache>

解释说明:在上文的案例中,我们设置了缓存保存的位置是smsCode,在这里需要ehcache中有一个缓存空间名称叫做smsCode

至此,SpringBoot整合Ehcache就已经完成了。
我们可以看到,原始代码没有任何修改,仅仅是加了一组配置:

  1. 导入Ehcache的坐标。
  2. 修改设置,配置缓存供应商为ehcache,并提供对应的缓存配置文件。

整合Redis

与上文使用Ehcache作为缓存的过程类似,加坐标,改缓存实现类型为ehcache。
使用redis做缓存,也是加坐标,改缓存实现类型为redis;差别之处只有一点,redis的配置可以在yml文件中直接进行配置,无需独立的配置文件。

导入redis的坐标

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

配置缓存技术实现使用redis

1
2
3
4
5
6
7
spring:
redis:
host: 10.211.55.14
port: 6379
password: Redis@2023
cache:
type: Redis

​如果需要对redis作为缓存进行配置,注意不是对原始的redis进行配置,而是配置redis作为缓存使用相关的配置,隶属于spring.cache.redis节点下。

1
2
3
4
5
6
7
8
9
10
11
12
spring:
redis:
host: 10.211.55.14
port: 6379
password: Redis@2023
cache:
type: Redis
redis:
use-key-prefix: false
key-prefix: sms_
cache-null-values: false
time-to-live: 300s

整合JetCache

JetCache是阿里推出的一套缓存方案,实现了多级缓存、缓存统计、自动刷新、异步调用、数据报表等功能。

对于其多级缓存,有本地缓存和远程缓存两个层级:

  • 本地缓存(Local)
    • LinkedHashMap
    • Caffeine
  • 远程缓存(Remote)
    • Redis
    • Tair

只用远程缓存(Redis)

jetcache对应的starter,当前坐标默认使用的远程方案是redis

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.alicp.jetcache/jetcache-starter-redis -->
<dependency>
<groupId>com.alicp.jetcache</groupId>
<artifactId>jetcache-starter-redis</artifactId>
<version>2.6.7</version>
</dependency>

上文的spring-boot-starter-cache,在本文暂时不需要。

注意JetCache的版本,如果版本过高,可能会有如下的报错:

1
2
3
4
5
6
7
8
9
10
11
12
13
Caused by: java.lang.NoClassDefFoundError: redis/clients/jedis/UnifiedJedis
at com.alicp.jetcache.autoconfigure.RedisAutoConfiguration$RedisAutoInit.initCache(RedisAutoConfiguration.java:101) ~[jetcache-autoconfigure-2.7.3.jar:na]
at com.alicp.jetcache.autoconfigure.AbstractCacheAutoInit.process(AbstractCacheAutoInit.java:69) ~[jetcache-autoconfigure-2.7.3.jar:na]
at com.alicp.jetcache.autoconfigure.AbstractCacheAutoInit.afterPropertiesSet(AbstractCacheAutoInit.java:50) ~[jetcache-autoconfigure-2.7.3.jar:na]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1863) ~[spring-beans-5.3.24.jar:5.3.24]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800) ~[spring-beans-5.3.24.jar:5.3.24]
... 68 common frames omitted
Caused by: java.lang.ClassNotFoundException: redis.clients.jedis.UnifiedJedis
at java.net.URLClassLoader.findClass(URLClassLoader.java:387) ~[na:1.8.0_333]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_333]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) ~[na:1.8.0_333]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_333]
... 73 common frames omitted

远程(Redis)方案基本配置

1
2
3
4
5
6
7
8
9
jetcache:
remote:
default:
type: redis
host: 10.211.55.14
port: 6379
password: Redis@2023
poolConfig:
maxTotal: 50
  • jetcache:spring:位于同一层级。
  • poolConfig必须配置,否则会报错。

启用缓存,在引导类上方使用注解@EnableCreateCacheAnnotation开启缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.kakawanyifan;

import com.alicp.jetcache.anno.config.EnableCreateCacheAnnotation;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@MapperScan("com.kakawanyifan.dao")
@EnableCreateCacheAnnotation
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

}

创建缓存对象Cache,并使用注解@CreateCache标记当前缓存的信息,然后使用Cache对象的API操作缓存,put写缓存,get读缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.kakawanyifan.util;

import com.alicp.jetcache.Cache;
import com.alicp.jetcache.anno.CreateCache;
import com.kakawanyifan.pojo.SMSCode;
import org.springframework.stereotype.Component;

import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;

@Component
public class CodeUtil {

@CreateCache(name = "jetCache_",expire = 300,timeUnit = TimeUnit.SECONDS)
private Cache<String, SMSCode> jetCache;

public SMSCode generator(String tel){
int num = (int) (Math.random() * 1000000);
DecimalFormat decimalFormat = new DecimalFormat("000000");
String code = decimalFormat.format(num);
SMSCode smsCode = new SMSCode(tel,code);
jetCache.put(tel,smsCode);
return smsCode;
}

public SMSCode get(String tel){
return jetCache.get(tel);
}
}

在上文,我们使用的是配置中定义的"default"缓存,"default"只是一个名字,例如我们可以再添加一种缓存,参照如下配置进行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
jetcache:
remote:
default:
type: redis
host: 10.211.55.14
port: 6379
password: Redis@2023
poolConfig:
maxTotal: 50
sms:
type: redis
host: 10.211.55.14
port: 6379
password: Redis@2023
poolConfig:
maxTotal: 50

如果想使用名称是sms的缓存,只需要在创建缓存时指定参数area,声明使用对应缓存。

1
2
@CreateCache(area="sms",name="jetCache_",expire = 10,timeUnit = TimeUnit.SECONDS)
private Cache<String ,String> smsJetCache;

只用本地缓存(LinkedHashMap)

第一步同样是导包,在这里我们同样是导入jetcache-starter-redis

在远程方案中,配置中使用remote表示远程,换成local就是本地,只不过类型不一样而已。

1
2
3
4
5
jetcache:
local:
default:
type: linkedhashmap
keyConvertor: fastjson

解释说明:为了加速数据获取时key的匹配速度,jetcache要求指定key的类型转换器。简单说就是,如果你给了一个Object作为key的话,要先用key的类型转换器给转换成字符串,然后再保存。等到获取数据时,先使用给定的Object转换成字符串,然后根据字符串匹配。由于jetcache是阿里的技术,这里推荐key的类型转换器使用阿里的fastjson。

@EnableCreateCacheAnnotation,启用缓存

创建缓存对象Cache时,标注当前使用本地缓存。

cacheType控制当前缓存使用本地缓存还是远程缓存:

  • cacheType=CacheType.LOCAL,使用本地缓存。
  • cacheType如果不进行配置,默认值是REMOTE,使用远程缓存。
1
@CreateCache(name = "jetCache_",expire = 300,timeUnit = TimeUnit.SECONDS,cacheType = CacheType.LOCAL)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.kakawanyifan.util;

import com.alicp.jetcache.Cache;
import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.CreateCache;
import com.kakawanyifan.pojo.SMSCode;
import org.springframework.stereotype.Component;

import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;

@Component
public class CodeUtil {

@CreateCache(name = "jetCache_",expire = 300,timeUnit = TimeUnit.SECONDS,cacheType = CacheType.LOCAL)
private Cache<String, SMSCode> jetCache;

public SMSCode generator(String tel){
int num = (int) (Math.random() * 1000000);
DecimalFormat decimalFormat = new DecimalFormat("000000");
String code = decimalFormat.format(num);
SMSCode smsCode = new SMSCode(tel,code);
jetCache.put(tel,smsCode);
return smsCode;
}

public SMSCode get(String tel){
return jetCache.get(tel);
}
}

使用本地缓存和远程缓存

本地和远程方法都有了,将两种配置合并到一起,就是本地+远程的方案。

1
2
3
4
5
6
7
8
9
10
11
12
13
jetcache:
local:
default:
type: linkedhashmap
keyConvertor: fastjson
remote:
default:
type: redis
host: 10.211.55.14
port: 6379
password: Redis@2023
poolConfig:
maxTotal: 50

在创建缓存的时候,配置cacheType为BOTH即则本地缓存与远程缓存同时使用。

1
2
@CreateCache(name = "jetCache_",expire = 300,timeUnit = TimeUnit.SECONDS,cacheType = CacheType.BOTH)
private Cache<String, SMSCode> jetCache;

方法缓存

在上文讨论Spring中的缓存的代码中,我们只需要在相关的方法上添加缓存,这种被称为方法缓存。在JetCache中也有方法缓存。

第一步同样是导包,在这里我们同样是导入jetcache-starter-redis

配置缓存

本地和远程方法都有了,将两种配置合并到一起,就是本地+远程的方案。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
jetcache:
local:
default:
type: linkedhashmap
keyConvertor: fastjson
remote:
default:
type: redis
host: 10.211.55.14
port: 6379
password: Redis@2023
keyConvertor: fastjson
valueEncode: java
valueDecode: java
poolConfig:
maxTotal: 50

解释说明:
由于Redis缓存中不支持保存对象,因此需要对Redis设置当Object类型数据进入到Redis中时如何进行类型转换。

  • 配置keyConvertor表示key的类型转换方式
  • 配置value的转换类型方式,值进入redis时是java类型,标注valueEncode为java,值从redis中读取时转换成java,标注valueDecode为java。

​为了实现Object类型的值进出Redis,需要保障进出Redis的Object类型的数据必须实现序列化接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.kakawanyifan.pojo;

import java.io.Serializable;

public class Book implements Serializable {
private Integer id;
private String type;
private String name;
private String description;

【部分代码略】

}

启用缓存,开启方法缓存功能,并配置basePackages,说明在哪些包中开启方法缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.kakawanyifan;

import com.alicp.jetcache.anno.config.EnableCreateCacheAnnotation;
import com.alicp.jetcache.anno.config.EnableMethodCache;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@MapperScan("com.kakawanyifan.dao")
@EnableCreateCacheAnnotation
//开启方法注解缓存
@EnableMethodCache(basePackages = "com.kakawanyifan")
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

}

使用注解@Cached标注当前方法使用缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.kakawanyifan.service.impl;

import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.Cached;
import com.kakawanyifan.dao.BookDao;
import com.kakawanyifan.pojo.Book;
import com.kakawanyifan.service.BookService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
@Transactional
@Service
public class BookServiceImpl implements BookService {
@Autowired
private BookDao bookDao;

【部分代码略】

@Cached(name="book_",key="#id",expire = 3600,cacheType = CacheType.REMOTE)
public Book getById(Integer id) {
return bookDao.getById(id);
}

【部分代码略】

}

远程方案的数据同步

​由于远程方案中Redis保存的数据可以被多个客户端共享,这就存在了数据同步问题。
JetCache提供了三个注解解决此问题,分别:“更新、删除操作时同步缓存数据”、“读取缓存时定时刷新数据”。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.kakawanyifan.service.impl;

import com.alicp.jetcache.anno.*;
import com.kakawanyifan.dao.BookDao;
import com.kakawanyifan.pojo.Book;
import com.kakawanyifan.service.BookService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
@Transactional
@Service
public class BookServiceImpl implements BookService {
@Autowired
private BookDao bookDao;



public boolean save(Book book) {
bookDao.save(book);
return true;
}

@CacheUpdate(name="book_",key="#book.id",value="#book")
public boolean update(Book book) {
bookDao.update(book);
return true;
}

@CacheInvalidate(name="book_",key = "#id")
public boolean delete(Integer id) {
bookDao.delete(id);
return true;
}

@Cached(name="book_",key="#id",expire = 3600,cacheType = CacheType.REMOTE)
@CacheRefresh(refresh = 5)
public Book getById(Integer id) {
return bookDao.getById(id);
}

public List<Book> getAll() {
return bookDao.getAll();
}
}

解释说明:

  • @CacheUpdate(name="book_",key="#book.id",value="#book"),更新缓存。
  • @CacheInvalidate(name="book_",key = "#id"),删除缓存。
  • @CacheRefresh(refresh = 5),定时刷新缓存。

数据报表

JetCache还提供有简单的数据报表功能,帮助开发者快速查看缓存命中信息,只需要添加一个配置即可:

1
2
jetcache:
statIntervalMinutes: 1

设置后,每1分钟在控制台输出缓存数据命中信息

1
2
3
4
5
6
7
8
2023-01-22 16:16:00.007  INFO 14468 --- [DefaultExecutor] c.alicp.jetcache.support.StatInfoLogger  : jetcache stat from 2023-01-22 16:15:12,528 to 2023-01-22 16:16:00,002
cache | qps| rate| get| hit| fail| expire|avgLoadTime|maxLoadTime
------------------------+----------+-------+--------------+--------------+--------------+--------------+-----------+-----------
default_book_ | 0.75| 96.77%| 31| 30| 0| 0| 63.3| 356
default_jetCache_ | 0.00| 0.00%| 0| 0| 0| 0| 0.0| 0
default_jetCache__local | 0.00| 0.00%| 0| 0| 0| 0| 0.0| 0
default_jetCache__remote| 0.00| 0.00%| 0| 0| 0| 0| 0.0| 0
------------------------+----------+-------+--------------+--------------+--------------+--------------+-----------+-----------

JetCache的配置

application.yml中,JetCache的配置有:

属性 默认值 说明
jetcache.statIntervalMinutes 0 统计间隔,0表示不统计
jetcache.hiddenPackages false 自动生成name时,隐藏指定的包名前缀。false,不隐藏。
jetcache.[local|remote].${area}.type 缓存类型,本地支持linkedhashmap、caffeine,远程支持redis、tair
jetcache.[local|remote].${area}.keyConvertor key转换器,当前仅支持fastjson
jetcache.[local|remote].${area}.valueEncoder Java 仅remote类型的缓存需要指定,可选java和kryo
jetcache.[local|remote].${area}.valueDecoder Java 仅remote类型的缓存需要指定,可选java和kryo
jetcache.[local|remote].${area}.limit 100 仅local类型的缓存需要指定,缓存实例最大元素数
jetcache.[local|remote].${area}.expireAfterWriteInMillis 无穷大 默认过期时间,毫秒单位

定时任务

整合Quartz

关于Quartz,可以参考我们在《19.Quartz和APScheduler》的讨论。
本文主要讨论SpringBoot整合Quartz。

导入SpringBoot整合Quartz的starter。

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

定义任务Bean,继承QuartzJobBean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.kakawanyifan.job;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.time.LocalDateTime;

public class MyQuartzJob extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
System.out.println(LocalDateTime.now());
}
}

创建Quartz配置类,定义工作明细(JobDetail)与触发器的(Trigger)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.kakawanyifan.config;

import com.kakawanyifan.job.MyQuartzJob;
import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QuartzConfig {
@Bean
public JobDetail printJobDetail(){
//绑定具体的工作
return JobBuilder.newJob(MyQuartzJob.class).storeDurably().build();
}

@Bean
public Trigger printJobTrigger(){
ScheduleBuilder schedBuilder = CronScheduleBuilder.cronSchedule("0/5 * * * * ?");
//绑定对应的工作明细
return TriggerBuilder.newTrigger().forJob(printJobDetail()).withSchedule(schedBuilder).build();
}
}

解释说明:
SpringBoot整合Quartz就是将Quartz对应的核心对象交给Spring容器管理,包含两个对象,JobDetail和Trigger对象。

  • ​JobDetail工作明细中要设置对应的具体工作,使用newJob()操作传入对应的工作任务类型即可。
  • Trigger​触发器需要绑定任务,使用forJob()操作传入绑定的工作明细对象。触发器中最核心的规则是执行时间,此处使用调度器定义执行时间,执行时间描述方式使用的是cron表达式。

启动Spring项目,运行结果:

1
2
3
4
5
2023-01-12T14:40:35.025
2023-01-12T14:40:40.004
2023-01-12T14:40:45.004
2023-01-12T14:40:50.015
2023-01-12T14:40:55.003

Task

除了Quartz,Spring中也自带了定时任务。

关于该部分,我们在《19.Quartz和APScheduler》也有过讨论,本文主要讨论在SpringBoot中的定时任务。主要区别在于

  • 在Spring中,@EnableScheduling注解在配置类上。
  • 在SpringBoot中,@EnableScheduling在引导类上。

开启定时任务功能,在引导类上开启定时任务功能的开关,使用注解@EnableScheduling

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.kakawanyifan;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

}

定义Bean,在对应要定时执行的操作上方,使用注解@Scheduled定义执行的时间,执行时间的描述方式还是cron表达式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.kakawanyifan.job;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
public class MyTaskJob {

@Scheduled(cron = "0/5 * * * * ?")
public void print(){
System.out.println(LocalDateTime.now());
}
}

运行结果:

1
2
3
4
5
2023-01-12T14:46:15.011
2023-01-12T14:46:20.035
2023-01-12T14:46:25.015
2023-01-12T14:46:30.002
2023-01-12T14:46:35.005

​如何想对定时任务进行相关配置,可以通过配置文件进行。

1
2
3
4
5
6
7
8
9
spring:
task:
scheduling:
pool:
size: 1 # 任务调度线程池大小 默认 1
thread-name-prefix: ssm_ # 调度线程名称前缀 默认 scheduling-
shutdown:
await-termination: false # 线程池关闭时等待所有任务完成
await-termination-period: 10s # 调度线程关闭前最大等待时间,确保最后一定关闭

邮件

标准规范

邮件协议

常见的邮件协议有:

  • SMTP(Simple Mail Transfer Protocol),简单邮件传输协议,发邮件的标准。
  • POP3(Post Office Protocol - Version 3),邮局协议版本3,收邮件的标准。
  • IMAP(Internet Mail Access Protocol),Internet邮件访问协议,对POP3的升级。

在本文,我们主要讨论如何发邮件。

MIME

MIME(Multipurpose Internet Mail Extensions),多用途因特网邮件扩展标准。这个不是邮件传输协议,是对传输内容、附件等定义了格式。

JavaMail发邮件

邮件发送过程

整个发邮件的过程如下:

  1. 创建一个Session对象。
  2. Session对象创建一个Transport对象,用来发送邮件。
  3. Transport对象连接邮件服务器。
  4. Transport对象创建一个(Message对象)。
  5. Transport对象发送邮件。

上述过程简略后,就三步:

  1. 创建发送邮件的对象
  2. 创建邮件对象
  3. 发送邮件的对象去发送邮件对象

《基于Python的后端开发入门:3.拷贝、类型注解、闭包和一些常用的包》,我们讨论了如何用Python发送邮件,也是上述步骤。

javax.mail

导入包javax.mail

1
2
3
4
5
<dependency>
<groupId>com.sun.mail</groupId>
<artifactId>javax.mail</artifactId>
<version>1.6.2</version>
</dependency>

发送文本邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.kakawanyifan;

import com.sun.mail.util.MailSSLSocketFactory;

import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import java.security.GeneralSecurityException;
import java.util.Properties;


public class MailSend {
public static void main(String[] args) throws GeneralSecurityException, MessagingException {

Properties properties = new Properties();
// 开启debug调试
properties.setProperty("mail.debug", "true");
// 发送服务器需要身份验证
properties.setProperty("mail.smtp.auth", "true");
// 设置邮件服务器主机名
properties.setProperty("mail.host", "smtp.qq.com");
// 发送邮件协议名称
properties.setProperty("mail.transport.protocol", "smtp");

MailSSLSocketFactory mailSSLSocketFactory = new MailSSLSocketFactory();
mailSSLSocketFactory.setTrustAllHosts(true);

properties.put("mail.smtp.ssl.enable", "true");
properties.put("mail.smtp.ssl.socketFactory", mailSSLSocketFactory);

// 创建session
Session session = Session.getInstance(properties);

Transport transport = session.getTransport();
// 连上邮件服务器
transport.connect("smtp.qq.com", "【发件人邮箱】", "【发件人密码】");

// 创建邮件
MimeMessage message = new MimeMessage(session);

// 邮件消息头
// 邮件的发件人
message.setFrom(new InternetAddress("【发件人邮箱】"));
// 邮件的收件人
message.setRecipient(Message.RecipientType.TO, new InternetAddress("i@m.kakawanyifan.com"));
// 邮件的抄送人
message.setRecipient(Message.RecipientType.CC, new InternetAddress("i@m.kakawanyifan.com"));
// 邮件的密送人
message.setRecipient(Message.RecipientType.BCC, new InternetAddress("i@m.kakawanyifan.com"));
// 邮件的标题
message.setSubject("测试文本邮件-标题");

// 邮件消息体
message.setText("测试文本邮件-内容");

// 发送邮件
transport.sendMessage(message, message.getAllRecipients());
transport.close();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
DEBUG: JavaMail version 1.6.2
DEBUG: successfully loaded resource: /META-INF/javamail.default.address.map
DEBUG: getProvider() returning javax.mail.Provider[TRANSPORT,smtp,com.sun.mail.smtp.SMTPTransport,Oracle]
DEBUG SMTP: useEhlo true, useAuth true
DEBUG SMTP: trying to connect to host "smtp.qq.com", port 465, isSSL true

【部分运行结果略】

.
250 OK: queued as.
DEBUG SMTP: message successfully delivered to mail server
QUIT
221 Bye.

Process finished with exit code 0

解释说明:

  • java.util.Properties,这个类我们在《5.IO流》有过讨论。
    在本文,我们用于设置发邮件相关的属性有:
    • mail.host:发送、接收邮件的默认邮箱服务器。
    • mail.store.protocol:接收邮件的协议。
    • mail.transport.protocol:发送邮件的协议。
    • mail.debug.auth:是否开启DEBUG调试。
    • mail.smtp.auth:SMPT是否需要身份验证。
  • javax.mail.Session:邮件会话。
  • javax.mail.Transport:邮件传输(发送)。
  • javax.mail.Store:邮件存储(接收)。
  • javax.mail.Message,这是一个抽象类,一般我们用的是其实现类javax.mail.internet.MimeMessage。相关方法有:
    • setFrom:设置邮件的发件人。
    • setRecipient:设置邮件的收件人(Message.RecipientType.TO)、抄送人(Message.RecipientType.CC)和密送人(Message.RecipientType.BCC)。
    • setSubject:设置邮件的主题。
    • setContent:设置邮件内容。
    • setText:设置邮件的文本内容(纯文本)。
  • javax.mail.Address:也是一个抽象类,一般我们用的是其实现类javax.mail.internet.InternetAddress

发送HTML邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.kakawanyifan;

import com.sun.mail.util.MailSSLSocketFactory;

import javax.activation.DataHandler;
import javax.activation.FileDataSource;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;
import java.security.GeneralSecurityException;
import java.util.Properties;


public class MailSend {
public static void main(String[] args) throws GeneralSecurityException, MessagingException {

Properties properties = new Properties();
properties.setProperty("mail.debug", "true");
properties.setProperty("mail.smtp.auth", "true");
properties.setProperty("mail.host", "smtp.qq.com");
properties.setProperty("mail.transport.protocol", "smtp");
MailSSLSocketFactory mailSSLSocketFactory = new MailSSLSocketFactory();
mailSSLSocketFactory.setTrustAllHosts(true);
properties.put("mail.smtp.ssl.enable", "true");
properties.put("mail.smtp.ssl.socketFactory", mailSSLSocketFactory);

Session session = Session.getInstance(properties);
Transport transport = session.getTransport();
transport.connect("smtp.qq.com", "【发件人邮箱】", "【发件人密码】");

// 创建邮件
MimeMessage message = new MimeMessage(session);

message.setFrom(new InternetAddress("【发件人邮箱】"));
message.setRecipient(Message.RecipientType.TO, new InternetAddress("i@m.kakawanyifan.com"));
message.setRecipient(Message.RecipientType.CC, new InternetAddress("i@m.kakawanyifan.com"));
message.setRecipient(Message.RecipientType.BCC, new InternetAddress("i@m.kakawanyifan.com"));
message.setSubject("测试HTML邮件-标题");

String htmlContent = "<h1>Hello</h1>" + "<p>显示图片<img src='cid:abc.jpg'>1.jpg</p>";

MimeBodyPart text = new MimeBodyPart();
text.setContent(htmlContent, "text/html;charset=UTF-8");

MimeBodyPart image = new MimeBodyPart();
DataHandler dh = new DataHandler(new FileDataSource("/Users/kaka/Desktop/001.jpg"));
image.setDataHandler(dh);
image.setContentID("abc.jpg");

// 描述数据关系
MimeMultipart mm = new MimeMultipart();
mm.addBodyPart(text);
mm.addBodyPart(image);
mm.setSubType("related");
message.setContent(mm);
message.saveChanges();

transport.sendMessage(message, message.getAllRecipients());
transport.close();
}
}

解释说明:

  • MimeMessage:代表整封邮件。
  • MimeBodyPart:代表一个MIME信息。
  • MimeMultipart:代表一个由多个MIME信息组合成的组合MIME信息。
  • 如果没有mm.setSubType("related");,在某些邮件客户端,图片可能会同时以HTML和附件的形式存在。

发送带附件的邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.kakawanyifan;

import com.sun.mail.util.MailSSLSocketFactory;

import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Properties;


public class MailSend {
public static void main(String[] args) throws GeneralSecurityException, MessagingException, IOException {

Properties properties = new Properties();
properties.setProperty("mail.debug", "true");
properties.setProperty("mail.smtp.auth", "true");
properties.setProperty("mail.host", "smtp.qq.com");
properties.setProperty("mail.transport.protocol", "smtp");
MailSSLSocketFactory mailSSLSocketFactory = new MailSSLSocketFactory();
mailSSLSocketFactory.setTrustAllHosts(true);
properties.put("mail.smtp.ssl.enable", "true");
properties.put("mail.smtp.ssl.socketFactory", mailSSLSocketFactory);

Session session = Session.getInstance(properties);
Transport transport = session.getTransport();
transport.connect("smtp.qq.com", "【发件人邮箱】", "【发件人密码】");

// 创建邮件
MimeMessage message = new MimeMessage(session);

message.setFrom(new InternetAddress("【发件人邮箱】"));
message.setRecipient(Message.RecipientType.TO, new InternetAddress("i@m.kakawanyifan.com"));
message.setRecipient(Message.RecipientType.CC, new InternetAddress("i@m.kakawanyifan.com"));
message.setRecipient(Message.RecipientType.BCC, new InternetAddress("i@m.kakawanyifan.com"));
message.setSubject("测试带附件的邮件-标题");

MimeBodyPart text = new MimeBodyPart();
text.setContent("邮件中有两个附件。", "text/html;charset=UTF-8");

// 描述数据关系
MimeMultipart mm = new MimeMultipart();
mm.setSubType("related");
mm.addBodyPart(text);

MimeBodyPart attachPart1 = new MimeBodyPart();
attachPart1.attachFile("/Users/kaka/Desktop/001.jpg");
mm.addBodyPart(attachPart1);

MimeBodyPart attachPart2 = new MimeBodyPart();
attachPart2.attachFile("/Users/kaka/Desktop/002.png");
mm.addBodyPart(attachPart2);

message.setContent(mm);
message.saveChanges();

transport.sendMessage(message, message.getAllRecipients());
transport.close();
}
}

整合JavaMail

配置

导入SpringBoot整合JavaMail的Starter

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>

配置邮箱的登录信息

1
2
3
4
5
spring:
mail:
host: smtp.qq.com
username: 【发件人邮箱】
password: 【发件人密码】

发送简单邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.kakawanyifan.email;

import com.kakawanyifan.Application;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;

@SpringBootTest(classes = Application.class)
public class SendMailTest {

@Autowired
private JavaMailSender javaMailSender;

//发送人
private String from = "【发件人邮箱】";
//接收人
private String to = "i@m.kakawanyifan.com";
//标题
private String subject = "测试邮件";
//正文
private String context = "测试邮件正文内容";

@Test
public void sendMail() {
SimpleMailMessage message = new SimpleMailMessage();
message.setFrom(from);
message.setTo(to);
message.setSubject(subject);
message.setText(context);
javaMailSender.send(message);
}

}

发送HTML邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package com.kakawanyifan.email;

import com.kakawanyifan.Application;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;

import javax.activation.DataHandler;
import javax.activation.FileDataSource;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

@SpringBootTest(classes = Application.class)
public class SendMailTest {

@Autowired
private JavaMailSender javaMailSender;

//发送人
private String from = "【发件人邮箱】";
//接收人
private String to = "i@m.kakawanyifan.com";
//标题
private String subject = "测试邮件";

@Test
public void sendMail() throws Exception {
MimeMessage mimeMailMessage = javaMailSender.createMimeMessage();
//true 表示需要创建一个multipart message
MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMailMessage, true);
//发件人
mimeMessageHelper.setFrom(from);
//邮件接收人
mimeMessageHelper.setTo(to);
//邮件主题
mimeMessageHelper.setSubject(subject);

MimeMultipart allMultipart = new MimeMultipart();

//创建代表邮件正文和附件的各个MimeBodyPart对象
StringBuilder sb = new StringBuilder();
sb.append("<h2>SpirngBoot测试邮件HTML</h2>")
.append("<p style='text-align:left'>这是一封HTML邮件...</p>")
.append("<a href=http://www.baidu.com>点击进入百度</a><br/>")
//内嵌图片
.append("<img src=\"cid:a00000001\"><br/><br/>")
.append("<img src=\"cid:a00000002\">");
String content = sb.toString();

//内嵌了多少张图片,如果没有,则new一个不带值的Map
Map<String,String> image = new HashMap<>();
image.put("a00000001", "/Users/kaka/Desktop/001.jpg");
image.put("a00000002", "/Users/kaka/Desktop/002.png");

MimeBodyPart contentPart = createContent(content, image);
allMultipart.addBodyPart(contentPart);

//设置整个邮件内容为最终组合出的MimeMultipart对象
mimeMailMessage.setContent(allMultipart);
mimeMailMessage.saveChanges();

javaMailSender.send(mimeMailMessage);

}

public MimeBodyPart createContent(String body, Map<String, String> map) throws Exception {
//创建代表组合Mime消息的MimeMultipart对象,将该MimeMultipart对象保存到MimeBodyPart对象
MimeBodyPart contentPart = new MimeBodyPart();
MimeMultipart contentMultipart = new MimeMultipart("related");

//创建用于保存HTML正文的MimeBodyPart对象,并将它保存到MimeMultipart中
MimeBodyPart htmlBodyPart = new MimeBodyPart();
htmlBodyPart.setContent(body, "text/html;charset=UTF-8");
contentMultipart.addBodyPart(htmlBodyPart);

if (map != null && map.size() > 0) {
Set<Map.Entry<String, String>> set = map.entrySet();
for (Map.Entry<String, String> entry : set) {
//创建用于保存图片的MimeBodyPart对象,并将它保存到MimeMultipart中
MimeBodyPart gifBodyPart = new MimeBodyPart();
FileDataSource fds = new FileDataSource(entry.getValue());//图片所在的目录的绝对路径

gifBodyPart.setDataHandler(new DataHandler(fds));
gifBodyPart.setContentID(entry.getKey()); //cid的值
contentMultipart.addBodyPart(gifBodyPart);
}
}

//将MimeMultipart对象保存到MimeBodyPart对象
contentPart.setContent(contentMultipart);
return contentPart;
}


}

发送带附件的邮件

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.kakawanyifan.email;

import com.kakawanyifan.Application;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;

import javax.activation.DataHandler;
import javax.activation.FileDataSource;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;

@SpringBootTest(classes = Application.class)
public class SendMailTest {

@Autowired
private JavaMailSender javaMailSender;

//发送人
private String from = "【发件人邮箱】";
//接收人
private String to = "i@m.kakawanyifan.com";
//标题
private String subject = "测试邮件";

@Test
public void sendMail() throws Exception {
MimeMessage mimeMailMessage = javaMailSender.createMimeMessage();
//true 表示需要创建一个multipart message
MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMailMessage, true);
//发件人
mimeMessageHelper.setFrom(from);
//邮件接收人
mimeMessageHelper.setTo(to);
//邮件主题
mimeMessageHelper.setSubject(subject);

MimeMultipart allMultipart = new MimeMultipart();

MimeBodyPart contentPart = new MimeBodyPart();
contentPart.setText("附件");
allMultipart.addBodyPart(contentPart);

allMultipart.addBodyPart(createAttachment("/Users/kaka/Desktop/001.jpg"));
allMultipart.addBodyPart(createAttachment("/Users/kaka/Desktop/002.png"));

//设置整个邮件内容为最终组合出的MimeMultipart对象
mimeMailMessage.setContent(allMultipart);
mimeMailMessage.saveChanges();

javaMailSender.send(mimeMailMessage);

}

public MimeBodyPart createAttachment(String filename) throws Exception {
//创建保存附件的MimeBodyPart对象,并加入附件内容和相应的信息
MimeBodyPart attachPart = new MimeBodyPart();
FileDataSource fsd = new FileDataSource(filename);
attachPart.setDataHandler(new DataHandler(fsd));
attachPart.setFileName(fsd.getName());
return attachPart;
}


}

工具类

MailInfo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.kakawanyifan.pojo;

import lombok.Data;

import java.util.Map;

@Data
public class MailInfo {
// 邮件接收人
private String[] receiver;
// 邮件主题
private String subject;
// 邮件的文本内容
private String content;
// 抄送人
private String[] cc;
// 邮件附件的文件名
private String[] attachFileNames;
// 邮件内容内嵌图片
private Map<String,String> imageMap;
}

SendMailService

1
2
3
4
5
6
7
8
9
10
11
12
package com.kakawanyifan.service;

import com.kakawanyifan.pojo.MailInfo;

public interface SendMailService {

void sendSimpleTextEmail(MailInfo mailInfo);

void sendHtmlEmail(MailInfo mailInfo,boolean html);

void sendEnclosureEmail(MailInfo mailInfo);
}

SendMailServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package com.kakawanyifan.service.impl;

import com.kakawanyifan.pojo.MailInfo;
import com.kakawanyifan.service.SendMailService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.stereotype.Component;

import javax.activation.DataHandler;
import javax.activation.FileDataSource;
import javax.annotation.Resource;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;
import java.util.*;

@Slf4j
@Component
public class SendMailServiceImpl implements SendMailService {

@Resource
private JavaMailSender javaMailSender;

@Value(value = "${spring.mail.username}")
private String emailFrom;


@Override
public void sendSimpleTextEmail(MailInfo mailInfo) {
try {
SimpleMailMessage mailMessage = new SimpleMailMessage();
//发件人
mailMessage.setFrom(emailFrom);
//接收人
mailMessage.setTo(mailInfo.getReceiver());
//邮件主题
mailMessage.setSubject(mailInfo.getSubject());
//邮件抄送
mailMessage.setCc(mailInfo.getCc());
//邮件内容
mailMessage.setText(mailInfo.getContent());
//发送邮件
javaMailSender.send(mailMessage);
} catch (Exception e) {
log.error("邮件发送失败:{}", e.getMessage());
}
}

@Override
public void sendHtmlEmail(MailInfo mailInfo,boolean html) {
try {
MimeMessage mimeMailMessage = javaMailSender.createMimeMessage();
//true 表示需要创建一个multipart message
MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMailMessage);
//发件人
mimeMessageHelper.setFrom(emailFrom);
//接收人
mimeMessageHelper.setTo(mailInfo.getReceiver());
//邮件主题
mimeMessageHelper.setSubject(mailInfo.getSubject());
//邮件抄送
mimeMessageHelper.setCc(mailInfo.getCc());
//邮件内容
mimeMessageHelper.setText(mailInfo.getContent(), html);
javaMailSender.send(mimeMailMessage);
} catch (Exception e) {
log.error("邮件发送失败:{}", e.getMessage());
}
}


@Override
public void sendEnclosureEmail(MailInfo mailInfo) {
try {
MimeMessage mimeMailMessage = javaMailSender.createMimeMessage();
//true 表示需要创建一个multipart message
MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMailMessage, true);
//发件人
mimeMessageHelper.setFrom(emailFrom);
//邮件接收人
mimeMessageHelper.setTo(mailInfo.getReceiver());
//邮件主题
mimeMessageHelper.setSubject(mailInfo.getSubject());
//邮件抄送
mimeMessageHelper.setCc(mailInfo.getCc());
//设置显示的发件时间
mimeMessageHelper.setSentDate(new Date());

MimeMultipart allMultipart = new MimeMultipart();

//创建代表邮件正文和附件的各个MimeBodyPart对象
MimeBodyPart contentPart = createContent(mailInfo.getContent(), mailInfo.getImageMap());
allMultipart.addBodyPart(contentPart);

//创建用于组合邮件正文和附件的MimeMultipart对象
for (int i = 0; i < mailInfo.getAttachFileNames().length; i++) {
allMultipart.addBodyPart(createAttachment(mailInfo.getAttachFileNames()[i]));
}

//设置整个邮件内容为最终组合出的MimeMultipart对象
mimeMailMessage.setContent(allMultipart);
mimeMailMessage.saveChanges();

javaMailSender.send(mimeMailMessage);
} catch (Exception e) {
log.error("邮件发送失败:{}", e.getMessage());
}
}

public MimeBodyPart createAttachment(String filename) throws Exception {
//创建保存附件的MimeBodyPart对象,并加入附件内容和相应的信息
MimeBodyPart attachPart = new MimeBodyPart();
FileDataSource fsd = new FileDataSource(filename);
attachPart.setDataHandler(new DataHandler(fsd));
attachPart.setFileName(fsd.getName());
return attachPart;
}

public MimeBodyPart createContent(String body) throws Exception {
//创建代表组合Mime消息的MimeMultipart对象,将该MimeMultipart对象保存到MimeBodyPart对象
MimeBodyPart contentPart = new MimeBodyPart();
MimeMultipart contentMultipart = new MimeMultipart("related");

//创建用于保存HTML正文的MimeBodyPart对象,并将它保存到MimeMultipart中
MimeBodyPart htmlBodyPart = new MimeBodyPart();
htmlBodyPart.setContent(body, "text/html;charset=UTF-8");
contentMultipart.addBodyPart(htmlBodyPart);

//将MimeMultipart对象保存到MimeBodyPart对象
contentPart.setContent(contentMultipart);
return contentPart;
}

public MimeBodyPart createContent(String body, Map<String, String> map) throws Exception {
//创建代表组合Mime消息的MimeMultipart对象,将该MimeMultipart对象保存到MimeBodyPart对象
MimeBodyPart contentPart = new MimeBodyPart();
MimeMultipart contentMultipart = new MimeMultipart("related");

//创建用于保存HTML正文的MimeBodyPart对象,并将它保存到MimeMultipart中
MimeBodyPart htmlBodyPart = new MimeBodyPart();
htmlBodyPart.setContent(body, "text/html;charset=UTF-8");
contentMultipart.addBodyPart(htmlBodyPart);

if (map != null && map.size() > 0) {
Set<Map.Entry<String, String>> set = map.entrySet();
for (Map.Entry<String, String> entry : set) {
//创建用于保存图片的MimeBodyPart对象,并将它保存到MimeMultipart中
MimeBodyPart gifBodyPart = new MimeBodyPart();
FileDataSource fds = new FileDataSource(entry.getValue());//图片所在的目录的绝对路径

gifBodyPart.setDataHandler(new DataHandler(fds));
gifBodyPart.setContentID(entry.getKey()); //cid的值
contentMultipart.addBodyPart(gifBodyPart);
}
}

//将MimeMultipart对象保存到MimeBodyPart对象
contentPart.setContent(contentMultipart);
return contentPart;
}
}

测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.kakawanyifan.email;

import com.kakawanyifan.Application;
import com.kakawanyifan.pojo.MailInfo;
import com.kakawanyifan.service.impl.SendMailServiceImpl;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@SpringBootTest(classes = Application.class)
public class SendMailServiceTest {

@Autowired
SendMailServiceImpl emailSendMsgHandle;

@Test
void sendSimpleTextMail() {
MailInfo mailInfo = new MailInfo();
mailInfo.setReceiver(new String[]{"i@m.kakawanyifan.com"});
mailInfo.setSubject("测试主题");
mailInfo.setContent("邮件内容");
emailSendMsgHandle.sendSimpleTextEmail(mailInfo);
}

@Test
public void sendHTMLMail() {
MailInfo mailBean = new MailInfo();
//接收人
mailBean.setReceiver(new String[]{"i@m.kakawanyifan.com"});
mailBean.setSubject("SpringBootMailHTML之这是一封HTML格式的邮件");
mailBean.setCc(new String[]{"i@m.kakawanyifan.com"});
StringBuilder sb = new StringBuilder();
sb.append("<h2>SpirngBoot测试邮件HTML</h2>")
.append("<p style='text-align:left'>这是一封HTML邮件...</p>")
.append("<p> 时间为:"+ new Date() +"</p>");
mailBean.setContent(sb.toString());
//true、false控制以普通文本发送还是以html格式发送
emailSendMsgHandle.sendHtmlEmail(mailBean,true);
}

@Test
void sendAttachMail() {
MailInfo mailBean = new MailInfo();
mailBean.setReceiver(new String[]{"i@m.kakawanyifan.com"});
mailBean.setSubject("SpringBootMailHTML之这是一封HTML格式的邮件");
mailBean.setCc(new String[]{"i@m.kakawanyifan.com"});
StringBuilder sb = new StringBuilder();
sb.append("<h2>SpirngBoot测试邮件HTML</h2>")
.append("<p style='text-align:left'>这是一封HTML邮件...</p>")
.append("<p> 时间为:"+ new Date() +"</p>");
mailBean.setContent(sb.toString());
mailBean.setAttachFileNames(new String[]{"/Users/kaka/Desktop/001.jpg","/Users/kaka/Desktop/002.png"});
emailSendMsgHandle.sendEnclosureEmail(mailBean);
}

@Test
void sendHTMLMailWithImgAndAttach() {
MailInfo mailBean = new MailInfo();
mailBean.setReceiver(new String[]{"i@m.kakawanyifan.com"});
mailBean.setSubject("SpringBootMailHTML之这是一封HTML格式的邮件");
mailBean.setCc(new String[]{"i@m.kakawanyifan.com"});
StringBuilder sb = new StringBuilder();
sb.append("<h2>SpirngBoot测试邮件HTML</h2>")
.append("<p style='text-align:left'>这是一封HTML邮件...</p>")
.append("<a href=http://www.baidu.com>点击进入百度</a><br/>")
//内嵌图片
.append("<img src=\"cid:a00000001\"><br/><br/>")
.append("<img src=\"cid:a00000002\">")
.append("<p> 时间为:"+ new Date() +"</p>");
mailBean.setContent(sb.toString());
mailBean.setAttachFileNames(new String[]{"/Users/kaka/Desktop/001.jpg","/Users/kaka/Desktop/002.png"});
//内嵌了多少张图片,如果没有,则new一个不带值的Map
Map<String,String> image = new HashMap<>();
image.put("a00000001", "/Users/kaka/Desktop/001.jpg");
image.put("a00000002", "/Users/kaka/Desktop/002.png");


mailBean.setImageMap(image);
emailSendMsgHandle.sendEnclosureEmail(mailBean);
}
}

消息队列

概念

在计算机的语境下,消息除了具有数据的特征之外,还有消息的来源与接收着两个概念。通常发送消息的一方称为消息的生产者,接收消息的一方称为消息的消费者。

​对于消息的生产者与消费者的工作模式,还可以将消息划分成两种模式,同步消费与异步消息。​所谓同步消息就是生产者发送完消息,等待消费者处理,消费者处理完将结果告知生产者,然后生产者继续向下执行业务。这种模式过于卡生产者的业务执行连续性,在现在的企业级开发中,上述这种业务场景通常不会采用消息的形式进行处理。

​所谓异步消息就是生产者发送完消息,无需等待消费者处理完毕,生产者继续向下执行其他动作。比如生产者发送了一个日志信息给日志系统,发送过去以后生产者就向下做其他事情了,无需关注日志系统的执行结果。日志系统根据接收到的日志信息继续进行业务执行,是单纯的记录日志,还是记录日志并报警,这些和生产者无关,这样生产者的业务执行效率就会大幅度提升。并且可以通过添加多个消费者来处理同一个生产者发送的消息来提高系统的高并发性,改善系统工作效率,提高用户体验。一旦某一个消费者由于各种问题宕机了,也不会对业务产生影响,提高了系统的高可用性。

标准规范

​目前企业级开发中广泛使用的消息处理技术共三大类,具体如下:

  • JMS
  • AMQP
  • MQTT

JMS

概述

​JMS,Java Message Service。

对JMS规范实现的消息中间件技术有:ActiveMQ、Redis和HornetMQ等。但是也有一些不太规范的实现,参考JMS的标准设计,但是又不完全满足其规范,例如:RabbitMQ、RocketMQ。

有些资料会说,基于JMS规范的消息队列,无法在非Java的语言中用。其实不是这样的,例如ActiveMQ中就有各种跨语言的客户端。具体可以参考:

JMS消息模型

​JMS规范了消息有两种模型:

  • peer-2-peer,点对点模型。生产者会将消息发送到一个保存消息的容器(队列)中;一个队列的消息只能被一个消费者消费,或未被及时消费导致超时。这种模型下,生产者和消费者是一对一绑定的。
  • publish-subscribe,发布订阅模型。生产者将消息发送到一个保存消息的容器(队列)中;消息可以被多个消费者消费,生产者和消费者完全独立。

JMS消息种类

根据消息中包含的数据种类划分,可以将消息划分成6种消息。

  • TextMessage
  • MapMessage
  • BytesMessage
  • StreamMessage
  • ObjectMessage
  • Message(只有消息头和属性)

其中,最常用的是BytesMessage

AMQP

概述

​AMQP,advanced message queuing protocol,高级消息队列协议,规范了网络交换的数据格式,兼容JMS操作。

其优点是跨平台性,服务器供应商,生产者,消费者可以使用不同的语言来实现。

目前实现了AMQP协议的消息中间件技术有:RabbitMQ、RocketMQ和StormMQ。

AMQP消息种类

AMQP消息种类只有一种: byte[]

这么设计的原因,是为了跨平台。

AMQP消息模型

AMQP在JMS的消息模型基础上又进行了进一步的扩展,除了点对点和发布订阅的模型,还有了几种全新的消息模型:

  • direct exchange
  • fanout exchange
  • topic exchange
  • headers exchange
  • system exchange

MQTT

​MQTT,Message Queueing Telemetry Transport,消息队列遥测传输,专为小设备设计,是物联网生态系统中主要成分之一。在本文,我们不作讨论。

KafKa

​Kafka,一种高吞吐量的分布式发布订阅消息系统,提供实时消息功能。Kafka技术并不是作为消息中间件为主要功能的产品,但是其拥有发布订阅的工作模式,也可以充当消息中间件来使用,而且目前企业级开发中其身影也不少见。

关于Kafka,更多的内容,可以参考《分布式事件流平台Kafka》

案例(购物订单发送手机短信)

需求

接下来,我们以购物订单发送手机短信为例,讨论各种各样的消息中间件技术。

需求如下:

  • 执行下单业务时,调用消息服务,将要发送短信的订单ID传递给消息中间件
  • 消息处理服务接收到要发送的订单ID后输出订单ID(模拟发短信)

订单业务

业务层接口:

1
2
3
4
5
package com.kakawanyifan.service;

public interface OrderService {
void order(String id);
}

业务层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.kakawanyifan.service.impl;

import com.kakawanyifan.service.MessageService;
import com.kakawanyifan.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class OrderServiceImpl implements OrderService {

@Autowired
MessageService messageService;

@Override
public void order(String id) {
// 一系列的操作
log.info("订单处理开始");
// 一系列的操作
// 短信处理
messageService.sendMessage(id);
log.info("订单处理结束");
}
}

表现层:

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.kakawanyifan.controller;

import com.kakawanyifan.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("orders/")
public class OrderController {

@Autowired
OrderService orderService;

@PostMapping("{id}")
public void order(@PathVariable String id){
orderService.order(id);
}
}

短信处理业务

业务层接口:

示例代码:

1
2
3
4
5
6
7
8
9
10
11
package com.kakawanyifan.service;

public interface MessageService {

// 发到消息队列
void sendMessage(String id);

// 消费
String doMessage();

}

解释说明:短信处理业务层接口提供两个操作,发送要处理的订单ID到消息中间件,另一个操作目前暂且设计成处理消息,实际消息的处理过程不应该是手动执行,应该是自动执行。

业务层实现:

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.kakawanyifan.service.impl;

import com.kakawanyifan.service.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
@Slf4j
public class MessageServiceImpl implements MessageService {

// 消息队列
List<String> messageList = new ArrayList<>();

@Override
public void sendMessage(String id) {
log.info("待发送短信的订单,已经加入到消息队列。id:" + id);
messageList.add(id);
}

@Override
public String doMessage() {
String id = messageList.remove(0);
log.info("已完成短信发送。id:" + id);
return id;
}
}

表现层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.kakawanyifan.controller;

import com.kakawanyifan.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("message")
public class MessageController {

@Autowired
MessageService messageService;

@GetMapping
public String doMessage(){
String id = messageService.doMessage();
return id;
}

}

解释说明:短信处理表现层接口暂且开发出一个处理消息的入口,但是此业务是对应业务层中设计的模拟接口,实际业务不需要设计此接口。

模拟调用

订单:curl --location --request POST 'http://localhost:8080/orders/2'
短信:curl --location --request GET 'http://localhost:8080/message'

整合ActiveMQ

ActiveMQ是MQ产品中的元老级产品,早期标准MQ产品之一,在AMQP协议没有出现之前,占据了消息中间件市场的绝大部分份额,后期因为AMQP系列产品的出现,迅速走弱,目前仅在一些线上运行的产品中出现,新产品开发较少采用。

安装

步骤:

  1. 通过官网下载acticemq的安装包
    官网:https://activemq.apache.org/
  2. 解压
    1
    tar -zxvf apache-activemq-5.16.5-bin.tar.gz
  3. 启动
    在解压后的bin目录下找到activemq,启动命令:
    1
    ./activemq start
    另,停止命令:./activemq stop

启动之后,我们可以访问其管理页面,地址是http://10.211.55.14:8161/admin/,用户名和密码都是admin

问题处理

如果启动后没有任何报错,如下:

1
2
3
4
INFO: Loading '/usr/local/activemq/apache-activemq-5.17.3//bin/env'
INFO: Using java '/usr/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/usr/local/activemq/apache-activemq-5.17.3//data/activemq.pid' (pid '14377')

而且在data/activemq.log没有任何内容。
可能是因为JDK版本不对。
通过官网的下载页面,可以看到要求的JDK版本。

JDK版本

如果ActiveMQ正常启动,防火墙端口也开放了,但是浏览器发现无法访问其管理页面。

解决方法:
修改conf目录的jetty.xml
找到bean id="jettyPort"配置下的name="host"的参数配置,将参数值由127.0.0.1改为0.0.0.0,然后重启ActiveMQ。

整合

导入SpringBoot整合ActiveMQ的Starter:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

配置ActiveMQ的服务器地址:

1
2
3
spring:
activemq:
broker-url: tcp://10.211.55.14:61616

使用JmsMessagingTemplate操作ActiveMQ:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.kakawanyifan.service.impl;

import com.kakawanyifan.service.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MessageServiceActivemqImpl implements MessageService {

@Autowired
JmsMessagingTemplate jmsMessagingTemplate;

@Override
public void sendMessage(String id) {
log.info("待发送短信的订单已纳入处理队列,id : " + id);
jmsMessagingTemplate.convertAndSend("order.queue.id",id);
}

@Override
public String doMessage() {
String id = jmsMessagingTemplate.receiveAndConvert("order.queue.id",String.class);
log.info("已完成短信发送业务,id : " + id);
return id;
}
}

解释说明:

  • 发送消息需要先将消息的类型转换成字符串,然后再发送,所以是convertAndSend。需要定义消息发送的位置和具体的消息内容,此处使用ID作为消息内容。
  • 接收消息需要先将消息接收到,然后再转换成指定的数据类型,所以是receiveAndConvert。接收消息除了提供读取的位置,还要给出转换后的数据的具体类型。

使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.kakawanyifan.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageListener {
@JmsListener(destination = "order.queue.id")
public void receive(String id){
log.info("已完成短信发送业务,id : " + id);
}
}

解释说明:使用注解@JmsListener定义当前方法监听ActiveMQ中指定名称的消息队列。

如果当前消息队列处理完还需要继续向下传递当前消息到另一个队列中使用注解@SendTo即可,这样即可构造连续执行的顺序消息队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.kakawanyifan.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageListener {
@JmsListener(destination = "order.queue.id")
@SendTo("order.next.id")
public String receive(String id){
log.info("已完成短信发送业务,id : " + id);
return "next : " + id;
}
}

切换消息模型由点对点模型到发布订阅模型,修改jms配置pub-sub-domain即可。

1
2
3
4
5
spring:
activemq:
broker-url: tcp://10.211.55.14:61616
jms:
pub-sub-domain: true

解释说明:pub-sub-domain默认值为false,即点对点模型,修改为true后就是发布订阅模型。

整合RocketMQ

安装

官网:https://rocketmq.apache.org

下载RocketMQ的二进制包。

RocketMQ的二进制包

之后解压即可,解压后的目录如下:

1
2
3
4
5
6
7
drwxr-xr-x. 2 root root  4096 Jun 21  2022 benchmark
drwxr-xr-x. 3 root root 4096 Jan 29 14:47 bin
drwxr-xr-x. 6 root root 4096 Jun 20 2022 conf
drwxr-xr-x. 2 root root 4096 Jun 21 2022 lib
-rw-r--r--. 1 root root 17327 Jun 20 2022 LICENSE
-rw-r--r--. 1 root root 1338 Jun 20 2022 NOTICE
-rw-r--r--. 1 root root 11219 Jun 20 2022 README.md

启动

工作模式

工作模式

在RocketMQ中,有命名服务器(NameServer)和业务服务器(broker)的概念。
生产者与消费者并不是直接与业务服务器(broker)联系,而是通过命名服务器(NameServer)进行通信。
业务服务器(broker)启动后会通知命名服务器(NameServer),自己已经上线,这样命名服务器(NameServer)中就保存有所有的业务服务器(broker)信息。当生产者与消费者需要连接业务服务器(broker)时,通过命名服务器(NameServer)找到对应的处理业务的业务服务器(broker)。命名服务器(NameServer)在整套结构中起到一个信息中心的作用。

  • 注意:是通过命名服务器找到业务服务器,之后还是和业务服务器进行通信。

启动服务器

基于我们上文的讨论,我们要先启动命名服务器(NameServer),然后才能启动业务服务器(broker)。

启动命名服务器(NameServer):

1
nohup sh mqnamesrv &

启动之后,在用户的home目录,会生成一个logs/rocketmqlogs目录,相关的日志位于该目录下,日志名是namesrv.log
默认对外服务端口9876。

启动业务服务器(broker):

1
nohup sh mqbroker -n localhost:9876 &

测试服务器启动状态

测试生产者,示例代码:

1
2
export NAMESRV_ADDR=localhost:9876
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
运行结果:
1
2
3
4
5
6

【部分运行结果略】

SendResult [sendStatus=SEND_OK, msgId=FDB22C26F4E40000021C42FFFE9DD35842E3266474C293AFBB5003E7, offsetMsgId=0AD3370E00002A9F00000000000695CC, messageQueue=MessageQueue [topic=TopicTest, brokerName=centos-linux.shared, queueId=1], queueOffset=499]

【部分运行结果略】

测试消费者,示例代码:

1
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
运行结果:
1
2
3
4
5
6

【部分运行结果略】

ConsumeMessageThread_please_rename_unique_group_name_4_16 Receive New Messages: [MessageExt [brokerName=centos-linux.shared, queueId=0, storeSize=216, queueOffset=347, sysFlag=0, bornTimestamp=1674980166678, bornHost=/10.211.55.14:53548, storeTimestamp=1674980166679, storeHost=/10.211.55.14:10911, msgId=0AD3370E00002A9F00000000000493F4, commitLogOffset=300020, bodyCRC=1380090473, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=500, CONSUME_START_TIME=1674980218912, UNIQ_KEY=FDB22C26F4E40000021C42FFFE9DD35842E3266474C293AFB8160186, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51, 57, 48], transactionId='null'}]]

【部分运行结果略】

关闭服务器

关闭业务服务器:

1
sh mqshutdown broker

关闭名称服务器

1
sh mqshutdown namesrv

问题处理

按照本文的启动方式,启动期间的部分日志会打印在bin目录的nohup.out文件中。

如果在启动名称服务器期间,有报错如下:

1
java.net.BindException: Address already in use

是因为端口被占用了,默认端口是9876。

如果在启动名称服务器期间,有报错如下:

1
2
3
4
5
6
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/rocketmq/rocketmq-all-4.9.4-bin-release/bin/hs_err_pid3714.log

是因为内存不够,修改bin目录下的runserver.sh
搜索关键词JAVA_OPT,可以找到设置JAVA_OPT的代码,在结尾添加如下的一行:

1
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=320m"

如果在启动业务服务器期间报内存不够,修改bin目录下的runserver.sh
在设置JAVA_OPT的代码的结尾,添加如下的一行:

1
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

整合

导入SpringBoot整合RocketMQ的Starter,该Starter不由SpringBoot维护

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>

配置RocketMQ

1
2
3
4
rocketmq:
name-server: 10.211.55.14:9876
producer:
group: group_rocketmq

解释说明:

  • name-server:名称服务器地址。
  • producer.group:生产者所属组。

使用RocketMQTemplate操作RocketMQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.kakawanyifan.service.impl;

import com.kakawanyifan.service.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MessageServiceRocketmqImpl implements MessageService {

@Autowired
private RocketMQTemplate rocketMQTemplate;

@Override
public void sendMessage(String id) {
log.info("待发送短信的订单已纳入处理队列(rocketmq),id:"+id);
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功");
}
@Override
public void onException(Throwable e) {
log.info("消息发送失败");
}
};
// 使用asyncSend方法发送异步消息。
rocketMQTemplate.asyncSend("order_id",id,callback);
}

@Override
public String doMessage() {
return null;
}
}
  • asyncSend,发送异步消息。

使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.kakawanyifan.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RocketMQMessageListener(topic = "order_id",consumerGroup = "group_rocketmq")
public class MessageListener implements RocketMQListener<String> {
@Override
public void onMessage(String id) {
log.info("已完成短信发送业务(rocketmq),id:"+id);
}
}

解释说明:

  • RocketMQ的监听器必须按照标准格式开发,实现RocketMQListener接口,泛型为消息类型。
  • 使用注解@RocketMQMessageListener定义当前类监听RabbitMQ中指定组、指定名称的消息队列。

在发送消息的时候,可能会失败,查看异常的话,内容可能如下:

1
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to 10.211.55.14:10911 failed

原因是当生产者与消费者需要连接业务服务器(broker)时,通过命名服务器(NameServer)找到对应的处理业务的业务服务器(broker)。,即是通过命名服务器找到业务服务器,之后还是和业务服务器进行通信。

整合Kafka

安装

Kafka的功能相当于RocketMQ中的业务服务器,Kafka运行还需要一个类似于命名服务器的服务。在Kafka安装目录中自带一个类似于命名服务器的工具,Zookeeper。

本文是以单节点为例的,在《分布式事件流平台Kafka:1.基础》有以集群为例,讨论其安装方法。
关于Zookeeper,可以参考我们在《20.Dubbo和Zookeeper》的讨论。

官网:https://kafka.apache.org

下载Kafka的二进制包。

Kafka的二进制包

之后解压即可,解压后的目录如下:

1
2
3
4
5
6
7
drwxr-xr-x. 3 root root  4096 Dec 22 05:21 bin
drwxr-xr-x. 3 root root 4096 Dec 22 05:21 config
drwxr-xr-x. 2 root root 4096 Jan 29 17:31 libs
-rw-r--r--. 1 root root 14844 Dec 22 05:14 LICENSE
drwxr-xr-x. 2 root root 4096 Dec 22 05:21 licenses
-rw-r--r--. 1 root root 28184 Dec 22 05:14 NOTICE
drwxr-xr-x. 2 root root 4096 Dec 22 05:21 site-docs

配置文件server.properties

bin/config目录下,我们会找到配置文件server.properties

其中部分关键内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092

注意:

  • listeners,需要改为listeners=PLAINTEXT://0.0.0.0:9092
    listeners,监听器,告诉外部连接者要通过什么协议访问指定主机名和端口开放的Kafka服务。
  • advertised.listeners,需要改为advertised.listeners=PLAINTEXT://【服务器IP】:9092
    advertise的含义表示宣称的、公布的,就是组监听器是Broker用于对外发布的。

启动

1
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
1
./kafka-server-start.sh -daemon ../config/server.properties

解释说明:-daemon,表示在后台运行。

使用

创建topic:
示例代码:

1
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic1
运行结果:
1
Created topic topic1.

查看Kafka中topic的情况:
示例代码:

1
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
运行结果:
1
topic1

查看某一个topic的详细信息:

示例代码:

1
./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic topic1
运行结果:
1
2
Topic: topic1   TopicId: XjiNjy6pShG7TZFSWSHi_Q PartitionCount: 1       ReplicationFactor: 1    Configs: 
Topic: topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0

生产消息:

1
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1

此时,会进入生产消息的编辑模式,回车键发送消息。

生产消息

消费消息:

1
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --from-beginning

删除topic:

1
./bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic topic1

此时我们再查看topic1,示例代码:

1
./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic topic1
运行结果:
1
2
3
4
5
6
7
Error while executing topic command : Topic 'topic1' does not exist as expected
[2023-01-29 20:20:26,692] ERROR java.lang.IllegalArgumentException: Topic 'topic1' does not exist as expected
at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:401)
at kafka.admin.TopicCommand$TopicService.describeTopic(TopicCommand.scala:313)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:61)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)

有些资料说,再次查看的话,并不会被直接删除,而是有"marked for deletion,只是将topic1标记了删除",在这里没有复现。

关闭服务:

关闭Kafka:

1
./bin/kafka-server-stop.sh

关闭Zookeeper:

1
./bin/zookeeper-server-stop.sh

整合

导入SpringBoot整合Kafka的Starter,此坐标由SpringBoot维护版本。

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

配置Kafka:

1
2
3
4
5
spring:
kafka:
bootstrap-servers: 10.211.55.14:9092
consumer:
group-id: order

解释说明:

  • bootstrap-servers:Kafka的服务地址。
  • consumer.group-id:设置消费者所属的组ID。

使用KafkaTemplate操作Kafka:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.kakawanyifan.service.impl;

import com.kakawanyifan.service.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MessageServiceKafkaImpl implements MessageService {

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;

@Override
public void sendMessage(String id) {
kafkaTemplate.send("order_id",id);
log.info("待发送短信的订单已纳入处理队列(kafka),id:" + id);
}

@Override
public String doMessage() {
return null;
}
}

解释说明:使用send方法,第一个参数是topic的名称。

使用消息监听器,指定监听的topics:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.kakawanyifan.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageListener {
@KafkaListener(topics = "order_id")
public void onMessage(ConsumerRecord<String,String> record){
log.info("已完成短信发送业务(kafka),id:" + record.value());
}
}

更详细的配置,可以参考官方文档。
地址:https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html#application-properties.integration.spring.kafka.consumer.auto-commit-interval

文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/10822
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

评论区