我可以设置部分消息传输吗?目前文件是作为单个消息传输的。
可以。以下是我Spring Boot实验项目中的相关配置 - 基本上
UploadWSHandler
已注册,同时设置了
WebSocketTransportRegistration.setMessageSizeLimit
。
@Configuration
@EnableWebSocket
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer implements WebSocketConfigurer {
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new UploadWSHandler(), "/binary");
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setMessageSizeLimit(50 * 1024 * 1024);
}
}
上传WShandler如下所示。抱歉,这里有太多的代码-重点如下:
-
supportsPartialMessage
返回true。
-
handleBinaryMessage
将被多次调用以获取部分消息,因此我们需要组装字节。因此,在建立连接后,
afterConnectionEstablished
使用websocket URL查询建立身份识别。但您不必使用此机制。我选择此机制的原因是为了保持客户端简单,这样我只需要调用
webSocket.send(files[0])
一次,即不在javascript端对文件blob对象进行分片。(旁边的一点:我想在客户端上使用普通websocket-没有stomp / socks)
- 内部客户端分块机制提供
message.isLast()
最后一条消息。
- 仅用于演示目的,我正在将其写入文件系统,并在
FileUploadInFlight
中累积这些字节,但您不必这样做,可以随时流式传输到其他地方。
public class UploadWSHandler extends BinaryWebSocketHandler {
Map<WebSocketSession, FileUploadInFlight> sessionToFileMap = new WeakHashMap<>();
@Override
public boolean supportsPartialMessages() {
return true;
}
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
ByteBuffer payload = message.getPayload();
FileUploadInFlight inflightUpload = sessionToFileMap.get(session);
if (inflightUpload == null) {
throw new IllegalStateException("This is not expected");
}
inflightUpload.append(payload);
if (message.isLast()) {
Path basePath = Paths.get(".", "uploads", UUID.randomUUID().toString());
Files.createDirectories(basePath);
FileChannel channel = new FileOutputStream(
Paths.get(basePath.toString() ,inflightUpload.name).toFile(), false).getChannel();
channel.write(ByteBuffer.wrap(inflightUpload.bos.toByteArray()));
channel.close();
session.sendMessage(new TextMessage("UPLOAD "+inflightUpload.name));
session.close();
sessionToFileMap.remove(session);
}
String response = "Upload Chunk: size "+ payload.array().length;
System.out.println(response);
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessionToFileMap.put(session, new FileUploadInFlight(session));
}
static class FileUploadInFlight {
String name;
String uniqueUploadId;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
FileUploadInFlight(WebSocketSession session) {
String query = session.getUri().getQuery();
String uploadSessionIdBase64 = query.split("=")[1];
String uploadSessionId = new String(Base64Utils.decodeUrlSafe(uploadSessionIdBase64.getBytes()));
System.out.println(uploadSessionId);
List<String> sessionIdentifiers = Splitter.on("\\").splitToList(uploadSessionId);
String uniqueUploadId = session.getRemoteAddress().toString()+sessionIdentifiers.get(0);
String fileName = sessionIdentifiers.get(1);
this.name = fileName;
this.uniqueUploadId = uniqueUploadId;
}
public void append(ByteBuffer byteBuffer) throws IOException{
bos.write(byteBuffer.array());
}
}
}
顺便提一下,一个正在运行的项目也是在with-websocked-chunking-assembly-and-fetch
分支中的spring-boot-with-websocked-chunking-assembly-and-fetch