根据您的发送方式,以多种方式发送数据

8
我有一组键和值,我想把它们打包成一个字节数组发送到我们的消息队列中。我将创建一个包含所有键和值的字节数组,其大小应始终小于50K,然后发送到我们的消息队列中。 Packet类:
public final class Packet implements Closeable {
  private static final int MAX_SIZE = 50000;
  private static final int HEADER_SIZE = 36;

  private final byte dataCenter;
  private final byte recordVersion;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;
  private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
  private int pendingItems = 0;

  public Packet(final RecordPartition recordPartition) {
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    final long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  private void addHeader(final ByteBuffer buffer, final int items) {
    buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
        .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
        .put(replicated);
  }

  private void sendData() {
    if (itemBuffer.position() == 0) {
      // no data to be sent
      return;
    }
    final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    addHeader(buffer, pendingItems);
    buffer.put(itemBuffer);
    SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueSync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueSync(address, buffer.array(), socket);
    itemBuffer.clear();
    pendingItems = 0;
  }

  public void addAndSendJunked(final byte[] key, final byte[] data) {
    if (key.length > 255) {
      return;
    }
    final byte keyLength = (byte) key.length;
    final byte dataLength = (byte) data.length;

    final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
    final int newSize = itemBuffer.position() + additionalSize;
    if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
      sendData();
    }
    if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
      throw new AppConfigurationException("Size of single item exceeds maximum size");
    }

    final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
    // data layout
    itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
        .put(data);
    pendingItems++;
  }

  @Override
  public void close() {
    if (pendingItems > 0) {
      sendData();
    }
  }
}

以下是我发送数据的方式。目前,我的设计只允许通过调用上面的sendData()方法中的sendToQueueAsync方法异步发送数据。
  private void validateAndSend(final RecordPartition partition) {
    final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

    final Packet packet = new Packet(partition);

    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll()) != null) {
      packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
          dataHolder.getProcessBytes());
    }
    packet.close();
  }

现在我需要扩展我的设计,以便我可以通过三种不同的方式发送数据。用户可以决定采用“同步”或“异步”的方式发送数据。
  • 我需要通过调用sender.sendToQueueAsync方法来异步发送数据。
  • 或者我需要通过调用sender.sendToQueueSync方法来同步发送数据。
  • 或者我需要通过调用sender.sendToQueueSync方法在特定的套接字上同步发送数据。在这种情况下,我需要以某种方式传递socket变量,以便sendData知道这个变量。

SendRecord类:

public class SendRecord {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
  private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
      .concurrencyLevel(100).build();

  private static class Holder {
    private static final SendRecord INSTANCE = new SendRecord();
  }

  public static SendRecord getInstance() {
    return Holder.INSTANCE;
  }

  private SendRecord() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        handleRetry();
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  private void handleRetry() {
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage message : messages) {
      if (message.hasExpired()) {
        if (message.shouldRetry()) {
          message.markResent();
          doSendAsync(message);
        } else {
          cache.invalidate(message.getAddress());
        }
      }
    }
  }

  // called by multiple threads concurrently
  public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m);
  }

  // called by above method and also by handleRetry method
  private boolean doSendAsync(final PendingMessage pendingMessage) {
    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // this returns instantly
      return msg.send(liveSocket.get().getSocket());
    } finally {
      msg.destroy();
    }
  }

  // called by send method below
  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // this returns instantly
      return msg.send(socket);
    } finally {
      msg.destroy();
    }
  }

  // called by multiple threads to send data synchronously without passing socket
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

  // called by a threads to send data synchronously but with socket as the parameter
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

  public void handleAckReceived(final long address) {
    PendingMessage record = cache.getIfPresent(address);
    if (record != null) {
      record.ackReceived();
      cache.invalidate(address);
    }
  }
}

调用者只会调用以下三种方法之一:

  • 通过传递两个参数来调用sendToQueueAsync
  • 通过传递两个参数来调用sendToQueueSync
  • 通过传递三个参数来调用sendToQueueSync

我该如何设计我的Packet和SendRecord类,以便可以告诉Packet类,这些数据需要以上述三种方式之一发送到我的消息队列。 用户可以决定以哪种方式将数据发送到消息队列。 我的Packet类目前只能以一种方式发送数据。

6个回答

4
我认为你最好的选择是策略模式 (https://en.wikipedia.org/wiki/Strategy_pattern)。
使用这种模式,您可以封装每种“发送”类型的行为,例如异步发送类、同步发送类和异步套接字发送类。 (您可能可以想出更好的名称)。然后,Packet 类可以根据一些逻辑决定使用哪个类将数据发送到队列中,而不影响其他部分。

发送同样的数据需要三个不同的类?你认为这太多了吗?或者我想错了。 - user1950349
@user1950349 是的,我不认为有什么问题。这可以使类具有单一职责。小类始终是更可取的。 - Kerri Brown
哦,但是我应该在Packet类中添加什么逻辑来决定调用哪个代码来发送数据呢? - user1950349
@user1950349 嗯,我不知道你需要什么样的逻辑,但我假设无论你选择使用哪种设计,你都需要相同的逻辑。 - Kerri Brown

2
使用枚举变量来定义发送消息的类型。
public enum TypeToSend {
    async, sync, socket 
}

public final class Packet implements Closeable {
TypeToSend typeToSend;
public Packet(TypeToSend typeToSend) {
        this.typeToSend = typeToSend;
    }
switch(typeToSend){
     case async:{}
     case sync:{}
     case socket:{}
}
}

2

首先,您需要清楚地回答一个问题:是谁(或您代码的哪个部分)负责决定使用哪种发送方法。

  • 它基于某些外部配置吗?
  • 它基于某些(动态)用户决策吗?
  • 它基于正在处理的分区吗?
  • 它基于消息内容吗?

(仅举几种可能性)

答案将确定最合适的结构。

尽管如此,很明显当前的sendData()方法是实现该决策的地方。因此,需要为该方法提供实现以使用。实际上,在所有情况下,send()可能都是相似的。建议将发送功能封装到一个接口中,该接口提供send()方法签名:

send(address, data);

如果目标套接字需要从实际消息数据中确定,则您可能更喜欢一个通用签名。
send(address, data, socket);

可以使套接字值为可选的,或者对于“没有特定套接字”的情况使用特定值进行编码。否则,您可以使用通过构造函数传递套接字的特定 Sender 实例。

根据您提供的信息,我目前看不到将三种不同的发送方法实现为一个类中的三个不同方法的有效理由。如果公共代码是原因,那么使用一个公共基类将允许适当地共享。

这就留下了问题,如何在 sendData()中提供适当的 Sender 实现的特定实例。

如果发送策略要在 sendData()之外确定,则必须交付实现。可以作为参数或从当前类实例的字段。如果本地数据是确定发送策略的原因,则应将确定正确实现的责任委托给一个选择类,该类将返回正确的实现。调用将类似于:

startegySelector.selectStartegy(selectionParameters).send(address,data);

尽管如此,在执行过程中没有更清晰的图片,很难建议最佳方法。
如果决策基于数据,则整个选择和分流过程仅限于Packet类。
如果决策是在Packet之外做出的,则您可能希望在该位置获取发送策略实现,并将其作为参数传递给addAndSendJunked()(或更准确地传递给sendData())。

感谢您的建议。它始终基于用户决策,这意味着用户可以决定如何发送数据。我需要一种方式,可以通过多种方式发送相同的数据,以便用户可以选择他们想要发送的方式。目前,在sendData方法中,我通过调用此方法sendToQueueAsync将所有内容都异步发送,但通常情况下,我需要一种通过三种不同方式发送相同数据的方法。我的发送器类是“线程安全单例工厂”,具有向消息队列发送数据的所有三种方法。 - user1950349

2
你可以创建一个枚举类,比如PacketTransportionMode,它会针对不同的枚举值(如SYNC、ASYNC和SYNC_ON_SOCKET)覆盖“send”方法,示例:
public enum PacketTransportionMode {
SYNC {
    @Override
    public boolean send(Packet packet) {
        byte[] message = packet.getMessage();
        Socket socket = new Socket(packet.getReceiverHost(), packet.getReceiverPort());
        DataOutputStream dOut = new DataOutputStream(socket.getOutputStream());
        dOut.writeInt(message.length); // write length of the message
        dOut.write(message);           // write the message
        return true;
    }
},
ASYNC {
    @Override
    public boolean send(Packet packet) {
        // TODO Auto-generated method stub
        return false;
    }
},
SYNC_ON_SOCKET

{
    @Override
    public boolean send(Packet packet) {
        // TODO Auto-generated method stub
        return false;
    }

};
public abstract boolean send(Packet packet);
}

同时,在数据包类中引入transportationMode变量。在packet.send()实现中,可以调用this.packetTransportationMode.send(this)

客户端可以创建数据包对象并在开始时设置其transportationMode,类似于设置RecordPartition。然后客户端可以调用packet.send();

或者,将transportationMode变量放置在数据包类中,并调用this.packetTransportationMode.send(this)的方式不同,客户端也可以创建Packet对象并直接调用PacketTransportionMode.SYNC.send(packet)。


有趣的想法。你觉得能否根据我的代码提供一个示例,以便我更好地理解?目前,我有点困惑这将如何工作。 - user1950349

2
我在Packet中没有看到sender的定义。我认为它被定义为一个私有实例变量?
这个设计确实需要修改。通过让Packet类去发送数据,该设计违反了单一职责原则。应该有一个单独的(可能是抽象的)类来准备要发送的数据(准备一个java.nio.Buffer实例),它可以有一个或多个子类,其中一个返回一个java.nio.ByteBuffer实例。
再然后,需要一个单独的类来获取一个Buffer并执行发送操作。这个(可能是抽象的)类可以有不同的发送平台和方法的子类。
然后,您需要另一个类来实现生成器模式。希望发送数据包的客户端使用生成器指定具体的PacketSender(以及可能需要的其他属性,如套接字号),然后调用send()进行发送操作。

是的,我省略了 sender 的东西。一般来说它可以是任何东西。一个具有三种不同方法发送数据的类。在我的情况下,我有一个单例工厂类,我只调用了 Sender.getInstance().sendToQueueAsync 方法,但我需要扩展它,以便我可以调用任何我想要的东西?你能提供一个例子让我更好地理解吗?目前还有点困惑。 - user1950349

1

策略。与Kerri Brown的答案不同之处在于Packet不应在策略之间做出决定。相反,应在Packet类之外进行决策。

应该由3个不同的类实现单个发送策略接口,每个类对应一种提到的发送方法。将策略接口注入Packet,这样Packet无论处理哪种策略都不必更改。

您说它必须基于用户的选择。因此,可以事先询问用户选择是什么,然后根据选择来实例化发送策略接口的实现,该实现对应于用户的选择。然后,使用所选的发送策略实例化Packet。

如果您觉得以后选择可能不取决于用户,则将其制作为工厂。因此,您的解决方案变为工厂和策略的组合。

在这种情况下,Packet可以注入Factory接口。Packet要求工厂提供发送策略。然后,使用从工厂获得的策略发送。工厂要求用户输入,稍后可以通过以其他条件而不是用户输入为基础进行选择来替换该输入。通过在未来以不同方式实现工厂接口并注入该新工厂(即基于用户输入的工厂与基于某些其他条件的工厂)来实现这一点。

这两种方法都符合开闭原则,但是如果你不需要工厂模式,尽量不要过度设计。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接