SpringCloud 学习笔记

Gateway

首先创建一个模块gateway-service,然后引入相关的 pom 依赖

1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
在模块下的resources文件夹下创建application.yaml文件,并添加配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
server:
port: 8112
spring:
application:
name: gateway
cloud:
nacos:
server-addr: 127.0.0.1:8848
username: nacos
password: nacos
gateway:
routes:
- id: system-service
uri: lb://system-service
predicates:
- Path=/dev-api/system/**
filters:
- StripPrefix=1
- id: infra-service
uri: lb://infra-service
predicates:
- Path=/dev-api/infra/**,/dev-api/files/**
filters:
- StripPrefix=1

Tips

在使用 Gateway 的时候出现跨域问提,需要修改gateway-service模块下resources文件夹下的application.yaml的文件,添加跨域配置,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
server:
port: 8888
spring:
application:
name: gateway
cloud:
nacos:
server-addr: 127.0.0.1:8848
username: nacos
password: nacos
gateway:
routes:
- id: system-service
uri: lb://system-service
predicates:
- Path=/system/**
- id: infra-service
uri: lb://infra-service
predicates:
- Path=/infra/**
globalcors: #跨域配置
cors-configurations:
"[/**]":
allowedOriginPatterns: "*" #允许所有ip跨域访问
allowedMethods: "*" #允许所有请求方式
allowedHeaders: "*" #允许任何头进行跨域
allowCredentials: true #允许携带cookie
# 以上配完成,简单跨域复杂跨域都允许。

由于之前是只使用 springboot,已经添加过跨域配置,需要将之前的跨域配置删除文件分别为CorsConfigSecurityConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
public class CorsConfig implements WebMvcConfigurer {
// @Override
// public void addCorsMappings(CorsRegistry registry) {
// //设置允许跨域的路径
// registry.addMapping("/**")
// //设置允许跨域请求的域名
// .allowedOriginPatterns("*")
// //是否允许cookie
// .allowCredentials(true)
// //设置允许的请求方式
// .allowedMethods("GET","POST","DELETE","PUT")
// //设置允许的header属性
// .allowedHeaders("*")
// //跨域允许时间
// .maxAge(3600);
// }

@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.addResourceHandler("/files/**").addResourceLocations("file:/Users/wangchao/Downloads/files/");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Configuration
@EnableGlobalMethodSecurity(prePostEnabled = true)
public class SecurityConfig extends WebSecurityConfigurerAdapter {

@Bean
public PasswordEncoder passwordEncoder(){
return new BCryptPasswordEncoder();
}

@Bean
@Override
public AuthenticationManager authenticationManagerBean() throws Exception {
return super.authenticationManagerBean();
}

@Autowired
private JwtAuthenticationTokenFilter tokenFilter;

@Autowired
private AuthenticationEntryPointImpl authenticationEntryPoint;

@Autowired
private AccessDeniedHandlerImpl accessDeniedHandler;

@Override
protected void configure(HttpSecurity http) throws Exception {
http
//关闭csrf
.csrf().disable()
//不通过Sesion获取SecurityContext
.sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS)
.and()
.authorizeRequests()
//对于登录接口 允许匿名访问
.antMatchers("/system/login","/system/auth/send-sms-code","/system/auth/sms-login","/system/captcha/check","/system/auth/social-login").anonymous()
//允许刷新令牌的接口和登出接口可以认证或未认证都可以访问
.antMatchers("/system/auth/refresh-token","/system/logout","/system/user/imgProxy/*","/files/**","/system/social-user/oauthGoogle").permitAll()
//除上面外的所有请求全部需要鉴权认证
.anyRequest().authenticated();
//添加过滤器
http.addFilterBefore(tokenFilter, UsernamePasswordAuthenticationFilter.class);

//添加异常处理器
http.exceptionHandling()
//认证失败处理类
.authenticationEntryPoint(authenticationEntryPoint)
//用户权限失败处理类
.accessDeniedHandler(accessDeniedHandler);

//允许跨域
// http.cors();
}
}

各个服务之间的调用使用FeignClient,我们可以单独创建一个单独的模块api-service,提供不同服务之前的接口访问。

一个模块使用另一个模块的接口在 springboot 中是可以添加 pom 依赖来使用,但是当我们在各个服务之前的调用接口服务的时候就不用使用 pom 依赖,而是使用 FeignClient 这项技术。我们需要创建各个服务的通用接口 client,然后在每个接口上添加@FeignClient注解,接口中可以编写需要使用的方法。并在需要使用这些接口服务的启动服类上添加@EnableFeignClients(basePackages = "com.wangchao.client")注解,()中是 client 所在的包,例如:

1
2
3
4
5
6
7
8
@FeignClient(name = "infra-service", configuration = FeignSupportFormConfig.class)
public interface InfraClient {

@PostMapping(value = "/infra/file/uploadFileByClient",consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
Map<String,String> uploadFileByClient(
@RequestPart("file") MultipartFile file,
@RequestParam("folder") String folder);
}
1
2
3
4
5
@FeignClient(name = "system-service",configuration = FeignSupportFormConfig.class)
public interface SystemClient {
@PostMapping(value = "/system/notify-template/sendNotifyClient",consumes = "application/json")
void sendNotifyClient(@RequestBody SendNotifyDto sendNotifyDto);
}
1
2
3
4
5
6
7
8
9
10
@EnableFeignClients(basePackages = "com.wangchao.client"
,defaultConfiguration = FeignSupportFormConfig.class
)
@EnableDiscoveryClient
@SpringBootApplication
public class InfraApplication {
public static void main(String[] args) {
SpringApplication.run(InfraApplication.class,args);
}
}
1
2
3
4
5
6
7
8
9
10
@EnableFeignClients(basePackages = "com.wangchao.client"
,defaultConfiguration = FeignSupportFormConfig.class
)
@SpringBootApplication
@EnableDiscoveryClient
public class SystemApplication {
public static void main(String[] args) {
SpringApplication.run(SystemApplication.class,args);
}
}

因为我的项目中使用来 springsecurty,会对访问的请求进行拦截查看 Token 是否有效,在使用的过程我发现,infra-service 服务通过 SystemClient 调用 system-service 服务的请求时被拒绝,查看发现是因为 infra-service 通过 SystemClient 发送请求来拿取 system-service 的时候请求中没有携带 Token,这时我们就需要创建一个拦截器来拦截;以及当我们服务之前通过 client 访问的请求携带的是文件或者对象这样的数据的时候也会报错,这是因为访问的时候 FeignClient 没有对这些资源进行编码或者序列化处理,这些我们都需要配置进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Configuration
public class FeignSupportFormConfig {
private final ObjectFactory<HttpMessageConverters> messageConverters;

// 使用构造函数注入 HttpMessageConverters
public FeignSupportFormConfig(ObjectFactory<HttpMessageConverters> messageConverters) {
this.messageConverters = messageConverters;
}

@Bean
public Encoder feignEncoder() {
return new SpringFormEncoder(new SpringEncoder(messageConverters));
}

@Bean
public RequestInterceptor userInfoRequestInterceptor(){
return new RequestInterceptor() {
@Override
public void apply(RequestTemplate requestTemplate) {
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
String token = (String) authentication.getCredentials();
requestTemplate.header("Authorization",token);
}
};
}
}

配置创建完成后我们需要给 FeignClient 以及启动类添加这个配置

@FeignClient(name = “infra-service”, configuration = FeignSupportFormConfig.class) > @EnableFeignClients(basePackages = “com.wangchao.client”,defaultConfiguration = FeignSupportFormConfig.class)


配置管理

共享配置

  • 添加依赖
1
2
3
4
5
6
7
8
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
  • 新建 bootstrap.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
application:
name: system-service
cloud:
nacos:
server-addr: 20.39.195.21:8848
username: nacos
password: nacos
config:
file-extension: yaml
shared-configs:
- data-id: shared-jdbc.yaml
- data-id: shared-log.yaml
  • 在 nacos 服务中创建配置
    nacos的管理配置图片
  • 最后将 application.yaml 中重复的配置删除

动态路由

  • 在网关模块下添加依赖
1
2
3
4
5
6
7
8
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
  • 创建 bootstrap.yaml 文件,并将原 application.yaml 中重复的删除
1
2
3
4
5
6
7
8
9
10
spring:
application:
name: gateway
cloud:
nacos:
server-addr: xx.xx.xx.xx:8848
config:
file-extension: yaml
shared-configs:
- data-id: shared-log.yaml
  • 在 routers 包下创建DynamicRouteLoader
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Slf4j
@Component
//@RequiredArgsConstructor
public class DynamicRouteLoader {
@Autowired
private NacosConfigManager nacosConfigManager;

@Autowired
private RouteDefinitionWriter routeDefinitionWriter;

private final Set<String> routeIds = new HashSet<>();

private final String dataId = "gateway-routes.json";
private final String group = "DEFAULT_GROUP";

@PostConstruct
public void intiRouteConfigListener() throws NacosException {
//1、项目启动,先拉取一次配置,并添加配置监听器
String configInfo = nacosConfigManager.getConfigService()
.getConfigAndSignListener(dataId, group, 5000, new Listener() {
@Override
public Executor getExecutor() {
return null;
}

@Override
public void receiveConfigInfo(String configInfo) {
//2、监听到配置变更,需要去更新路由表
updateConfigInfo(configInfo);
}
});
//3、第一次读取到配置,也需要更新呢到路由表
updateConfigInfo(configInfo);
}

public void updateConfigInfo(String configInfo){
log.info("监听到路由配置信息:{}",configInfo);
//1、解析配置信息,转为RouteDefinition
List<RouteDefinition> routeDefinitions = JSONUtil.toList(configInfo, RouteDefinition.class);
//2、删除旧的路由表
for (String routeId : routeIds) {
routeDefinitionWriter.delete(Mono.just(routeId)).subscribe();
}
routeIds.clear();
//3、更新路由表
for(RouteDefinition routeDefinition: routeDefinitions){
//3.1 更新路由表
routeDefinitionWriter.save(Mono.just(routeDefinition)).subscribe();
//3.2 记录路由id,便于下一次更新时删除
routeIds.add(routeDefinition.getId());
}
}
}
  • 最后在 nacos 的配置列表中创建gateway-routes.json配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
[
{
"id": "system-service",
"uri": "lb://system-service",
"predicates": [
{
"name": "Path",
"args": {
"_genkey_0": "/dev-api/system/**"
}
}
],
"filters": [
{
"name": "StripPrefix",
"args": {
"_genkey_0": "1"
}
}
]
},
{
"id": "infra-service",
"uri": "lb://infra-service",
"predicates": [
{
"name": "Path",
"args": {
"_genkey_0": "/dev-api/infra/**",
"_genkey_1": "/dev-api/files/**"
}
}
],
"filters": [
{
"name": "StripPrefix",
"args": {
"_genkey_0": "1"
}
}
]
}
]
  • 之后再在gateway-routes.json文件中添加配置后,已经运行的网关服务不需要再重启便可动态的添加上去
    已经运行的网关服务在nacos中添加配置后会自动更新路由

微服务保护和分布式事务

sentinel

  • 引入 sentinel 依赖
1
2
3
4
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
  • 添加 application.yml 配置
1
2
3
4
5
6
7
8
9
10
11
spring:
application:
name: system-service
cloud:
sentinel:
transport:
dashboard: ip:port #sentinel控制台地址
http-method-specify: true #是否设置请求方式作为资源名称
feign:
sentinel:
enabled: true
  • sentinel 控制台

RabbitMQ

快速入门

  • 引入 pom 依赖以及添加配置
1
2
3
4
5
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1
2
3
4
5
6
7
spring:
rabbitmq:
host: your host
port: 5672
virtual-host: /sell
username: sell
password: 123321
  • 发送消息
    使用类RabbitTemplate来进行发送消息
1
2
3
4
5
6
7
8
9
10
11
12
@SpringBootTest
class SpringAmqpTest {

@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
String queueName = "simple.queue";
String message = "Hello, spring amqp";
rabbitTemplate.convertAndSend(queueName,message);
}
}
  • 接收消息
    通过@RabbitListener注解来监听
1
2
3
4
5
6
7
8
9
@Slf4j
@Component
public class SpringRabbitListener {

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message){
log.info("监听到simple.queue的消息:【{}】",message);
}
}

基于注解声明队列交换机

1
2
3
4
5
6
7
8
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1",durable = "true"),
exchange = @Exchange(name = "sell.direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listenDirectQueue1(String message) throws InterruptedException {
log.info("消费者1监听到direct.queue1的消息:【{}】",message);
}

消息转换器

可以传送对象数据

1
2
3
4
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
1
2
3
4
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

发送者可靠性

  • 发送者重连
    添加配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
rabbitmq:
host: ip
port: port
virtual-host: /sell
username: sell
password: 123321
# 发送者可靠性-发送者重连
connection-timeout: 1s # 设置MQ的链接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待市场倍数,下次等待时长= initial-interval * multiplier
max-attempts: 3 # 最大重试次数
  • 发送者确认
    添加配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
spring:
rabbitmq:
host: ip
port: port
virtual-host: /sell
username: sell
password: 123321
# 发送者可靠性-发送者重连
connection-timeout: 1s # 设置MQ的链接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待市场倍数,下次等待时长= initial-interval * multiplier
max-attempts: 3 # 最大重试次数
# 发送者可靠性-发送者确认
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型 none: 关闭confirm机制 simple: 同步阻塞等待MQ的回执消息 correlated: MQ异步回调方式返回回执消息
publisher-returns: true # 开启publisher return机制

添加 ReturnCallback,只需一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
@Configuration
public class MqConfig {
@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("监听到了消息return callback");
log.debug("exchange:{}",returnedMessage.getExchange());
log.debug("routingKey:{}",returnedMessage.getRoutingKey());
log.debug("message:{}",returnedMessage.getMessage());
log.debug("replyCode:{}",returnedMessage.getReplyCode());
log.debug("replyText:{}",returnedMessage.getReplyText());
}
});
}
}

添加 ConfirmCallback,每个发送消息都需要

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void testConfirmCallBack() throws InterruptedException {
//创建correlationData
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
log.error("spring amqp 处理确认结果异常",ex);
}

@Override
public void onSuccess(CorrelationData.Confirm result) {
//判断是否成功
if (result.isAck()){
log.debug("收到confirmCallback ack 消息发送成功!");
}else {
log.error("收到confirmCallback nack 消息发送失败!reason:{}",result.getReason());
}
}
});

String exchangeName = "callback.direct";
String message = "Hello, ConfirmCallBack";
rabbitTemplate.convertAndSend(exchangeName,"blue",message,cd);

Thread.sleep(2000);
}