灰度发布

什么是灰度发布

介绍灰度发布流程之前我先一句话介绍一下什么是灰度发布。灰度发布就是,线上app无需停机就可以保证运行的是经过测试的稳定版本,且我们在冒烟测试时也不会影响到线上App的运行。

为什么我们要搞灰度发布

线上的服务每次都是我来构建,我可以非常负责任的讲,冒烟测试时不重新发布的几率很小,而且很多时候需要我去定位线上问题,这个过程我不知道你们痛不痛苦,反正我是很痛苦。换个角度分析,如果我是正在使用App的用户我会吐槽:这是什么牛马App,一天能卡个好几次,一次卡个几分钟,这还用个大锤子!为了能让用户有更好的体验,也为了我不在那么痛苦所以我们急需要一款灰度发布系统。

灰度发布系统怎么搞

一个很简单的理论,同时准备两份服务,让符合规则的请求路由到灰度接口,不符合规则的路由到之前发布的服务就好了哇~

灰度发布.svg

代码实现

熟悉SpringCloudGateway的同学对于gateway的路由配置不会很陌生,以下面的基础配置为例简单的讲解一下。

1
2
3
4
5
6
7
8
9
10
11
12
spring:
  cloud:
    gateway:
      default-filters:
        - DedupeResponseHeader=Access-Control-Allow-Credentials Access-Control-Allow-Origin
      routes:
        - id: web-server     # 路由的唯一ID,配合业务命名不重复即可
          uri: lb://web-server
          predicates: # 断言
            - Path=/web-api/api3/**  # 路径断言,路径匹配测进行路由
          filters:
            - RewritePath=/web-api/api/(?<segment>/?.*),/api/$\{segment}

当Gateway识别到请求符合某个断言后,就会将请求路由到该组断言对应的uri下。让符合规则的请求路由到灰度接口,不符合规则的路由到之前发布的服务就好了哇~,对于这个简单的需求我们只要在path断言不变的前提下在增加一个管理规则的断言不就可以了吗,以指定请求头包含指定value的规则为例,我们就可以自定义如下的断言。

自定义Gateway断言工厂

自定义Gateway的断言工厂那是相当的easy呀,你只需要复制我下面的代码修改下apply方法中规则为你想要的规则即可,你要是不信你也可以随便点进一个Gateway官方定义的任意一个断言工厂,官方的代码就是这么写的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import com.alibaba.nacos.api.utils.StringUtils;
import lombok.*;
import org.springframework.cloud.gateway.handler.predicate.AbstractRoutePredicateFactory;
import org.springframework.cloud.gateway.handler.predicate.GatewayPredicate;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

/**
* @author jiangtongxue
* @date 2022/3/17 16:10
*/
@Component
public class HeaderUsernameRoutePredicateFactory extends AbstractRoutePredicateFactory<HeaderUsernameRoutePredicateFactory.Config> {
    
    public static final String USERNAME = "canaryFlag";
    
    public HeaderUsernameRoutePredicateFactory() {
        super(Config.class);
    }
    
    @Override
    public ShortcutType shortcutType() {
        return ShortcutType.GATHER_LIST;
    }
    
    @Override
    public List<String> shortcutFieldOrder() {
        return Collections.singletonList("canaryFlag");
    }
    
    @Override
    public Predicate<ServerWebExchange> apply(Config config) {
        List<String> usernames = Arrays.asList(config.getCanaryFlag().split(","));
        return new GatewayPredicate() {
            @Override
            public boolean test(ServerWebExchange serverWebExchange) {
                String username = serverWebExchange.getRequest().getHeaders().getFirst(USERNAME);
                if (!StringUtils.isBlank(username)) {
                    return usernames.contains(username);
                }
                return false;
            }
            
            @Override
            public String toString() {
                return String.format("Header: canaryFlag=%s", config.canaryFlag);
            }
        };
    }
    
    @NoArgsConstructor
    @Getter
    @Setter
    @ToString
    public static class Config {
        String canaryFlag;
    }
}

使用自定义的断言工厂

注意关注我上面自定义断言的类名,把RoutePredicateFactory后缀去掉就是断言的使用方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
  cloud:
    gateway:
      default-filters:
        - DedupeResponseHeader=Access-Control-Allow-Credentials Access-Control-Allow-Origin
      routes:
        - id: web-server   
          uri: lb://web-server
          predicates: # 断言
            - Path=/web-api/api3/** 
            - HeaderUsername=jiangtongxue,chuitongxue  #使用自定义的断言工厂
          filters:
            - RewritePath=/web-api/api/(?<segment>/?.*),/api/$\{segment}

有个小问题

如果是单机的小服务,我们可以修改url来映射到不同的接口,但是对于微服务的集群我们要怎么搞嘞。

自定义Gateway全局过滤器借助Nacos的元数据进行负载均衡

自定义全局过滤器也是very的easy呀,我们只需要新建一个类继承GlobalFilter和Ordered接口就好啦,getOrder返回的值越大,执行的越靠后。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.http.HttpHeaders;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * @author jiangtongxue
 * @date 2022/3/17 16:16
 */
@Slf4j
public class GrayRoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
    private final String serviceId;
    private final AtomicInteger position;

    public GrayRoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
        this.serviceId = serviceId;
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
        this.position = new AtomicInteger(new Random().nextInt(1000));
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        HttpHeaders headers = (HttpHeaders) request.getContext();
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request).next().map(list -> getInstanceResponse(list, headers));
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, HttpHeaders headers) {
        List<ServiceInstance> serviceInstances = instances.stream()
                .filter(instance -> {
                    //根据请求头中的版本号信息,选取注册中心中的相应服务实例
                    String version = headers.getFirst("Version");
                    if (version != null) {
                        return version.equals(instance.getMetadata().get("version"));
                    } else {
                        return true;
                    }
                }).collect(Collectors.toList());
        if (instances.isEmpty()) {
            if (log.isWarnEnabled()) {
                log.warn("No servers available for service: " + serviceId);
            }
            return new EmptyResponse();
        }
        int pos = Math.abs(this.position.incrementAndGet());
        if (serviceInstances.size() == 0) {
            return  new EmptyResponse();
        }
        ServiceInstance instance = serviceInstances.get(pos % serviceInstances.size());
        return new DefaultResponse(instance);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultRequest;
import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.net.URI;

/**
* @author jiangtongxue
* @date 2022/3/17 16:18
*/
@Slf4j
@Component
public class GrayReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {
    
    private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;
    
    private final LoadBalancerClientFactory clientFactory;
    
    public GrayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory) {
        this.clientFactory = clientFactory;
    }
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = (URI) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = (String) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
        if (url != null && ("gray-lb".equals(url.getScheme()) || "gray-lb".equals(schemePrefix))) {
            ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
            if (log.isTraceEnabled()) {
                log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
            }
            return this.choose(exchange).doOnNext((response) -> {
                if (!response.hasServer()) {
                    throw NotFoundException.create(true, "Unable to find instance for " + url.getHost());
                } else {
                    URI uri = exchange.getRequest().getURI();
                    String overrideScheme = null;
                    if (schemePrefix != null) {
                        overrideScheme = url.getScheme();
                    }
                    
                    DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance((ServiceInstance) response.getServer(), overrideScheme);
                    URI requestUrl = this.reconstructURI(serviceInstance, uri);
                    if (log.isTraceEnabled()) {
                        log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
                    }
                    
                    exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
                }
            }).then(chain.filter(exchange));
        } else {
            return chain.filter(exchange);
        }
    }
    
    private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
        URI uri = (URI) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        assert uri != null;
        GrayRoundRobinLoadBalancer loadBalancer = new GrayRoundRobinLoadBalancer(clientFactory.getLazyProvider(uri.getHost(), ServiceInstanceListSupplier.class), uri.getHost());
        return loadBalancer.choose(this.createRequest(exchange));
    }
    
    @SuppressWarnings("rawtypes")
    private Request createRequest(ServerWebExchange exchange) {
        HttpHeaders headers = exchange.getRequest().getHeaders();
        return new DefaultRequest<>(headers);
    }
    
    protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
    }
    
    @Override
    public int getOrder() {
        return LOAD_BALANCER_CLIENT_FILTER_ORDER;
    }
}

最终配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
spring:
  cloud:
    gateway:
      default-filters:
        - DedupeResponseHeader=Access-Control-Allow-Credentials Access-Control-Allow-Origin
      routes:
        - id: user-route-gray
          uri: gray-lb://pins-platform
          predicates:
            - HeaderUsername=jiangtongxue,chuitongxue  #使用自定义的断言工厂
            - Path=/gray/platform-api/**
          filters:
            - AddRequestHeader=Version,canary
            - RewritePath=/gray/platform-api/(?<segment>/?.*),/platform-api/$\{segment}
        - id: user-route
          uri: gray-lb://pins-platform
          predicates:
            - Path=/gray/platform-api/**
          filters:
            - AddRequestHeader=Version,release
            - RewritePath=/gray/platform-api/(?<segment>/?.*),/platform-api/$\{segment}

最后只要是请求中具有canaryFlag的请求头且值为jiangtongxue或者chuitongxue的请求就会路由到release版本的服务集群上~

img

坚持原创技术分享,您的支持将鼓励我继续创作!