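I need to build a stateful Service Fabric application that listens for TCP requests and pushes the incoming messages onto a reliable queue.
There are plenty of examples for HTTP and WCF endpoints, but I can't find any for a plain TCP protocol.
In my ServiceManifest.xml file I have the following: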

    <Endpoints>
      <!-- This endpoint is used by the communication listener to obtain the port on which to
           listen. Please note that if your service is partitioned, this port is shared with
           replicas of different partitions that are placed in your code. -->
      <Endpoint Name="ServiceEndpoint" />
      <!-- This endpoint is used by the replicator for replicating the state of your service.
           This endpoint is configured through a ReplicatorSettings config section in the Settings.xml
           file under the ConfigPackage. -->
      <Endpoint Name="ReplicatorEndpoint" />
      <Endpoint Name="tcpEndpoint" Protocol="tcp" Port="10100" />
    </Endpoints>

I have a listener named TcpCommunicationListener that implements the ICommunicationListener interface:

    public class TcpCommunicationListener : ICommunicationListener
    {
        private readonly ServiceEventSource eventSource;
        private readonly ServiceContext serviceContext;
        private readonly string endpointName;
        private string listeningAddress;
        private string hostAddress;

        public TcpCommunicationListener(ServiceContext serviceContext, ServiceEventSource eventSource, string endpointName)
        {
            if (serviceContext == null)
            {
                throw new ArgumentNullException(nameof(serviceContext));
            }
            if (endpointName == null)
            {
                throw new ArgumentNullException(nameof(endpointName));
            }
            if (eventSource == null)
            {
                throw new ArgumentNullException(nameof(eventSource));
            }
            this.serviceContext = serviceContext;
            this.endpointName = endpointName;
            this.eventSource = eventSource;
        }

        public Task<string> OpenAsync(CancellationToken cancellationToken)
        {
            var serviceEndpoint = this.serviceContext.CodePackageActivationContext.GetEndpoint(this.endpointName);
            var protocol = serviceEndpoint.Protocol;
            int port = serviceEndpoint.Port;
            //StatefulServiceContext statefulServiceContext = this.serviceContext as StatefulServiceContext;
            this.hostAddress = FabricRuntime.GetNodeContext().IPAddressOrFQDN;
            this.listeningAddress = string.Format(
                CultureInfo.InvariantCulture,
                "{0}://{1}:{2}",
                protocol,
                hostAddress,
                port
            );
            try
            {
                this.eventSource.Message("Starting tcp listener " + this.listeningAddress);
                return Task.FromResult(this.hostAddress);
            }
            catch (Exception ex)
            {
                this.eventSource.Message("Tcp Listener failed to open endpoint {0}. {1}", this.endpointName, ex.ToString());
                throw;
            }
        }

        public Task CloseAsync(CancellationToken cancellationToken)
        {
            throw new NotImplementedException();
        }

        public void Abort()
        {
            throw new NotImplementedException();
        }
    }

I also have a StatefulService named ListenerService:

    internal sealed class ListenerService : StatefulService
    {
        public ListenerService(StatefulServiceContext context)
            : base(context)
        {
        }

        protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
        {
            var endpoints = Context.CodePackageActivationContext.GetEndpoints()
                .Where(endpoint => endpoint.Protocol == EndpointProtocol.Tcp)
                .Select(endpoint => endpoint.Name);
            return endpoints.Select(endpoint => new ServiceReplicaListener(
                serviceContext => new TcpCommunicationListener(serviceContext, ServiceEventSource.Current, endpoint), endpoint));
        }

        protected override async Task RunAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            // Do I spin up a TcpListener here?
        }
    }

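
For the reliable-queue half of the requirement, the enqueue step could be sketched roughly as below. This is only a minimal illustration: the helper class `IncomingMessageQueue`, the queue name `"incomingMessages"`, and the choice of `byte[]` as the payload type are assumptions made for the example, not something taken from the code above.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Data;
using Microsoft.ServiceFabric.Data.Collections;

// Hypothetical helper (name and shape are assumptions for this sketch).
public static class IncomingMessageQueue
{
    // Assumed queue name; any stable name would work.
    private const string QueueName = "incomingMessages";

    // Appends one payload to a reliable queue inside its own transaction.
    public static async Task EnqueueAsync(
        IReliableStateManager stateManager,
        byte[] payload,
        CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // Gets the queue, creating it on first use.
        var queue = await stateManager.GetOrAddAsync<IReliableQueue<byte[]>>(QueueName);

        using (ITransaction tx = stateManager.CreateTransaction())
        {
            await queue.EnqueueAsync(tx, payload);
            // Nothing is persisted or replicated until the transaction commits.
            await tx.CommitAsync();
        }
    }
}
```

Inside `ListenerService` this would presumably be called with `this.StateManager`, for example `IncomingMessageQueue.EnqueueAsync(this.StateManager, bytes, cancellationToken)`.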
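My question is: how do I actually get hold of the messages that are received?
Do I need to create a `TcpListener` in the `RunAsync` method of `ListenerService`? If so, what is the point of declaring the endpoint in `ServiceManifest.xml`?
Or do I need to do something in the `OpenAsync` method of `TcpCommunicationListener`?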
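
To make the second option concrete, here is a minimal sketch of a listener that owns the socket itself: it starts a `System.Net.Sockets.TcpListener` in `OpenAsync` and stops it in `CloseAsync` and `Abort`. The class name `SocketOwningListener`, the `onData` callback, and the constructor parameters are hypothetical; the port and published address are assumed to come from the manifest endpoint, as in the `OpenAsync` code shown earlier. TCP is a byte stream, so the callback receives raw chunks; any message framing is left out here.

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Services.Communication.Runtime;

// Hypothetical sketch, not a definitive implementation.
public sealed class SocketOwningListener : ICommunicationListener
{
    private readonly int port;               // assumed: serviceEndpoint.Port
    private readonly string publishAddress;  // assumed: e.g. "tcp://host:10100"
    private readonly Func<byte[], CancellationToken, Task> onData; // hypothetical callback
    private readonly CancellationTokenSource stop = new CancellationTokenSource();
    private TcpListener listener;
    private Task acceptLoop;

    public SocketOwningListener(int port, string publishAddress,
        Func<byte[], CancellationToken, Task> onData)
    {
        this.port = port;
        this.publishAddress = publishAddress;
        this.onData = onData ?? throw new ArgumentNullException(nameof(onData));
    }

    public Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        // Bind and start accepting before returning the address to publish.
        this.listener = new TcpListener(IPAddress.Any, this.port);
        this.listener.Start();
        this.acceptLoop = Task.Run(() => this.AcceptLoopAsync(this.stop.Token));
        return Task.FromResult(this.publishAddress);
    }

    public async Task CloseAsync(CancellationToken cancellationToken)
    {
        this.Abort();
        if (this.acceptLoop != null)
        {
            await this.acceptLoop.ConfigureAwait(false);
        }
    }

    public void Abort()
    {
        // Stops accepting new connections; safe to call more than once.
        this.stop.Cancel();
        this.listener?.Stop();
    }

    private async Task AcceptLoopAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            TcpClient client;
            try
            {
                client = await this.listener.AcceptTcpClientAsync().ConfigureAwait(false);
            }
            // Stopping the listener surfaces as one of these, depending on runtime.
            catch (ObjectDisposedException) { break; }
            catch (SocketException) { break; }
            catch (InvalidOperationException) { break; }

            // Handle each connection independently of the accept loop.
            _ = this.HandleClientAsync(client, token);
        }
    }

    private async Task HandleClientAsync(TcpClient client, CancellationToken token)
    {
        try
        {
            using (client)
            {
                NetworkStream stream = client.GetStream();
                var buffer = new byte[4096];
                int read;
                while ((read = await stream.ReadAsync(buffer, 0, buffer.Length, token).ConfigureAwait(false)) > 0)
                {
                    // Copy the bytes read so the buffer can be reused.
                    var chunk = new byte[read];
                    Buffer.BlockCopy(buffer, 0, chunk, 0, read);
                    await this.onData(chunk, token).ConfigureAwait(false);
                }
            }
        }
        catch (IOException) { }                 // connection dropped
        catch (ObjectDisposedException) { }     // stream closed during shutdown
        catch (OperationCanceledException) { }  // listener is closing
    }
}
```

With this shape, `RunAsync` would not need to touch the socket at all; the callback passed to the constructor could hand each chunk to the service, for example to enqueue it into a reliable collection. The manifest entry would still matter under this approach, since it is where `GetEndpoint` reads the protocol and port from.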