/**
 * Batches key/value items into a fixed-capacity buffer and hands each batch, prefixed with a
 * header, to {@link SendRecord}.
 *
 * <p>Items are appended with {@link #addAndSendJunked(byte[], byte[])}. When the next item would
 * not fit, the buffered items are sent first. {@link #close()} sends whatever is still buffered.
 * This class performs no synchronization of its own.
 */
public final class Packet implements Closeable {
  /** Capacity in bytes of both the item buffer and every outgoing frame. */
  private static final int MAX_SIZE = 50000;
  /**
   * Bytes written by {@link #addHeader}: dataCenter(1) + recordVersion(1) + items(4) + size(4)
   * + three addresses(3 * 8) + recordsPartition(1) + replicated(1).
   */
  private static final int HEADER_SIZE = 36;
  /** Room left for items once the header has been accounted for. */
  private static final int MAX_PAYLOAD_SIZE = MAX_SIZE - HEADER_SIZE;
  /** Per-item framing: marker(1) + key length(1) + timestamp(8) + data length(2). */
  private static final int ITEM_OVERHEAD = 1 + 1 + 8 + 2;
  /** Largest key length representable in the single-byte key length field. */
  private static final int MAX_KEY_LENGTH = 255;

  private final byte dataCenter;
  private final byte recordVersion;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;
  private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
  private int pendingItems = 0;

  /**
   * Creates an empty packet for the given partition.
   *
   * @param recordPartition partition whose numeric id is stored in the header
   */
  public Packet(final RecordPartition recordPartition) {
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    final long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  /**
   * Writes the {@value #HEADER_SIZE}-byte header at the buffer's current position.
   *
   * @param buffer destination buffer
   * @param items number of items that follow the header
   */
  private void addHeader(final ByteBuffer buffer, final int items) {
    buffer.put(dataCenter)
        .put(recordVersion)
        .putInt(items)
        .putInt(buffer.capacity())
        .putLong(address)
        .putLong(addressFrom)
        .putLong(addressOrigin)
        .put(recordsPartition)
        .put(replicated);
  }

  /** Sends the buffered items, if any, and resets the buffer for the next batch. */
  private void sendData() {
    if (itemBuffer.position() == 0) {
      // no data to be sent
      return;
    }
    final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    addHeader(buffer, pendingItems);
    // Switch the item buffer from writing to reading. Without the flip, put(ByteBuffer) would
    // copy the unwritten region between position and limit instead of the buffered items.
    itemBuffer.flip();
    buffer.put(itemBuffer);
    // NOTE(review): the whole MAX_SIZE backing array is sent (zero padded) and the header's size
    // field is the buffer capacity - confirm the receiver expects fixed-size frames.
    SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    itemBuffer.clear();
    pendingItems = 0;
  }

  /**
   * Appends one item, sending the currently buffered items first when the new item would reach
   * the payload limit.
   *
   * <p>Keys longer than {@value #MAX_KEY_LENGTH} bytes cannot be encoded in the one-byte length
   * field; such items are dropped without an error.
   *
   * @param key item key, at most {@value #MAX_KEY_LENGTH} bytes
   * @param data item payload
   * @throws AppConfigurationException if the item alone cannot fit into an empty packet
   */
  public void addAndSendJunked(final byte[] key, final byte[] data) {
    if (key.length > MAX_KEY_LENGTH) {
      return;
    }
    // Size accounting uses the int lengths. Narrowing them to byte first makes any length above
    // 127 negative and under-counts the item.
    final int additionalSize = ITEM_OVERHEAD + key.length + data.length;
    if (additionalSize > MAX_PAYLOAD_SIZE) {
      throw new AppConfigurationException("Size of single item exceeds maximum size");
    }
    if (itemBuffer.position() + additionalSize >= MAX_PAYLOAD_SIZE) {
      sendData();
    }
    // Payloads longer than 10 bytes carry their own timestamp as a long at offset 2.
    final long timestamp =
        data.length > 10 ? ByteBuffer.wrap(data).getLong(2) : System.currentTimeMillis();
    // data layout: marker, key length, key, timestamp, data length, data.
    // Both length fields hold the low-order bits of the int length; the checks above keep them
    // within 8 and 16 bits respectively, so a reader must treat them as unsigned.
    itemBuffer.put((byte) 0)
        .put((byte) key.length)
        .put(key)
        .putLong(timestamp)
        .putShort((short) data.length)
        .put(data);
    pendingItems++;
  }

  /** Sends any items that are still buffered. */
  @Override
  public void close() {
    if (pendingItems > 0) {
      sendData();
    }
  }
}
以下是我发送数据的方式。目前,我的设计只允许通过调用上面的 `sendData()` 方法中的 `sendToQueueAsync`
方法异步发送数据。

/**
 * Drains every queued {@code DataHolder} for the given partition into a single {@link Packet}.
 *
 * <p>The packet is managed by try-with-resources so that items already removed from the queue
 * and buffered are still sent via {@code close()} when adding a later item throws.
 *
 * @param partition partition whose queue is drained
 */
private void validateAndSend(final RecordPartition partition) {
  final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
  try (Packet packet = new Packet(partition)) {
    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll()) != null) {
      packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
          dataHolder.getProcessBytes());
    }
  }
}
现在我需要扩展我的设计,以便我可以通过三种不同的方式发送数据。用户可以决定采用“同步”或“异步”的方式发送数据。
- 我需要通过调用 `sender.sendToQueueAsync` 方法来异步发送数据。
- 或者我需要通过调用 `sender.sendToQueueSync` 方法来同步发送数据。
- 或者我需要通过调用 `sender.sendToQueueSync` 方法在特定的套接字上同步发送数据。在这种情况下,我需要以某种方式传递 `socket` 变量,以便 `sendData` 知道这个变量。
SendRecord类:
/**
 * Singleton that sends encoded records over a socket and tracks them, keyed by address, until
 * they are acknowledged or given up on.
 *
 * <p>A task scheduled once per second re-sends tracked messages that have expired and may still
 * be retried, and drops those that may not.
 */
public class SendRecord {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
  // Messages awaiting acknowledgement, keyed by address.
  private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
      .concurrencyLevel(100).build();

  /** Holds the singleton so that it is created on first use of {@link #getInstance()}. */
  private static class Holder {
    private static final SendRecord INSTANCE = new SendRecord();
  }

  /**
   * Returns the shared instance.
   *
   * @return the singleton
   */
  public static SendRecord getInstance() {
    return Holder.INSTANCE;
  }

  private SendRecord() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        handleRetry();
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  /** Re-sends expired messages that may still be retried and evicts the ones that may not. */
  private void handleRetry() {
    // Iterate over a copy so that invalidating entries does not affect the loop.
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage message : messages) {
      if (!message.hasExpired()) {
        continue;
      }
      if (message.shouldRetry()) {
        message.markResent();
        doSendAsync(message);
      } else {
        cache.invalidate(message.getAddress());
      }
    }
  }

  /**
   * Tracks the message and sends it without waiting for an acknowledgement. Called by multiple
   * threads concurrently.
   *
   * @param address key under which the message is tracked
   * @param encodedRecords bytes to send
   * @return {@code true} if the message was handed to a socket; {@code false} if the send failed
   *     or no socket was available (the message stays tracked for the retry task)
   */
  public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m);
  }

  /**
   * Sends on the next socket offered by {@code SocketManager}. Called by
   * {@link #sendToQueueAsync} and by the retry task.
   *
   * @return {@code false} when no socket is available or the send fails
   */
  private boolean doSendAsync(final PendingMessage pendingMessage) {
    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    if (!liveSocket.isPresent()) {
      // Report a failed send instead of throwing: an exception escaping the scheduled retry
      // task would cancel all of its future runs.
      return false;
    }
    return doSendAsync(pendingMessage, liveSocket.get().getSocket());
  }

  /**
   * Sends the message's encoded records on the given socket.
   *
   * @return the result reported by {@code ZMsg.send}
   */
  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // this returns instantly
      return msg.send(socket);
    } finally {
      msg.destroy();
    }
  }

  /**
   * Sends on the next available socket and waits for the acknowledgement. Called by multiple
   * threads concurrently.
   *
   * @param address key under which the message is tracked while waiting
   * @param encodedRecords bytes to send
   * @return the result of {@code waitForAck()} if the message was sent; {@code false} if the send
   *     failed or no socket was available
   */
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    if (!liveSocket.isPresent()) {
      return false;
    }
    return sendToQueueSync(address, encodedRecords, liveSocket.get().getSocket());
  }

  /**
   * Sends on the given socket and waits for the acknowledgement.
   *
   * @param address key under which the message is tracked while waiting
   * @param encodedRecords bytes to send
   * @param socket socket to send on
   * @return the result of {@code waitForAck()} if the message was sent; {@code false} otherwise
   */
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      // A synchronous send is never left tracked, whatever the outcome.
      cache.invalidate(address);
    }
  }

  /**
   * Marks the message tracked under the given address as acknowledged and stops tracking it.
   * Does nothing when no message is tracked under that address.
   *
   * @param address address carried by the acknowledgement
   */
  public void handleAckReceived(final long address) {
    PendingMessage record = cache.getIfPresent(address);
    if (record != null) {
      record.ackReceived();
      cache.invalidate(address);
    }
  }
}
调用者只会调用以下三种方法之一:
- 通过传递两个参数来调用sendToQueueAsync
- 通过传递两个参数来调用sendToQueueSync
- 通过传递三个参数来调用sendToQueueSync
我该如何设计我的Packet和SendRecord类,以便可以告诉Packet类,这些数据需要以上述三种方式之一发送到我的消息队列。 用户可以决定以哪种方式将数据发送到消息队列。 我的Packet类目前只能以一种方式发送数据。
应该在 `Packet` 类中添加什么逻辑来决定调用哪个代码来发送数据呢? - user1950349