松花皮蛋的黑板报
  • 分享在京东工作的技术感悟,还有JAVA技术和业内最佳实践,大部分都是务实的、能看懂的、可复现的

扫一扫
关注公众号

ARTS-11-Kafka中的恰好一次交付

博客首页文章列表 松花皮蛋me 2019-05-26 19:18

ARTS的初衷

Algorithm: 主要是为了编程训练和学习。

Review:主要是为了学习英文

Tip:主要是为了总结和归纳在是常工作中所遇到的知识点。学习至少一个技术技巧。在工作中遇到的问题,踩过的坑,学习的点滴知识。

Share:主要是为了建立影响力,能够输出价值观。分享一篇有观点和思考的技术文章

https://www.zhihu.com/question/301150832

一、Algorithm

Best Time to Buy and Sell Stock
Say you have an array for which the ith element is the price of a given stock on day i.

If you were only permitted to complete at most one transaction (i.e., buy one and sell one share of the stock), design an algorithm to find the maximum profit.

Note that you cannot sell a stock before you buy one.

Example 1:

Input: [7,1,5,3,6,4]
Output: 5
Explanation: Buy on day 2 (price = 1) and sell on day 5 (price = 6), profit = 6-1 = 5.
Not 7-1 = 6, as selling price needs to be larger than buying price.
Example 2:

Input: [7,6,4,3,1]
Output: 0
Explanation: In this case, no transaction is done, i.e. max profit = 0.

class Solution {
    public int maxProfit(int[] prices) {
        int min = Integer.MAX_VALUE;        
        int profit = 0;
        for(int i = 0; i < prices.length; i++) {
            profit = Math.max(prices[i]-min, profit);
            min = Math.min(prices[i], min);            
        }        
        return profit;
    }
}

二、Review

Exactly once Semantics are Possible: Here’s How Kafka Does it Kafka是如何做到消息不重复交付,不丢失,恰好发送一次的

Kafka支持三种交付语义,至少一次(At least once semantics)、至多一次(At most once semantics)、恰好发送一次(Exactly once semantics),这篇文章讲解了Kafka是如何实现第三种的

任何时候都可能发生Broker节点下线,针对这种情况,Kafka保证了每条消息被持久化并且冗余多份(作者按:建议约定topic的配置replication.factor参数值必须大于1,要求每个 partition必须有至少2个副本)。不过也有可能生产者到Broker节点的RPC失败(The producer-to-broker RPC can fail),如果此时消息已经写入Broker,但是还没来得及给生产者发送ACK确认就崩溃了,生产者无法感知原因,它只能等待一段时间后重试,最后导致消费者重复消费。另外一种情况就是客户端也可能会失败。所以在错综复杂的环境中,确保只交付一次是非常有挑战的工程问题

0.11.x解决了这个问题,重点是下面提到的三点

1、幂等性(Idempotence: Exactly once in order semantics per partition)

每批消息将包含一个序列号,通过序列号校验过滤重复消息,即使主节点挂了,重新选举的节点也能感知到是否有重复(this sequence number is persisted to the replicated log, so even if the leader fails, any broker that takes over will also know if a resend is a duplicate),可以通过设置在生产端配置”enable.idempotence=true”开启这个功能。作者按:建议约定服务端配置min.insync.replicas参数必须大于1,要求leader感知至少还有一个follower保持心跳连接

2、事务(Transactions: Atomic writes across multiple partitions)

需要保证跨多个Topic-Partition的数据要么全部写入成功,要么全部失败,不会出现中间状态,才能保证幂等性写入

producer.initTransactions();
try {
  producer.beginTransaction();
  producer.send(record1);
  producer.send(record2);
  producer.commitTransaction();
} catch(ProducerFencedException e) {
  producer.close();
} catch(KafkaException e) {
  producer.abortTransaction();
}

需要注意的是,消费者可能读取到的是中间状态,不过可以配置参数isolation.level=read_committed,表示事务提交后可获取,另外一个可选值是read_uncommitted,表示不等待事务提交,正常通过offset order读取

3、流处理(The real deal: Exactly once stream processing in Apache Kafka)

数据处理失败后,还能访问到原始输入数据,然后再执行处理操作,也就是说保证输入数据源的可依赖性才能保证恰好一次正确交付。Kafka是通过将数据透明地fold起来合并成原子性操作以事务的形式写入多个分区,如果失败就回滚关联状态,不需要重新设置offset,整个过程不涉及原始数据获取的幂等性操作

三、Tip

本周分享下Mac的Dash、Alfred应用吧。Dash是一个API文档浏览器(API Documentation Browser),以及代码片段管理工具(Code Snippet Manager),Alfred是一个可插拔的编排Workflow工具。当他们结合起来会发生很奇妙的反应,输入Dash上Snippets的标题可快速显示其内容,从此在Linux上输命令快如疾风

四、Share

谈谈利用策略模式优化过多的if else代码,拿我之前的一个项目(域名申请)开涮,一期需求大致如下

  1. 1、支持正式环境的内网服务间调用域名,如jd.local
  2. 2、支持正式环境的对外服务域名,包括公网和内网,如jd.com
  3. 3、支持预发环境的内网服务间调用,如jd.care
  4. 4、支持预发环境的对外服务,如beta-*.jd.com
  5. 5、支持测试环境的对外服务域名
  6. 6、支持测试环境的服务间调用域名
  7. 7、支持公网域名(jd.com)生成内网(jd.local)域名
  8. 8、支持预发对外服务(beta-)生成内网对外服务(jd.com)域名
  9. 9、支持预发对外服务(beta-)生成公网对外服务(jd.com)域名
  10. 10、支持预发服务间调用(jd.care)生成正式的服务间调用(jd.local)域名

一些约定:

  1. 1、正式环境的解析走IPV4-A或者IPV6-AAAA记录,需生成VIP
  2. 2、预发环境的解析走Proxy-Cname代理
  3. 3、测试环境的解析直连后端IP
  4. 4、内网和公网的机房类型不兼容,对外服务和服务间调用的机房类型不兼容
  5. 5、针对自动生成域名类,后端集群需一致,也就是说VIP流量打到相同的upstream上

如果你一开始用if else完成这些需求,突然有一天新增CDN域名解析等等业务时…….老板我要离职,这项目坑太多了。如果使用策略模式完成这个项目,它应该长下面这个样子

入口,这里省去校验等等逻辑

 InterfaceNpSox instance = NpSoxContext.getInstance(npDomainSox, ticket, opsErp, appInfo);
 instance.process(npDomainSox);

环境(Context)角色,动态获取实现类

public class NpSoxContext {

    public static InterfaceNpSox getInstance(NpDomainSox npDomainSox, String ticket, String opsErp, Map<String,String> appInfo)
    {
        NpSoxEnum[] values = NpSoxEnum.values();
        Integer type = npDomainSox.getType();
        for (NpSoxEnum npSoxEnum:values) {
           if(npSoxEnum.getType()==type) {
               return (InterfaceNpSox) SpringContextUtil.getBean(Class.forName(npSoxEnum.getNpSoxApply()));
           }
        }
        return new ErrorSoxApply();
    }
}

映射枚举(Enum)角色。策略模式要求客户端必须知道算法或者行为的区别,以便适时选择恰当的实现类,这里显式维护了申请类型和实现类型对应关系。生成接入可以复用标准接入,所以我只声明了两种大类型,不然备选策略数目太多了

public enum NpSoxEnum {


    D_LOCAL(1,1,"正式-服务间调用local","com.front.ops.soa.NpSox.StandardSoxApply"),
    D_COM (2,2,"正式-对外服务","com.front.ops.soa.NpSox.StandardSoxApply"),

    D_CARE(3,1,"预发-服务间调用care","com.front.ops.soa.NpSox.StandardSoxApply"),
    D_BETA_COM(4,2,"预发-对外服务","com.front.ops.soa.NpSox..StandardSoxApply"),

    D_DEV_LOCAL(5,1,"测试-服务间调用","com.front.ops.soa.NpSox.StandardSoxApply"),
    D_DEV_COM(6,2,"测试-对外服务","com.front.ops.soa.NpSox.StandardSoxApply"),

    DG_COM_LOCAL(7,0,"公网生成local","com.front.ops.soa.NpSox.GenerateSoxApply"),
    DG_BETA_COM(8,0,"beta自动生成内网com","com.front.ops.soa.NpSox.GenerateSoxApply"),

    DG_CARE_LOCAL(9,0,"care生成local","com.front.ops.soa.NpSox.GenerateSoxApply"),
    DG_BETA_OUT_COM(10,0,"beta域名生成公网com","com.front.ops.soa.NpSox.GenerateSoxApply");

    //申请页面展示的类型
    private final Integer type;
    //上游约定的服务类型,1:服务间调用 2:对外服务
    private final Integer realType;
    private final String description;
    private  final String npSoxApply;


    NpSoxEnum(Integer t,Integer realT,String desc,String soxApply){
            type = t;
            realType =  realT;
            description = desc;
            npSoxApply = soxApply;
    }

    public Integer getType() {
        return type;
    }

    public String getDescription() {
        return description;
    }

    public String getNpSoxApply() {
        return npSoxApply;
    }

    public Integer getRealType() {
        return realType;
    }
}

接口策略角色

public interface InterfaceNpSox {

    /**
     * @Description:  提单逻辑
     * @Param: [npDomainSox]
     * @return: void
     * @Author: Pidan
     */
     void process(NpDomainSox npDomainSox);

}

不过这些需求有很多共同点,比如自动生成的需求可能需要重新获取机房、可能需要重新设置集群名,那么干脆声明个抽象类,另外把针对生成类型的PoJo转换也放到抽象类中,方便重用。不过说实话,我看过几个其他组项目的代码,使用抽象类的不多,网上也有一种声音,接口这么强大,为啥还要抽象类呢?如果你看过SpringBoot的源码的话,就会发现,抽象类ApplicationContext对整套接口提供了大部分的默认实现,将其中“不易变动”的部分进行了封装,通过“组合”的方式将“容易变动”的功能委托给其他类来实现,同时利用模板方法模式将一些方法的实现开放出去由子类实现,从而实现“对扩展开放,对修改封闭”的设计原则,开源项目之所以伟大是有道理的

抽象策略角色

public abstract class AbstractNpSox  implements InterfaceNpSox {



    /** 
    * @Description: 获取后端实例,不管什么类型,ips都不会变的
    * @Param: [npDomainSox] 
    * @return: java.util.List<java.lang.String> 
    * @Author: Pidan
    */
    public List<String> getIps()
    {

    }

    /** 
    * @Description: 获取机房
    * 针对生成类型,可能需要获取机房
    * 另外跨网络类型时必须显式清空机房字段,此外审批时选择的机房优先
    * @Param: [npDomainSox] 
    * @return: java.util.List<java.lang.String> 
    * @Author: Pidan
    */
    public List<String> getIdcs(NpDomainSox npDomainSox)
    {

    }


    /** 
    * @Description: 申请单类型和上游约定的类型进行转换
    * @Param: [] 
    * @return: int 
    * @Author: Pidan
    */
    public int getType(NpDomainSox npDomainSox)
    {

    }

    //省去针对生成类型的Pojo转换,易变的也留给子类去实现
    //省去字段的SET\GET方法

}

整体比if-else更加灵活,但是设计模式有时候是一个坑,不要生硬的去套用模式,为了模式而模式。希望这篇文章能帮到你,欢迎分享给朋友阅读