前言

在微服务架构中,网关作为流量的门户,其稳定性和性能至关重要。在我的开源项目 ZAutumn-gateway 的开发过程中,为了实现动态路由与限流规则的热更新,我们将配置存入了Redis。但随之而来的是这样一个问题——如果采用传统的同步阻塞方式监听配置变更,那么在主流程(路由转发、熔断限流)中,关键的I/O线程很可能因等待配置更新而被阻塞,这将损害网关处理高并发请求的核心能力。

为了解决这一问题,我引入了响应式编程范式,并选用 Project Reactor 作为核心工具库,将配置更新的相关逻辑全部改造为异步非阻塞处理。这样一来,配置监听器在等待Redis消息时不会阻塞任何线程,当消息到达时才会异步地触发更新操作,从而确保了网关数据平面的高性能和响应性。

阅读本文前,您需要掌握:

  1. 微服务架构基础:微服务架构的基本思想、API 网关的角色(统一入口、路由、过滤等)
  2. 响应式编程范式:核心思想(异步非阻塞、数据流、变化传播)、背压(Backpressure)概念
  3. Spring 技术栈:Spring Boot 基础(如注解配置、依赖注入)、Spring Cloud 生态的初步了解(如服务发现、配置中心)

通过本文,您将获得以下收获:

  1. 掌握Reactor的核心操作符:学会使用Mono和Flux这两个核心类来构建异步数据流。
  2. 了解响应式编程的实战意义:ZAutumn-gateway中如何利用响应式编程优雅地解决动态配置更新难题。

阅读建议:

本文建议搭配 ZAutumn网关服务端项目进行食用。

代码仓库地址: lcdzzz/ZAutumn-gateway: ZAutumn网关服务端 ,查看本文的完整实现源码,欢迎 Star 和贡献你的想法!

响应式编程基础

核心概念

响应式编程是一种面向数据流和变化传播的编程范式。在 Spring 生态中,主要通过 Project Reactor 实现:

  • Mono:表示 0 或 1 个元素的异步序列
  • Flux:表示 0 到 N 个元素的异步序列
  • 背压机制:消费者可以控制生产者的生产速度

关键操作符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// then() - 在上游完成后再执行后续操作
repository.refreshRoutes().then(Mono.fromRunnable(() ->
eventPublisher.publishEvent(new RefreshRoutesEvent(this))));

// flatMap() - 将每个元素映射为新的 Publisher 并合并
redisTemplate.opsForHash().entries(ZAutumnGatewayUtils.getRouteKey())
.flatMap(entry -> {
// 处理每个哈希条目
});

// doOnNext() - 在每个元素发射后执行副作用
Flux.fromIterable(cachedRoutes).map(Pair::getValue)
.doOnNext(route -> {
// 处理每个路由定义
});

网关核心组件分析

动态路由定义定位器

DynamicRouteDefinitionLocator是路由系统的入口点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component("dynamicRouteDefinitionLocator")
public class DynamicRouteDefinitionLocator implements RouteDefinitionLocator {
private final RedisRouteDefinitionRepository repository;
private final ApplicationEventPublisher eventPublisher;

public DynamicRouteDefinitionLocator(RedisRouteDefinitionRepository repository,ApplicationEventPublisher eventPublisher) {
this.repository = repository;
this.eventPublisher = eventPublisher;
}

@Override
public Flux<RouteDefinition> getRouteDefinitions() {
return repository.getRouteDefinitions();
}

public Mono<Void> refreshCache() {
// then() - 在上游完成后再执行后续操作
return repository.refreshRoutes().then(Mono.fromRunnable(() ->
eventPublisher.publishEvent(new RefreshRoutesEvent(this))));
}
}

解析

  1. 实现了RouteDefinitionLocator接口,向Spring Cloud Gateway提供路由定义
  2. 使用响应式流Flux<RouteDefinition>返回路由信息
  3. 提供刷新缓存的方法refreshCache,支持动态更新

Redis路由存储

RedisRouteDefinitionRepository 负责路由配置的存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class RedisRouteDefinitionRepository implements RouteDefinitionRepository {
// ...省略其他属性
private final ReactiveStringRedisTemplate redisTemplate;
public RedisRouteDefinitionRepository(ReactiveStringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}

public Mono<Void> refreshRoutes() {
// 这里通过redisTemplate获取Redis中的路由配置数据,返回一个包含所有路由条目的响应式流 Flux<Map.Entry<HK, HV>>
return redisTemplate.opsForHash().entries(ZAutumnGatewayUtils.getRouteKey())
// 开启扁平映射:将每个Redis条目转换为一个由多个Pair组成的Flux流
.flatMap(entry -> {
String routeId = (String) entry.getKey();
List<RouteDefinition> jsonArray = JSONArray.parseArray(
(String) entry.getValue(), RouteDefinition.class);
/**
public class Pair<K, V> {
public final K k;
public final V v;

public Pair(K k, V v) {
this.k = k;
this.v = v;
}
}
*/
List<Pair<String, RouteDefinition>> values = new ArrayList<>();
for (RouteDefinition route : jsonArray) {
values.add(Pair.of(routeId, route));
}
// 用于将一个 Iterable 集合(如 List、Set)转换为一个 Flux 流
return Flux.fromIterable(values);
})
// 过滤掉没有通过validateRouteDefinition检查的路由定义。通过检查的才会继续留在流中。
.filter(pair -> validateRouteDefinition(pair))
// collectList()将前面过滤后的多个单独路由对象收集到一个List集合中,将Flux<Pair<String, RouteDefinition>>转换为Mono<List<Pair<String, RouteDefinition>>>。
.collectList()
// doOnNext是一个副作用操作符,它不会改变数据流的内容,而是当数据流发射元素时执行特定操作(此处是更新缓存)。这一步确保了验证通过的路由能够及时更新到系统缓存中
.doOnNext(routes -> updateCache(routes))
.then();
}
}

解析

  1. 数据存储设计:使用Redis Hash存储路由配置,支持高效的路由查询和批量更新操作
  2. 响应式处理链
    • flatMap操作符实现异步扁平映射,将每个Redis条目转换为包含多个路由定义的Flux流
    • filter操作符进行路由验证,确保只有有效路由进入缓存
    • collectList将分散的路由定义聚合为列表,便于批量处理
  3. 缓存更新策略doOnNext作为副作用操作符,在数据流经过时更新缓存,而不改变流本身内容
  4. 完成信号处理then()操作符忽略具体数据,只关注操作完成信号,实现非阻塞等待机制

扁平化:

简单来说就是将List<String,RouteDefinition> 变成List<RouteDefinition>

限流配置Sentinel集成

RedisSentinelRouteDataSource实现限流规则的动态加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class RedisSentinelRouteDataSource extends
AbstractDataSource<Flux<GatewayFlowRule>, Set<GatewayFlowRule>> {

@Override
// Flux<GatewayFlowRule>表示一个包含0到N个GatewayFlowRule元素的异步序列
public Flux<GatewayFlowRule> readSource() {
return redisTemplate.opsForHash().values(ZAutumnGatewayUtils.getLimitRouteKey())
.flatMapIterable(json -> JSONArray.parseArray((String) json, GatewayFlowRule.class));
}

public Mono<Void> update() {
return readSource()
// collectList():将 Flux<GatewayFlowRule>中的所有元素收集到一个 Mono<List<GatewayFlowRule>>中
.collectList()
.doOnNext(newValue -> {
if (newValue != null && !newValue.isEmpty()) {
getProperty().updateValue(new HashSet<>(newValue));
}
})
.then();
}
}

解析

  1. 数据源设计:继承Sentinel的AbstractDataSource,提供限流规则的数据源支持
  2. 流式数据读取flatMapIterable将JSON字符串数组展开为连续的限流规则流,支持大规模规则集的流式处理
  3. 批量更新优化collectList将流式数据收集为列表,减少频繁更新带来的性能开销
  4. 响应式集成:通过Mono返回更新结果,与Sentinel框架无缝集成,保持非阻塞特性

动态更新保障机制

ZAutumnGatewayChangeGuarantor

提供双重监控保障:

  • 定时轮询:每10秒检查版本变化,确保配置同步
  • 实时通知:通过 Redis Pub/Sub 接收即时变更通知
  • 版本控制:使用时间戳防止重复更新
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Component
public class ZAutumnGatewayChangeGuarantor implements
ApplicationListener<ApplicationStartedEvent>, DisposableBean {

// 1. 定时轮询任务
private void startPollingTask() {
scheduler = Executors.newSingleThreadScheduledExecutor();// 创建单线程的定时任务调度器
scheduler.scheduleAtFixedRate(() -> {
try {
// block()确保版本检查操作在本次定时任务周期内完成,避免多个检查任务并发执行
checkVersionChange().block(Duration.ofSeconds(5));
} catch (Exception e) {
logger.error("轮询检查异常", e);
}
}, 10, 10, TimeUnit.SECONDS); // 10秒一次
// 第一个10是首次延迟10秒后开始执行,第二个10是执行间隔时间
}

// 2. Pub/Sub监听器
private void startPubSubListener() {
Disposable subscribe = redisTemplate.listenToChannel(ZAutumnGatewayUtils.getChannelKey())
.doOnNext(message -> {
logger.info("收到实时变更通知: {}", message);
getCurrentVersion().subscribe(version -> {
// 更新版本号并刷新配置
lastCheckVersion = version;
lastChangeTimestamp = System.currentTimeMillis();
updateAll().subscribe();
});
})
.subscribe();
}
}

解析

  1. updateAll()返回的Mono<Void>更像是一个”任务订单”,描述了要做什么但还没开始做;而.subscribe()就是下达”开始执行”的指令。
  2. 以上代码所有操作基于响应式编程,避免阻塞监听线程。
  3. 通过版本号机制保证配置变更的一致性。

总结

响应式编程绝非仅是API调用方式的改变,而是一种以数据流变化传播为核心的编程范式革新。

在阅读响应式编程代码时,开发者的注意力应该从“如何一步步执行”转移到“数据如何流动和转换上

回顾

回顾这次实战,响应式编程的又是在微服务网关这个场景中的优势主要体现在如下:

  1. 资源效率最大化:通过非阻塞异步处理,网关在监听Redis配置变更时,不会占用宝贵的I/O线程。这使得系统在等待外部事件(如数据库查询、消息到达)时,能够释放线程资源去处理其他请求,显著提升了系统的并发处理能力资源利用率。这正是响应式系统所追求的弹性(Elastic)特性——在有限资源下承载更大负载
  2. 清晰的流式逻辑:利用 Reactor 提供的丰富操作符(如 flatMap, filter, then),我们能够像组装流水线一样,以流畅的链式调用表达复杂的异步业务逻辑(如“获取配置→验证→转换→更新缓存→发布事件”)。这不仅使代码更简洁、更易维护,也大大降低了传统回调方式可能产生的“回调地狱”风险。

参考文章:

响应式编程基石 Project Reactor源码解读-CSDN博客

微服务网关中的 WebFlux:为什么 Gateway 要用响应式编程?

SpringCloud之Gateway_spring cloud gateway 是基于 spring webflux 响应 式框架构建的高-CSDN博客