0xTTEPX

Just do it, deeply...

Follow me on GitHub

incubator-gossip框架设计

write by valuewithtime, 2020-10-20 14:30
p2p

引言

Gossip protocol 也叫 Epidemic Protocol (流行病协议),实际上它还有很多别名,比如:“流言算法”、“疫情传播算法”等。很多知名的 P2P 网络或区块链项目,比如 IPFS,Ethereum 等,都使用了 Kadmelia 算法,而大名鼎鼎的 Bitcoin 则是使用了 Gossip 协议来传播交易和区块信息。实际上,只要仔细分析一下场景就知道,Ethereum 使用 DHT 算法并不是很合理,因为它使用节点保存整个链数据,不像 IPFS 那样分片保存数据,因此 Ethereum 真正适合的协议应该像 Bitcoin 那样,是 Gossip 协议。

目录

gossip初探

关于Gossip 协议介绍,我们不做太多的讲解,因为网上已经有很多的文章,可以参考附录文献。在了解gossip一些的基础之上我们来探索gossip相关的协议实现框架,本文研究的框架为 incubator-retired-gossip

incubator-retired-gossip框架提供了gossip协议的单机版示例,包括族的管理,基于CRDT的共享数据,PN计数器,数据中心,分布式锁的相关demo;

具体参见使用说明 gossip-examples

把demo跑一篇,我们再来看gossip的框架设计。

框架设计

gossip-framework

gossip是基于UDP协议进行通信的,底层是通过UDP管理器进行消息的收发;消息编解码器使用的是JSON序列化方式,节点接收消息后,有解码器进行解码,委托消息处理器处理,消息处理器有节点数据、共享数据、及宕机消息等。集群中的节点的状态同步通过节点状态刷新器进行同步。针对族活跃节点、宕机节点、节点数据,及共享数据的同步通过数据同步器完成。core组件对节点、共享数据进行管理,及消息的收发。gossip管理器持久节点状态、数据及共享数据。

源码分析

UDP管理器

/**
 * This class is constructed by reflection in GossipManager.
 * It manages transport (byte read/write) operations over UDP.
 * 管理基于UDP的数据传输操作
 */
public class UdpTransportManager extends AbstractTransportManager implements Runnable {
  public static final Logger LOGGER = Logger.getLogger(UdpTransportManager.class);
  /**
   *
   * The socket used for the passive thread of the gossip service.
   * gossip socket server
   *  */
  private final DatagramSocket server;
  /**
   * 获取数据超时时间
   */
  private final int soTimeout;
  
  private final Thread me;
  /**
   * 运行状态
   */
  private final AtomicBoolean keepRunning = new AtomicBoolean(true);
  
  /**
   * required for reflection to work!
   * @param gossipManager
   * @param gossipCore*/
  public UdpTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
    super(gossipManager, gossipCore);
    soTimeout = gossipManager.getSettings().getGossipInterval() * 2;
    try {
      SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
              gossipManager.getMyself().getUri().getPort());
      server = new DatagramSocket(socketAddress);
    } catch (SocketException ex) {
      LOGGER.warn(ex);
      throw new RuntimeException(ex);
    }
    me = new Thread(this);
  }

  @Override
  public void run() {
    while (keepRunning.get()) {
      try {
        byte[] buf = read();
        try {
          //读取数据
          Base message = gossipManager.getProtocolManager().read(buf);
          //处理消息
          gossipCore.receive(message);
          //TODO this is suspect  GossipMemberStateRefresher
          gossipManager.getMemberStateRefresher().run();
        } catch (RuntimeException ex) {//TODO trap json exception
          LOGGER.error("Unable to process message", ex);
        }
      } catch (IOException e) {
        // InterruptedException are completely normal here because of the blocking lifecycle.
        if (!(e.getCause() instanceof InterruptedException)) {
          LOGGER.error(e);
        }
        keepRunning.set(false);
      }
    }
  }
  
  @Override
  public void shutdown() {
    keepRunning.set(false);
    server.close();
    super.shutdown();
    me.interrupt();
  }

  /**
   * blocking read a message.
   * 从缓存区读取消息数据
   * @return buffer of message contents.
   * @throws IOException
   */
  public byte[] read() throws IOException {
    byte[] buf = new byte[server.getReceiveBufferSize()];
    DatagramPacket p = new DatagramPacket(buf, buf.length);
    server.receive(p);
    debug(p.getData());
    return p.getData();
  }

  /**
   * 发送字节数据
   * @param endpoint
   * @param buf
   * @throws IOException
   */
  @Override
  public void send(URI endpoint, byte[] buf) throws IOException {
    // todo: investigate UDP socket reuse. It would save a little setup/teardown time wrt to the local socket.
    try (DatagramSocket socket = new DatagramSocket()){
      socket.setSoTimeout(soTimeout);
      InetAddress dest = InetAddress.getByName(endpoint.getHost());
      DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort());
      socket.send(payload);
    }
  }
  

  /**
   * 启动endpoint
   */
  @Override
  public void startEndpoint() {
    me.start();
  }
  
}

从上面可以看出,UDP传输管理UdpTransportManager,主要发送字节数据,接收字节数据,并委托给core组件。在UDP传输管理器启动的过程中,同时启动节点状态刷新器。

我们再来看一下消息编解码器

消息编解码

// this class is constructed by reflection in GossipManager.
public class JacksonProtocolManager implements ProtocolManager {
  
  private final ObjectMapper objectMapper;
  private final PrivateKey privKey;
  private final Meter signed;
  private final Meter unsigned;
  
  /** required for reflection to work!
   * @param settings
   * @param id
   * @param registry*/
  public JacksonProtocolManager(GossipSettings settings, String id, MetricRegistry registry) {
    // set up object mapper.
    objectMapper = buildObjectMapper(settings);
    
    // set up message signing. 如果需要加签消息,则加载节点公私钥
    if (settings.isSignMessages()){
      File privateKey = new File(settings.getPathToKeyStore(), id);
      File publicKey = new File(settings.getPathToKeyStore(), id + ".pub");
      if (!privateKey.exists()){
        throw new IllegalArgumentException("private key not found " + privateKey);
      }
      if (!publicKey.exists()){
        throw new IllegalArgumentException("public key not found " + publicKey);
      }
      try (FileInputStream keyfis = new FileInputStream(privateKey)) {
        byte[] encKey = new byte[keyfis.available()];
        keyfis.read(encKey);
        keyfis.close();
        PKCS8EncodedKeySpec privKeySpec = new PKCS8EncodedKeySpec(encKey);
        KeyFactory keyFactory = KeyFactory.getInstance("DSA");
        privKey = keyFactory.generatePrivate(privKeySpec);
      } catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) {
        throw new RuntimeException("failed hard", e);
      }
    } else {
      privKey = null;
    }
    
    signed = registry.meter(PassiveGossipConstants.SIGNED_MESSAGE);
    unsigned = registry.meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
  }

  /**
   * 转换消息为字节数组
   * @param message
   * @return
   * @throws IOException
   */
  @Override
  public byte[] write(Base message) throws IOException {
    byte[] json_bytes;
    if (privKey == null){
      json_bytes = objectMapper.writeValueAsBytes(message);
    } else {
      SignedPayload p = new SignedPayload();
      p.setData(objectMapper.writeValueAsString(message).getBytes());
      p.setSignature(sign(p.getData(), privKey));
      json_bytes = objectMapper.writeValueAsBytes(p);
    }
    return json_bytes;
  }

  /**
   * 转换字节数组为消息对象
   * @param buf
   * @return
   * @throws IOException
   */
  @Override
  public Base read(byte[] buf) throws IOException {
    Base activeGossipMessage = objectMapper.readValue(buf, Base.class);
    if (activeGossipMessage instanceof SignedPayload){
      SignedPayload s = (SignedPayload) activeGossipMessage;
      signed.mark();
      return objectMapper.readValue(s.getData(), Base.class);
    } else {
      unsigned.mark();
      return activeGossipMessage;
    }
  }

  public static ObjectMapper buildObjectMapper(GossipSettings settings) {
    ObjectMapper om = new ObjectMapper();
    om.enableDefaultTyping();
    // todo: should be specified in the configuration.
    om.registerModule(new CrdtModule());
    om.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
    return om;
  }
  
  private static byte[] sign(byte [] bytes, PrivateKey pk){
    Signature dsa;
    try {
      dsa = Signature.getInstance("SHA1withDSA", "SUN");
      dsa.initSign(pk);
      dsa.update(bytes);
      return dsa.sign();
    } catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) {
      throw new RuntimeException(e);
    } 
  }
}

从上面可以看出,消息编解码器JacksonProtocolManager是基于JSON的消息编解码,如果是RSA加签方式,则在收发消息,做响应的加解密。

消息处理器

消息处理这块,我们来看一下节点数据、共享数据、节点宕机消息的处理。

  • 共享数据消息处理器
public class SharedDataMessageHandler implements MessageHandler{
  
  /**
   * @param gossipCore context.
   * @param gossipManager context.
   * @param base message reference.
   * @return boolean indicating success.
   */
  @Override
  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
    UdpSharedDataMessage message = (UdpSharedDataMessage) base;
    //添加共享数据
    gossipCore.addSharedData(message);
    return true;
  }
}

  • 节点数据消息处理器
public class PerNodeDataMessageHandler implements MessageHandler {

  /**
   * @param gossipCore context.
   * @param gossipManager context.
   * @param base message reference.
   * @return boolean indicating success.
   */
  @Override
  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
    UdpPerNodeDataMessage message = (UdpPerNodeDataMessage) base;
    //添加节点数据消息
    gossipCore.addPerNodeData(message);
    return true;
  }
}
  • 节点宕机消息处理器
public class ShutdownMessageHandler implements MessageHandler {
  
  /**
   * @param gossipCore context.
   * @param gossipManager context.
   * @param base message reference.
   * @return boolean indicating success.
   */
  @Override
  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
    ShutdownMessage s = (ShutdownMessage) base;
    //转换宕机下消息,为节点数据消息
    PerNodeDataMessage m = new PerNodeDataMessage();
    m.setKey(ShutdownMessage.PER_NODE_KEY);
    m.setNodeId(s.getNodeId());
    m.setPayload(base);
    m.setTimestamp(System.currentTimeMillis());
    m.setExpireAt(System.currentTimeMillis() + 30L * 1000L);
    //添加节点数据消息
    gossipCore.addPerNodeData(m);
    return true;
  }
}

从上面可以共享数据和节点数据处理实际是委托给core组件,这个我们后面再来看,宕机消息转换为节点消息进行处理。

节点状态刷新器

/**
 * 成员状态刷新线程
 */
public class GossipMemberStateRefresher {
  public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class);

  /**
   * gossip成员
   */
  private final Map<LocalMember, GossipState> members;
  /**
   * gossip配置
   */
  private final GossipSettings settings;
  /**
   * gossip监听器
   */
  private final List<GossipListener> listeners = new CopyOnWriteArrayList<>();
  /**
   * 系统时钟
   */
  private final Clock clock;
  private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData;
  /**
   * 监听器执行线程
   */
  private final ExecutorService listenerExecutor;
  /**
   * 条赌气
   */
  private final ScheduledExecutorService scheduledExecutor;
  /**
   * 任务队列
   */
  private final BlockingQueue<Runnable> workQueue;

  public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings,
                                    GossipListener listener,
                                    BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
    this.members = members;
    this.settings = settings;
    listeners.add(listener);
    this.findPerNodeGossipData = findPerNodeGossipData;
    clock = new SystemClock();
    workQueue = new ArrayBlockingQueue<>(1024);
    listenerExecutor = new ThreadPoolExecutor(1, 20, 1, TimeUnit.SECONDS, workQueue,
            new ThreadPoolExecutor.DiscardOldestPolicy());
    scheduledExecutor = Executors.newScheduledThreadPool(1);
  }

  /**
   * 调度成员状态刷新器
   */
  public void init() {
    scheduledExecutor.scheduleAtFixedRate(() -> run(), 0, 100, TimeUnit.MILLISECONDS);
  }

  /**
   *
   */
  public void run() {
    try {
      runOnce();
    } catch (RuntimeException ex) {
      LOGGER.warn("scheduled state had exception", ex);
    }
  }

  /**
   * gossip成员状态探测
   */
  public void runOnce() {
    for (Entry<LocalMember, GossipState> entry : members.entrySet()) {
      boolean userDown = processOptimisticShutdown(entry);
      if (userDown)
        continue;

      Double phiMeasure = entry.getKey().detect(clock.nanoTime());
      GossipState requiredState;
      //根据探测结果,判断节点状态
      if (phiMeasure != null) {
        requiredState = calcRequiredState(phiMeasure);
      } else {
        requiredState = calcRequiredStateCleanupInterval(entry.getKey(), entry.getValue());
      }

      if (entry.getValue() != requiredState) {
        members.put(entry.getKey(), requiredState);
        /* Call listeners asynchronously 异步触发节点状态监听器*/
        for (GossipListener listener: listeners)
          listenerExecutor.execute(() -> listener.gossipEvent(entry.getKey(), requiredState));
      }
    }
  }

  /**
   *
   * @param phiMeasure
   * @return
   */
  public GossipState calcRequiredState(Double phiMeasure) {
    //如果探测节点存活的时间间隔大于阈值,则节点down
    if (phiMeasure > settings.getConvictThreshold())
      return GossipState.DOWN;
    else
      return GossipState.UP;
  }

  /**
   * 首次探活状态
   * @param member
   * @param state
   * @return
   */
  public GossipState calcRequiredStateCleanupInterval(LocalMember member, GossipState state) {
    long now = clock.nanoTime();
    long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
    //如果当前时间减去清理时间间隔,大于成员的心跳时间,则DOWN
    if (nowInMillis - settings.getCleanupInterval() > member.getHeartbeat()) {
      return GossipState.DOWN;
    } else {
      return state;
    }
  }

  /**
   * If we have a special key the per-node data that means that the node has sent us
   * a pre-emptive shutdown message. We process this so node is seen down sooner
   * 预处理节点状态
   * @param l member to consider
   * @return true if node forced down
   */
  public boolean processOptimisticShutdown(Entry<LocalMember, GossipState> l) {
    //获取节点宕机消息
    PerNodeDataMessage m = findPerNodeGossipData.apply(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
    if (m == null) {
      return false;
    }
    ShutdownMessage s = (ShutdownMessage) m.getPayload();
    //如果节点宕机时间大于当前心跳时间
    if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {
      members.put(l.getKey(), GossipState.DOWN);
      if (l.getValue() == GossipState.UP) {
        //如果状态变化,则通知监听器
        for (GossipListener listener: listeners)
          listenerExecutor.execute(() -> listener.gossipEvent(l.getKey(), GossipState.DOWN));
      }
      return true;
    }
    return false;
  }

  /**
   * 注解监听器
   * @param listener
   */
  public void register(GossipListener listener) {
    listeners.add(listener);
  }

  /**
   * 关闭gossip成员刷新器
   */
  public void shutdown() {
    scheduledExecutor.shutdown();
    try {
      scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
    listenerExecutor.shutdown();
    try {
      listenerExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
    listenerExecutor.shutdownNow();
  }
}

节点状态刷新器GossipMemberStateRefresher,通过宕机探测器FailureDetector,探测节点的存活状态,并更新节点状态,针对状态变更的,要通知响应的监听器。

数据同步器

/**
 * Base implementation gossips randomly to live nodes periodically gossips to dead ones
 *
 */
public class SimpleActiveGossiper extends AbstractActiveGossiper {

  /**
   * 调度器
   */
  private ScheduledExecutorService scheduledExecutorService;
  /**
   * 任务队列
   */
  private final BlockingQueue<Runnable> workQueue;
  /**
   * 线程池执行器
   */
  private ThreadPoolExecutor threadService;

  /**
   * @param gossipManager
   * @param gossipCore
   * @param registry
   */
  public SimpleActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
                              MetricRegistry registry) {
    super(gossipManager, gossipCore, registry);
    scheduledExecutorService = Executors.newScheduledThreadPool(2);
    workQueue = new ArrayBlockingQueue<Runnable>(1024);
    threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
            new ThreadPoolExecutor.DiscardOldestPolicy());
  }

  @Override
  public void init() {
    super.init();
    scheduledExecutorService.scheduleAtFixedRate(() -> {
      threadService.execute(() -> {
        //发送Live成员
        sendToALiveMember();
      });
    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(() -> {
      //发送Dead成员
      sendToDeadMember();
    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(
            //发送节点数据
            () -> sendPerNodeData(gossipManager.getMyself(),
                    selectPartner(gossipManager.getLiveMembers())),
            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(
            //发送共享数据
            () -> sendSharedData(gossipManager.getMyself(),
                    selectPartner(gossipManager.getLiveMembers())),
            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
  }
  
  @Override
  public void shutdown() {
    super.shutdown();
    scheduledExecutorService.shutdown();
    try {
      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
    //发送关闭消息
    sendShutdownMessage();
    threadService.shutdown();
    try {
      threadService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
  }

  /**
   *从当前节点的gossip成员列表中选择一个成员,发送Live节点信息
   */
  protected void sendToALiveMember(){
    LocalMember member = selectPartner(gossipManager.getLiveMembers());
    sendMembershipList(gossipManager.getMyself(), member);
  }

  /**
   * 从当前节点的gossip成员列表中选择一个成员,发送Dead节点信息
   */
  protected void sendToDeadMember(){
    LocalMember member = selectPartner(gossipManager.getDeadMembers());
    sendMembershipList(gossipManager.getMyself(), member);
  }
  
  /**
   * sends an optimistic shutdown message to several clusters nodes
   * 通知族节点,当前节点已关闭
   */
  protected void sendShutdownMessage(){
    List<LocalMember> l = gossipManager.getLiveMembers();
    int sendTo = l.size() < 3 ? 1 : l.size() / 2;
    for (int i = 0; i < sendTo; i++) {
      threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
    }
  }
}

数据同步器,同步活跃、宕机成员,节点数据、共享数据。

我们再来看同步活跃节点

//SimpleActiveGossiper

 /**
   *从当前节点的gossip成员列表中选择一个成员,发送Live节点信息
   */
  protected void sendToALiveMember(){
    LocalMember member = selectPartner(gossipManager.getLiveMembers());
    sendMembershipList(gossipManager.getMyself(), member);
  }

//AbstractActiveGossiper

 /**
   * 返回本地的随机gossip成员
   * @param memberList
   *          An immutable list
   * @return The chosen LocalGossipMember to gossip with.
   */
  protected LocalMember selectPartner(List<LocalMember> memberList) {
    LocalMember member = null;
    if (memberList.size() > 0) {
      int randomNeighborIndex = random.nextInt(memberList.size());
      member = memberList.get(randomNeighborIndex);
    }
    return member;
  }
 /**
   * Performs the sending of the membership list, after we have incremented our own heartbeat.
   * 在自增心跳之后,发送成员列表
   */
  protected void sendMembershipList(LocalMember me, LocalMember member) {
    if (member == null){
      return;
    }
    long startTime = System.currentTimeMillis();
    me.setHeartbeat(System.nanoTime());
    //保活gossip消息
    UdpActiveGossipMessage message = new UdpActiveGossipMessage();
    message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
    message.setUuid(UUID.randomUUID().toString());
    message.getMembers().add(convert(me));
    for (LocalMember other : gossipManager.getMembers().keySet()) {
      message.getMembers().add(convert(other));
    }
    Response r = gossipCore.send(message, member.getUri());
    if (r instanceof ActiveGossipOk){
      //maybe count metrics here
    } else {
      LOGGER.debug("Message " + message + " generated response " + r);
    }
    sendMembershipHistogram.update(System.currentTimeMillis() - startTime);
  }

发送活跃节点消息,实际随机从活跃节点的信息,将当前节点的活跃节点信息包装保活gossip消息发送给筛选出的节点。发送宕机节点的原理一致。发送消息委托给core组件。

再来看发送共享数据

//SimpleActiveGossiper

 @Override
  public void init() {
    super.init();
    scheduledExecutorService.scheduleAtFixedRate(() -> {
      threadService.execute(() -> {
        //发送Live成员
        sendToALiveMember();
      });
    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(() -> {
      //发送Dead成员
      sendToDeadMember();
    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(
            //发送节点数据
            () -> sendPerNodeData(gossipManager.getMyself(),
                    selectPartner(gossipManager.getLiveMembers())),
            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(
            //发送共享数据
            () -> sendSharedData(gossipManager.getMyself(),
                    selectPartner(gossipManager.getLiveMembers())),
            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
  }

//AbstractActiveGossiper

 /**
   * 发送共享该数据
   * @param me
   * @param member
   */
  public final void sendSharedData(LocalMember me, LocalMember member) {
    if (member == null) {
      return;
    }
    long startTime = System.currentTimeMillis();
    if (gossipSettings.isBulkTransfer()) {
      sendSharedDataInBulkInternal(me, member);
    } else {
      sendSharedDataInternal(me, member);
    }
    sharedDataHistogram.update(System.currentTimeMillis() - startTime);
  }

 /**
   * Send shared data one entry at a time.
   * 发送共享数据
   * @param me
   * @param member
   */
  private void sendSharedDataInternal(LocalMember me, LocalMember member) {
    for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
      if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
              .shouldReplicate(me, member, innerEntry.getValue())) {
        //不可复制,则直接do nothing
        continue;
      }
      //拷贝共享数据
      UdpSharedDataMessage message = new UdpSharedDataMessage();
      message.setUuid(UUID.randomUUID().toString());
      message.setUriFrom(me.getId());
      copySharedDataMessage(innerEntry.getValue(), message);
      gossipCore.sendOneWay(message, member.getUri());
    }
  } 

发送共享数据实际为通过core组件将共享数据保证成UdpSharedDataMessage并发送给peer节点。节点数据原理一致,只不过发送的是节点数据。

core组件

public class GossipCore implements GossipCoreConstants {

  class LatchAndBase {
    /**
     * 请求锁
     */
    private final CountDownLatch latch;
    /**
     * 消息
     */
    private volatile Base base;
    
    LatchAndBase(){
      latch = new CountDownLatch(1);
    }
    
  }
  public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
  private final GossipManager gossipManager;
  /**
   * 可追溯的请求
   */
  private ConcurrentHashMap<String, LatchAndBase> requests;
  /**
   * 每个节点的数据
   */
  private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
  /**
   * 共享数据集
   */
  private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
  private final Meter messageSerdeException;
  /**
   * 传输异常度量器
   */
  private final Meter transmissionException;
  /**
   * 传输成功度量器
   */
  private final Meter transmissionSuccess;
  private final DataEventManager eventManager;
  
  public GossipCore(GossipManager manager, MetricRegistry metrics){
    this.gossipManager = manager;
    requests = new ConcurrentHashMap<>();
    perNodeData = new ConcurrentHashMap<>();
    sharedData = new ConcurrentHashMap<>();
    eventManager = new DataEventManager(metrics);
    metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
    metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() ->  sharedData.size());
    metrics.register(REQUEST_SIZE, (Gauge<Integer>)() ->  requests.size());
    messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
    transmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
    transmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
  }
  ...
}

//GossipCore

 /**
   * 接受消息
   * @param base
   */
  public void receive(Base base) {
    if (!gossipManager.getMessageHandler().invoke(this, gossipManager, base)) {
      LOGGER.warn("received message can not be handled");
    }
  }

//SharedDataMessageHandler

public class SharedDataMessageHandler implements MessageHandler{
  
  /**
   * @param gossipCore context.
   * @param gossipManager context.
   * @param base message reference.
   * @return boolean indicating success.
   */
  @Override
  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
    UdpSharedDataMessage message = (UdpSharedDataMessage) base;
    //添加共享数据
    gossipCore.addSharedData(message);
    return true;
  }
}

//GossipCore

 /**
   * 添加共享数据
   * @param message
   */
  @SuppressWarnings({ "unchecked", "rawtypes" })
  public void addSharedData(SharedDataMessage message) {
    while (true){
      SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
      if (previous == null){
        eventManager.notifySharedData(message.getKey(), message.getPayload(), null);
        return;
      }
      if (message.getPayload() instanceof Crdt){
        //合并共享数据
        SharedDataMessage merged = new SharedDataMessage();
        merged.setExpireAt(message.getExpireAt());
        merged.setKey(message.getKey());
        merged.setNodeId(message.getNodeId());
        merged.setTimestamp(message.getTimestamp());
        Crdt mergedCrdt = ((Crdt) previous.getPayload()).merge((Crdt) message.getPayload());
        merged.setPayload(mergedCrdt);
        boolean replaced = sharedData.replace(message.getKey(), previous, merged);
        if (replaced){
          if(!merged.getPayload().equals(previous.getPayload())) {
            eventManager
                    .notifySharedData(message.getKey(), merged.getPayload(), previous.getPayload());
          }
          return;
        }
      } else {
        //非CRDT数据,已最新数据为准
        if (previous.getTimestamp() < message.getTimestamp()){
          boolean result = sharedData.replace(message.getKey(), previous, message);
          if (result){
            eventManager.notifySharedData(message.getKey(), message.getPayload(), previous.getPayload());
            return;
          }
        } else {
          return;
        }
      }
    }
  }

peer节点接收消息后,调用消息处理器,处理消息,针对共享数据,如果基于CRDT的数据则合并,否则更新,通知事件处理器;针对节点数据基本相同。

再来看发送数据

//GossipCore

/**
   * @param message
   * @param uri
   * @return
   */
  public Response send(Base message, URI uri){
    if (LOGGER.isDebugEnabled()){
      LOGGER.debug("Sending " + message);
      LOGGER.debug("Current request queue " + requests);
    }

    final Trackable t;
    LatchAndBase latchAndBase = null;
    if (message instanceof Trackable){
      t = (Trackable) message;
      latchAndBase = new LatchAndBase();
      //放入请求集
      requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase);
    } else {
      t = null;
    }
    sendInternal(message, uri);
    if (latchAndBase == null){
      return null;
    }
    
    try {

      boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS);
      if (complete){
        //等待结果
        return (Response) latchAndBase.base;
      } else{
        return null;
      }
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } finally {
      if (latchAndBase != null){
        //移除高清球
        requests.remove(t.getUuid() + "/" + t.getUriFrom());
      }
    }
  }

  /**
   * Sends a message across the network while blocking. Catches and ignores IOException in transmission. Used
   * when the protocol for the message is not to wait for a response
   * 发送网络消息。忽略,并捕捉传输的io异常。一般用于不需要回复的协议消息
   * @param message the message to send
   * @param u the uri to send it to
   */
  public void sendOneWay(Base message, URI u) {
    try {
      sendInternal(message, u);
    } catch (RuntimeException ex) {
      LOGGER.debug("Send one way failed", ex);
    }
  }
   /**
   * Sends a blocking message.
   * todo: move functionality to TransportManager layer.
   * @param message
   * @param uri
   * @throws RuntimeException if data can not be serialized or in transmission error
   */
  private void sendInternal(Base message, URI uri) {
    byte[] json_bytes;
    try {
      json_bytes = gossipManager.getProtocolManager().write(message);
    } catch (IOException e) {
      messageSerdeException.mark();
      throw new RuntimeException(e);
    }
    try {
      gossipManager.getTransportManager().send(uri, json_bytes);
      transmissionSuccess.mark();
    } catch (IOException e) {
      transmissionException.mark();
      throw new RuntimeException(e);
    }
  }

core组件发送消息实际是委托底层的UDP传输管理器进行传输。

gossip管理器

public abstract class GossipManager {

  public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
  
  // this mapper is used for ring and user-data persistence only. NOT messages.
  public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {
    private static final long serialVersionUID = 1L;
  {
    enableDefaultTyping();
    configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
  }};

  /**
   * gossip 成员
   */
  private final ConcurrentSkipListMap<LocalMember, GossipState> members;
  /**
   * 当前节点信息
   */
  private final LocalMember me;
  /**
   * gossip协议配置
   */
  private final GossipSettings settings;
  /**
   * gossip服务运行状态
   */
  private final AtomicBoolean gossipServiceRunning;

  /**
   * gossip 节点管理
   */
  private TransportManager transportManager;
  /**
   * 协议管理器
   */
  private ProtocolManager protocolManager;
  
  private final GossipCore gossipCore;
  private final DataReaper dataReaper;
  /**
   * 时钟
   */
  private final Clock clock;
  /**
   * 调度器
   */
  private final ScheduledExecutorService scheduledServiced;
  private final MetricRegistry registry;
  /**
   * 节点gossip成员持久器
   */
  private final RingStatePersister ringState;
  /**
   * 用户数据持久器
   */
  private final UserDataPersister userDataState;
  /**
   * 成员状态刷新器
   */
  private final GossipMemberStateRefresher memberStateRefresher;

  /**
   * 消息处理器
   */
  private final MessageHandler messageHandler;
  private final LockManager lockManager;

  /**
   * @param cluster
   * @param uri
   * @param id
   * @param properties  成员属性
   * @param settings
   * @param gossipMembers
   * @param listener
   * @param registry
   * @param messageHandler
   */
  public GossipManager(String cluster,
                       URI uri, String id, Map<String, String> properties, GossipSettings settings,
                       List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
                       MessageHandler messageHandler) {
    this.settings = settings;
    this.messageHandler = messageHandler;
    //创建系统时钟
    clock = new SystemClock();
    //本地gossip成员
    me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
            settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
    gossipCore = new GossipCore(this, registry);
    this.lockManager = new LockManager(this, settings.getLockManagerSettings(), registry);
    dataReaper = new DataReaper(gossipCore, clock);
    members = new ConcurrentSkipListMap<>();
    for (Member startupMember : gossipMembers) {
      if (!startupMember.equals(me)) {
        LocalMember member = new LocalMember(startupMember.getClusterName(),
                startupMember.getUri(), startupMember.getId(),
                clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
                settings.getMinimumSamples(), settings.getDistribution());
        //TODO should members start in down state?
        members.put(member, GossipState.DOWN);
      }
    }
    gossipServiceRunning = new AtomicBoolean(true);
    this.scheduledServiced = Executors.newScheduledThreadPool(1);
    this.registry = registry;
    //gossip成员持久器
    this.ringState = new RingStatePersister(GossipManager.buildRingStatePath(this), this);
    //用户数据持久器
    this.userDataState = new UserDataPersister(
        gossipCore,
        GossipManager.buildPerNodeDataPath(this),
        GossipManager.buildSharedDataPath(this));
    //gossip成员状态刷新器
    this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData);
    //加载节点gossip成员
    readSavedRingState();
    //加载节点及共享数据
    readSavedDataState();
  }
...
}

gossip管理器,对集群的成员、消息处理、全局配置、节点状态持久器、用户数据持久器,协议管理器,udp传输管理器进行集成。我们来 简单看一下共享数据持久器;

public class UserDataPersister implements Runnable {
  
  private static final Logger LOGGER = Logger.getLogger(UserDataPersister.class);
  private final GossipCore gossipCore;

  /**
   * 节点数据文件
   */
  private final File perNodePath;
  /**
   * 节点共享数据文件
   */
  private final File sharedPath;
  private final ObjectMapper objectMapper;
  
  UserDataPersister(GossipCore gossipCore, File perNodePath, File sharedPath) {
    this.gossipCore = gossipCore;
    this.objectMapper = GossipManager.metdataObjectMapper;
    this.perNodePath = perNodePath;
    this.sharedPath = sharedPath;
  }

  /**
   * 从磁盘加载节点数据
   * @return
   */
  @SuppressWarnings("unchecked")
  ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> readPerNodeFromDisk(){
    if (!perNodePath.exists()) {
      return new ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>>();
    }
    try (FileInputStream fos = new FileInputStream(perNodePath)){
      return objectMapper.readValue(fos, ConcurrentHashMap.class);
    } catch (IOException e) {
      LOGGER.debug(e);
    }
    return new ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>>();
  }

  /**
   *
   */
  void writePerNodeToDisk(){
    try (FileOutputStream fos = new FileOutputStream(perNodePath)){
      objectMapper.writeValue(fos, gossipCore.getPerNodeData());
    } catch (IOException e) {
      LOGGER.warn(e);
    }
  }
  
  void writeSharedToDisk(){
    try (FileOutputStream fos = new FileOutputStream(sharedPath)){
      objectMapper.writeValue(fos, gossipCore.getSharedData());
    } catch (IOException e) {
      LOGGER.warn(e);
    }
  }

  /**
   * 从磁盘加载共享数据
   * @return
   */
  @SuppressWarnings("unchecked")
  ConcurrentHashMap<String, SharedDataMessage> readSharedDataFromDisk(){
    if (!sharedPath.exists()) {
      return new ConcurrentHashMap<>();
    }
    try (FileInputStream fos = new FileInputStream(sharedPath)){
      return objectMapper.readValue(fos, ConcurrentHashMap.class);
    } catch (IOException e) {
      LOGGER.debug(e);
    }
    return new ConcurrentHashMap<String, SharedDataMessage>();
  }
  
  /**
   * Writes all pernode and shared data to disk 
   */
  @Override
  public void run() {
    writePerNodeToDisk();
    writeSharedToDisk();
  }
}

用户数据持久UserDataPersister,主要是在节点宕机将用户数据持久化的磁盘,启动时,在从磁盘加载。节点状态持久化器原理相似。

总结

gossip是基于UDP协议进行通信的,底层是通过UDP管理器进行消息的收发;消息编解码器使用的是JSON序列化方式,节点接收消息后,有解码器进行解码,委托消息处理器处理,消息处理器有节点数据、共享数据、及宕机消息等。集群中的节点的状态同步通过节点状态刷新器进行同步。针对族活跃节点、宕机节点、节点数据,及共享数据的同步通过数据同步器完成。core组件对节点、共享数据进行管理,及消息的收发。gossip管理器持久节点状态、数据及共享数据。

UDP传输管理UdpTransportManager,主要发送字节数据,接收字节数据,并委托给core组件。在UDP传输管理器启动的过程中,同时启动节点状态刷新器。

消息编解码器JacksonProtocolManager是基于JSON的消息编解码,如果是RSA加签方式,则在收发消息,做响应的加解密。

共享数据和节点数据处理实际是委托给core组件,宕机消息转换为节点消息进行处理。

节点状态刷新器GossipMemberStateRefresher,通过宕机探测器FailureDetector,探测节点的存活状态,并更新节点状态,针对状态变更的,要通知响应的监听器。

数据同步器,同步活跃、宕机成员,节点数据、共享数据。发送活跃节点消息,实际随机从活跃节点的信息,将当前节点的活跃节点信息包装保活gossip消息发送给筛选出的节点。发送宕机节点的原理一致。发送消息委托给core组件。发送共享数据实际为通过core组件将共享数据保证成UdpSharedDataMessage并发送给peer节点。节点数据原理一致,只不过发送的是节点数据。

peer节点接收消息后,调用消息处理器,处理消息,针对共享数据,如果基于CRDT的数据则合并,否则更新,通知事件处理器;针对节点数据基本相同。core组件发送消息实际是委托底层的UDP传输管理器进行传输。

gossip管理器,对集群的成员、消息处理、全局配置、节点状态持久器、节点数据持久器、共享数据持久器,协议管理器,udp传输管理器进行集成。

Gossip

P2P 网络核心技术:Gossip 协议
漫谈 Gossip 协议
浅谈集群版Redis和Gossip协议

incubator-retired-gossip
apache gossip

分布式一致性算法-CRDT一
分布式一致性算法-CRDT二
分布式CRDT模型
如何选择地理分布式双活策略:合并复制和CRDT之对比
何时使用基于CRDT的数据库
基于TLA+的CRDT协议验证框架

单实例比redis吞吐大12倍,Anna用了什么黑科技?

Kademlia

P2P 网络核心技术:Kademlia 协议
Kademlia协议
Kademlia
OpenKAD github
openkad google