WCF Duplex Callback Sample Failing

10
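In an effort to hone some sample services to use as a reference for our internal scenarios, I've created this WCF duplex channel example, pulling together several samples found over the years.
The duplex part isn't working, and I'm hoping we can figure it out together. I hate posting this much code, but I feel I've trimmed it down about as far as WCF allows while still including all the parts I'd like the community to vet. There may be some really bad ideas in here; I'm not saying it's right, it's just what I have so far.
There are three parts: Channel, Server, and Client. Three projects and, here, three code files. There is no XML configuration; everything is set up in code. The code output follows.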
Channel.proj / Channel.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;

namespace Channel
{
    public interface IDuplexSyncCallback
    {
        [OperationContract]
        string CallbackSync(string message, DateTimeOffset timestamp);
    }

    [ServiceContract(CallbackContract = typeof(IDuplexSyncCallback))]
    public interface IDuplexSyncContract
    {
        [OperationContract]
        void Ping();

        [OperationContract]
        void Enroll();

        [OperationContract]
        void Unenroll();
    }
}

Server.proj / Server.cs, references Channel, System.Runtime.Serialization, and System.ServiceModel.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using System.Timers;
using Channel;
using System.Diagnostics;
using System.Net.Security;

namespace Server
{
    class Program
    {
        // All of this just starts up the service with these hardcoded configurations
        static void Main(string[] args)
        {
            ServiceImplementation implementation = new ServiceImplementation();
            ServiceHost service = new ServiceHost(implementation);

            NetTcpBinding binding = new NetTcpBinding(SecurityMode.Transport);
            binding.Security.Message.ClientCredentialType = MessageCredentialType.Windows;
            binding.Security.Mode = SecurityMode.Transport;
            binding.Security.Transport.ClientCredentialType = TcpClientCredentialType.Windows;
            binding.Security.Transport.ProtectionLevel = ProtectionLevel.EncryptAndSign;
            binding.ListenBacklog = 1000;
            binding.MaxConnections = 30;
            binding.MaxReceivedMessageSize = 2147483647;
            binding.ReaderQuotas.MaxStringContentLength = 2147483647;
            binding.ReaderQuotas.MaxArrayLength = 2147483647;
            binding.SendTimeout = TimeSpan.FromSeconds(2);
            binding.ReceiveTimeout = TimeSpan.FromSeconds(10 * 60); // 10 minutes is the default if not specified
            binding.ReliableSession.Enabled = true;
            binding.ReliableSession.Ordered = true;

            service.AddServiceEndpoint(typeof(IDuplexSyncContract), binding, new Uri("net.tcp://localhost:3828"));

            service.Open();

            Console.WriteLine("Server Running ... Press any key to quit");
            Console.ReadKey(true);

            service.Abort();
            service.Close();
            implementation = null;
            service = null;
        }
    }

    /// <summary>
    /// ServiceImplementation of IDuplexSyncContract
    /// </summary>
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single,
        MaxItemsInObjectGraph = 2147483647,
        IncludeExceptionDetailInFaults = true,
        ConcurrencyMode = ConcurrencyMode.Multiple,
        UseSynchronizationContext = false)]
    class ServiceImplementation : IDuplexSyncContract
    {
        Timer announcementTimer = new Timer(5000); // Every 5 seconds
        int messageNumber = 0; // message number incrementer - not threadsafe, just for debugging.

        public ServiceImplementation()
        {
            announcementTimer.Elapsed += new ElapsedEventHandler(announcementTimer_Elapsed);
            announcementTimer.AutoReset = true;
            announcementTimer.Enabled = true;
        }

        void announcementTimer_Elapsed(object sender, ElapsedEventArgs e)
        {
            AnnounceSync(string.Format("HELLO? (#{0})", messageNumber++));
        }

        #region IDuplexSyncContract Members
        List<IDuplexSyncCallback> syncCallbacks = new List<IDuplexSyncCallback>();

        /// <summary>
        /// Simple Ping liveness
        /// </summary>
        [OperationBehavior]
        public void Ping() { return; }

        /// <summary>
        /// Add channel to subscribers
        /// </summary>
        [OperationBehavior]
        void IDuplexSyncContract.Enroll()
        {
            IDuplexSyncCallback current = System.ServiceModel.OperationContext.Current.GetCallbackChannel<IDuplexSyncCallback>();

            lock (syncCallbacks)
            {
                syncCallbacks.Add(current);

                Trace.WriteLine("Enrollment Complete");
            }
        }

        /// <summary>
        /// Remove channel from subscribers
        /// </summary>
        [OperationBehavior]
        void IDuplexSyncContract.Unenroll()
        {
            IDuplexSyncCallback current = System.ServiceModel.OperationContext.Current.GetCallbackChannel<IDuplexSyncCallback>();

            lock (syncCallbacks)
            {
                syncCallbacks.Remove(current);

                Trace.WriteLine("Unenrollment Complete");
            }
        }

        /// <summary>
        /// Callback to clients over enrolled channels
        /// </summary>
        /// <param name="message"></param>
        void AnnounceSync(string message)
        {
            var now = DateTimeOffset.Now;

            if (message.Length > 2000) message = message.Substring(0, 2000 - "[TRUNCATED]".Length) + "[TRUNCATED]";
            Trace.WriteLine(string.Format("{0}: {1}", now.ToString("mm:ss.fff"), message));

            lock (syncCallbacks)
            {
                foreach (var callback in syncCallbacks.ToArray())
                {
                    Console.WriteLine("Sending \"{0}\" synchronously ...", message);

                    CommunicationState state = ((ICommunicationObject)callback).State;

                    switch (state)
                    {
                        case CommunicationState.Opened:
                            try
                            {
                                Console.WriteLine("Client said '{0}'", callback.CallbackSync(message, now));
                            }
                            catch (Exception ex)
                            {
                                // Timeout Error happens here
                                syncCallbacks.Remove(callback);
                                Console.WriteLine("Removed client");
                            }
                            break;
                        case CommunicationState.Created:
                        case CommunicationState.Opening:
                            break;
                        case CommunicationState.Faulted:
                        case CommunicationState.Closed:
                        case CommunicationState.Closing:
                        default:
                            syncCallbacks.Remove(callback);
                            Console.WriteLine("Removed client");
                            break;
                    }
                }
            }
        }
        #endregion
    }
}

Client.proj / Client.cs, references Channel, System.Runtime.Serialization, and System.ServiceModel.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using System.Timers;
using System.Diagnostics;
using Channel;
using System.Net;

namespace Client
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var callbackSyncProxy = new CallbackSyncProxy(new Uri("net.tcp://localhost:3828"), CredentialCache.DefaultNetworkCredentials))
            {
                callbackSyncProxy.Faulted += (s, e) => Console.WriteLine("CallbackSyncProxy Faulted.");
                callbackSyncProxy.ConnectionUnavailable += (s, e) => Console.WriteLine("CallbackSyncProxy ConnectionUnavailable.");
                callbackSyncProxy.ConnectionRecovered += (s, e) => Console.WriteLine("CallbackSyncProxy ConnectionRecovered.");

                callbackSyncProxy.Ping();
                callbackSyncProxy.Ping();
                callbackSyncProxy.Ping();

                Console.WriteLine("Pings completed.  Enrolling ...");

                callbackSyncProxy.AnnouncementSyncHandler = AnnouncementHandler;

                Console.WriteLine("Enrolled and waiting.  Press any key to quit ...");
                Console.ReadKey(true); // Wait for quit
            }
        }

        /// <summary>
        /// Called by the server through DuplexChannel
        /// </summary>
        /// <param name="message"></param>
        /// <param name="timeStamp"></param>
        /// <returns></returns>
        static string AnnouncementHandler(string message, DateTimeOffset timeStamp)
        {
            Console.WriteLine("{0}: {1}", timeStamp, message);

            return string.Format("Dear Server, thanks for that message at {0}.", timeStamp);
        }
    }

    /// <summary>
    /// Encapsulates the client-side WCF setup logic.
    /// 
    /// There are 3 events Faulted, ConnectionUnavailable, ConnectionRecovered that might be of interest to the consumer
    /// Enroll and Unenroll of the ServiceContract are called when setting an AnnouncementSyncHandler
    /// Ping, when set correctly against the server's send/receive timeouts, will keep the connection alive
    /// </summary>
    public class CallbackSyncProxy : IDisposable
    {
        Uri listen;
        NetworkCredential credentials;
        NetTcpBinding binding;
        EndpointAddress serverEndpoint;
        ChannelFactory<IDuplexSyncContract> channelFactory;
        DisposableChannel<IDuplexSyncContract> channel;

        readonly DuplexSyncCallback callback = new DuplexSyncCallback();

        object sync = new object();
        bool enrolled;
        Timer pingTimer = new Timer();
        bool quit = false; // set during dispose

        // Events of interest to consumer
        public event EventHandler Faulted;
        public event EventHandler ConnectionUnavailable;
        public event EventHandler ConnectionRecovered;

        // AnnouncementSyncHandler property.  When set to non-null delegate, Enrolls client with server.
        // passes through to the DuplexSyncCallback callback.AnnouncementSyncHandler
        public Func<string, DateTimeOffset, string> AnnouncementSyncHandler
        {
            get
            {
                Func<string, DateTimeOffset, string> temp = null;

                lock (sync)
                {
                    temp = callback.AnnouncementSyncHandler;
                }
                return temp;
            }
            set
            {
                lock (sync)
                {
                    if (callback.AnnouncementSyncHandler == null && value != null)
                    {
                        callback.AnnouncementSyncHandler = value;

                        Enroll();
                    }
                    else if (callback.AnnouncementSyncHandler != null && value == null)
                    {
                        Unenroll();

                        callback.AnnouncementSyncHandler = null;
                    }
                    else // null to null or function to function, just update it
                    {
                        callback.AnnouncementSyncHandler = value;
                    }
                }
            }
        }

        /// <summary>
        /// using (var proxy = new CallbackSyncProxy(listen, CredentialCache.DefaultNetworkCredentials)) { ... }
        /// </summary>
        public CallbackSyncProxy(Uri listen, NetworkCredential credentials)
        {
            this.listen = listen;
            this.credentials = credentials;

            binding = new NetTcpBinding(SecurityMode.Transport);
            binding.Security.Message.ClientCredentialType = MessageCredentialType.Windows;
            binding.Security.Mode = SecurityMode.Transport;
            binding.Security.Transport.ClientCredentialType = TcpClientCredentialType.Windows;
            binding.MaxReceivedMessageSize = 2147483647;
            binding.ReaderQuotas.MaxArrayLength = 2147483647;
            binding.ReaderQuotas.MaxBytesPerRead = 2147483647;
            binding.ReaderQuotas.MaxDepth = 2147483647;
            binding.ReaderQuotas.MaxStringContentLength = 2147483647;
            binding.ReliableSession.Enabled = true;
            binding.ReliableSession.Ordered = true;
            serverEndpoint = new EndpointAddress(listen);

            pingTimer.AutoReset = true;
            pingTimer.Elapsed += pingTimer_Elapsed;
            pingTimer.Interval = 20000;
        }

        /// <summary>
        /// Keep the connection alive by pinging at some set minimum interval
        /// </summary>
        void pingTimer_Elapsed(object sender, ElapsedEventArgs e)
        {
            bool locked = false;

            try
            {
                locked = System.Threading.Monitor.TryEnter(sync, 100);
                if (!locked)
                {
                    Console.WriteLine("Unable to ping because synchronization lock could not be acquired in a timely fashion");
                    return;
                }
                Debug.Assert(channel != null, "CallbackSyncProxy.channel is unexpectedly null");

                try
                {
                    channel.Service.Ping();
                }
                catch
                {
                    Console.WriteLine("Unable to ping");
                }
            }
            finally
            {
                if (locked) System.Threading.Monitor.Exit(sync);
            }
        }

        /// <summary>
        /// Ping is a keep-alive, but can also be called by the consuming code
        /// </summary>
        public void Ping()
        {
            lock (sync)
            {
                if (channel != null)
                {
                    channel.Service.Ping();
                }
                else
                {
                    using (var c = new DisposableChannel<IDuplexSyncContract>(GetChannelFactory().CreateChannel()))
                    {
                        c.Service.Ping();
                    }
                }
            }
        }

        /// <summary>
        /// Enrollment - called when AnnouncementSyncHandler is assigned
        /// </summary>
        void Enroll()
        {
            lock (sync)
            {
                if (!enrolled)
                {
                    Debug.Assert(channel == null, "CallbackSyncProxy.channel is unexpectedly not null");

                    var c = new DisposableChannel<IDuplexSyncContract>(GetChannelFactory().CreateChannel());

                    ((ICommunicationObject)c.Service).Open();

                    ((ICommunicationObject)c.Service).Faulted += new EventHandler(CallbackChannel_Faulted);

                    c.Service.Enroll();

                    channel = c;

                    Debug.Assert(!pingTimer.Enabled, "CallbackSyncProxy.pingTimer unexpectedly Enabled");

                    pingTimer.Start();

                    enrolled = true;
                }
            }
        }

        /// <summary>
        /// Unenrollment - called when AnnouncementSyncHandler is set to null
        /// </summary>
        void Unenroll()
        {
            lock (sync)
            {
                if (callback.AnnouncementSyncHandler != null)
                {
                    Debug.Assert(channel != null, "CallbackSyncProxy.channel is unexpectedly null");

                    channel.Service.Unenroll();

                    Debug.Assert(pingTimer.Enabled, "CallbackSyncProxy.pingTimer unexpectedly Disabled");

                    pingTimer.Stop();

                    enrolled = false;
                }
            }
        }

        /// <summary>
        /// Used during enrollment to establish a channel.
        /// </summary>
        /// <returns></returns>
        ChannelFactory<IDuplexSyncContract> GetChannelFactory()
        {
            lock (sync)
            {
                if (channelFactory != null &&
                    channelFactory.State != CommunicationState.Opened)
                {
                    ResetChannel();
                }

                if (channelFactory == null)
                {
                    channelFactory = new DuplexChannelFactory<IDuplexSyncContract>(callback, binding, serverEndpoint);

                    channelFactory.Credentials.Windows.ClientCredential = credentials;

                    foreach (var op in channelFactory.Endpoint.Contract.Operations)
                    {
                        var b = op.Behaviors[typeof(System.ServiceModel.Description.DataContractSerializerOperationBehavior)] as System.ServiceModel.Description.DataContractSerializerOperationBehavior;

                        if (b != null)
                            b.MaxItemsInObjectGraph = 2147483647;
                    }
                }
            }

            return channelFactory;
        }

        /// <summary>
        /// Channel Fault handler, set during Enrollment
        /// </summary>
        void CallbackChannel_Faulted(object sender, EventArgs e)
        {
            lock (sync)
            {
                if (Faulted != null)
                {
                    Faulted(this, new EventArgs());
                }

                ResetChannel();

                pingTimer.Stop();
                enrolled = false;

                if (callback.AnnouncementSyncHandler != null)
                {
                    while (!quit) // set during Dispose
                    {
                        System.Threading.Thread.Sleep(500);

                        try
                        {
                            Enroll();

                            if (ConnectionRecovered != null)
                            {
                                ConnectionRecovered(this, new EventArgs());
                            }

                            break; // enrollment succeeded; stop retrying even if nobody subscribed to ConnectionRecovered
                        }
                        catch
                        {
                            if (ConnectionUnavailable != null)
                            {
                                ConnectionUnavailable(this, new EventArgs());
                            }
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Reset the Channel & ChannelFactory if they are faulted and during dispose
        /// </summary>
        void ResetChannel()
        {
            lock (sync)
            {
                if (channel != null)
                {
                    channel.Dispose();
                    channel = null;
                }

                if (channelFactory != null)
                {
                    if (channelFactory.State == CommunicationState.Faulted)
                        channelFactory.Abort();
                    else
                        try
                        {
                            channelFactory.Close();
                        }
                        catch
                        {
                            channelFactory.Abort();
                        }

                    channelFactory = null;
                }
            }
        }

        // Disposing of me implies disposing of disposable members
        #region IDisposable Members
        bool disposed;
        void IDisposable.Dispose()
        {
            if (!disposed)
            {
                Dispose(true);
            }

            GC.SuppressFinalize(this);
        }

        void Dispose(bool disposing)
        {
            if (disposing)
            {
                quit = true;

                ResetChannel();

                pingTimer.Stop();

                enrolled = false;

                callback.AnnouncementSyncHandler = null;
            }

            disposed = true;
        }
        #endregion
    }

    /// <summary>
    /// IDuplexSyncCallback implementation, instantiated through the CallbackSyncProxy
    /// </summary>
    [CallbackBehavior(UseSynchronizationContext = false, 
    ConcurrencyMode = ConcurrencyMode.Multiple, 
    IncludeExceptionDetailInFaults = true)]
    class DuplexSyncCallback : IDuplexSyncCallback
    {
        // Passthrough handler delegates from the CallbackSyncProxy
        #region AnnouncementSyncHandler passthrough property
        Func<string, DateTimeOffset, string> announcementSyncHandler;
        public Func<string, DateTimeOffset, string> AnnouncementSyncHandler
        {
            get
            {
                return announcementSyncHandler;
            }
            set
            {
                announcementSyncHandler = value;
            }
        }
        #endregion

        /// <summary>
        /// IDuplexSyncCallback.CallbackSync
        /// </summary>
        [OperationBehavior]
        public string CallbackSync(string message, DateTimeOffset timestamp)
        {
            if (announcementSyncHandler != null)
            {
                return announcementSyncHandler(message, timestamp);
            }
            else
            {
                return "Sorry, nobody was home";
            }
        }
    }

    // This class wraps an ICommunicationObject so that it can be either Closed or Aborted properly with a using statement
    // This was chosen over alternatives of elaborate try-catch-finally blocks in every calling method, or implementing a
    // new Channel type that overrides Disposable with similar new behavior
    sealed class DisposableChannel<T> : IDisposable
    {
        T proxy;
        bool disposed;

        public DisposableChannel(T proxy)
        {
            if (!(proxy is ICommunicationObject)) throw new ArgumentException("object of type ICommunicationObject expected", "proxy");

            this.proxy = proxy;
        }

        public T Service
        {
            get
            {
                if (disposed) throw new ObjectDisposedException("DisposableProxy");

                return proxy;
            }
        }

        public void Dispose()
        {
            if (!disposed)
            {
                Dispose(true);
            }

            GC.SuppressFinalize(this);
        }

        void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (proxy != null)
                {
                    ICommunicationObject ico = null;

                    if (proxy is ICommunicationObject)
                        ico = (ICommunicationObject)proxy;

                    // This state may change after the test and there's no known way to synchronize
                    // so that's why we just give it our best shot
                    if (ico.State == CommunicationState.Faulted)
                        ico.Abort(); // Known to be faulted
                    else
                        try
                        {
                            ico.Close(); // Attempt to close, this is the nice way and we ought to be nice
                        }
                        catch
                        {
                            ico.Abort(); // Sometimes being nice isn't an option
                        }

                    proxy = default(T);
                }
            }

            disposed = true;
        }
    }
}

Munged-together output (server lines are marked >>, client lines are marked <<):

>> Server Running ... Press any key to quit
                           Pings completed.  Enrolling ... <<
          Enrolled and waiting.  Press any key to quit ... <<
>> Sending "HELLO? (#0)" synchronously ...
                                CallbackSyncProxy Faulted. <<
                    CallbackSyncProxy ConnectionRecovered. <<
>> Removed client
>> Sending "HELLO? (#2)" synchronously ...
                   8/2/2010 2:47:32 PM -07:00: HELLO? (#2) <<
>> Removed client

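As Andrew pointed out, the problem isn't so obvious. This "munged-together output" is not the desired output. Instead, I'd like the server to be running, the pings and the enrollment to succeed, and then, every 5 seconds, the server to "send 'HELLO? (#m)' synchronously", the client to immediately transform and return it, and the server to receive the reply and print it.
What actually happens is that the pings work, but the callback faults on the first attempt, reaches the client after the reconnect but never returns to the server, and then everything disconnects.
The only exceptions I see are about the channel having previously faulted and therefore being unusable; I have yet to see an exception for the actual fault that put the channel into that state.
I've used similar code with [OperationContract(IsOneWay = true)] many times. Strange that this seemingly more common case is giving me so much trouble.
The exception caught on the server side is:
System.TimeoutException: "This request operation sent to schemas.microsoft.com/2005/12/ServiceModel/Addressing/Anonymous did not receive a reply within the configured timeout (00:00:00). The time allotted to this operation may have been a portion of a longer timeout. This may be because the service is still processing the operation or because the service was unable to send a reply message. Please consider increasing the operation timeout (by casting the channel/proxy to IContextChannel and setting the OperationTimeout property) and ensure that the service is able to connect to the client."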
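
For reference, the suggestion at the end of that exception message could be sketched roughly as follows. This is only an illustration: `CallbackTimeouts` and `WithOperationTimeout` are hypothetical names that do not appear in the code above, and it is an assumption that a longer operation timeout would change the outcome here.

```csharp
using System;
using System.ServiceModel;

static class CallbackTimeouts
{
    // Hypothetical helper (not part of the original code): applies a
    // per-channel operation timeout to a callback channel, as the exception
    // message suggests, and hands the same channel back.
    public static T WithOperationTimeout<T>(T callback, TimeSpan timeout)
        where T : class
    {
        // The message says the channel/proxy can be cast to IContextChannel;
        // if the cast does not succeed, the object is returned unchanged.
        var contextChannel = callback as IContextChannel;
        if (contextChannel != null)
        {
            contextChannel.OperationTimeout = timeout;
        }
        return callback;
    }
}
```

In `Enroll()` on the server this might be used as `syncCallbacks.Add(CallbackTimeouts.WithOperationTimeout(current, TimeSpan.FromSeconds(30)));`, where the 30 seconds is an arbitrary value chosen for illustration.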

1
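You should spell out exactly how the duplex part is "not working". Otherwise nobody will be motivated to read the code. - Andrew Shepherd
OK, I've added comments at the bottom. Is that better? - Jason Kleban
Maybe you should put buildable, runnable projects in a zip file somewhere on the web, so that, ideally, anyone with the time could build, copy-deploy, and run them within 5 minutes, then watch what happens in the debugger with the VS tooling on hand to dig into the code. By the way, Stack Overflow needs to change its CSS and start using compact spacing for code :-) - ZXX
5 Answers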

3
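In the AnnounceSync method, add FaultException handling and you will be notified that there was no response from the server (which in your case is the client), meaning that no callback reply was received. As you suggested, this is caused by a timeout. So change:
binding.SendTimeout = TimeSpan.FromSeconds(3);

It will then work as expected.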

try
{
    Console.WriteLine("Client said '{0}'",callback.CallbackSync(message, now) );
}
catch (FaultException fex)
{
    syncCallbacks.Remove(callback);
    Console.WriteLine("Failed to call Client because" + fex.Reason);
    Console.WriteLine(fex.Message);
}

1
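Your confidence is appealing, but I can't get it to work. I've added this catch block to server.cs and changed the timeout from 2 seconds to 3. There's no change in behavior. The exception thrown is a TimeoutException, not a FaultException. - Jason Kleban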
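
Since the comment above reports a `TimeoutException` rather than a `FaultException`, a handler along the following lines would at least observe both. This is a minimal sketch: `CallbackInvoker` and `TryInvoke` are hypothetical names, and the code only reports the failure, it does not address whatever causes the timeout.

```csharp
using System;
using System.ServiceModel;

static class CallbackInvoker
{
    // Hypothetical helper: runs one callback invocation and returns true on
    // success, false if it timed out or failed with a communication error.
    public static bool TryInvoke(Action invoke)
    {
        try
        {
            invoke();
            return true;
        }
        catch (TimeoutException ex)
        {
            // System.TimeoutException is not a CommunicationException,
            // so it needs its own catch clause.
            Console.WriteLine("Callback timed out: " + ex.Message);
        }
        catch (FaultException ex)
        {
            // FaultException derives from CommunicationException,
            // so it has to be caught before its base class.
            Console.WriteLine("Callback faulted: " + ex.Message);
        }
        catch (CommunicationException ex)
        {
            Console.WriteLine("Communication error: " + ex.Message);
        }
        return false;
    }
}
```

Inside the `foreach` loop in `AnnounceSync` it could be called as `if (!CallbackInvoker.TryInvoke(() => Console.WriteLine("Client said '{0}'", callback.CallbackSync(message, now)))) syncCallbacks.Remove(callback);`.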

1

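This may not solve your problem completely, but looking at your code, IDuplexSyncCallback is definitely a suspect. Part of its service implementation is in place, but it should also be decorated with ServiceContractAttribute. When making a callback, the operation must also be designated as one-way. Below is an example of what I've done in the past for a callback contract, which may help you as well.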

[ServiceContract]
public interface IDuplexSyncCallback
{
    [OperationContract(IsOneWay = true)]
    // A one-way operation cannot return a value, so the return type must be void
    void CallbackSync(string message, DateTimeOffset timestamp);
}

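Even in the sample larsw referenced, the callback interface is not marked with [ServiceContract]. I read elsewhere that it isn't needed, but I can't find that URL anymore. In any case, I tried it and it made no difference. The second issue is that IsOneWay = true isn't appropriate for this scenario: I need a response! I haven't read anywhere that callbacks must use IsOneWay = true (in which case they couldn't return values). - Jason Kleban
It's correct that OneWay isn't required. Callbacks are usually asynchronous, which is why you may find some examples written that way. Sorry I couldn't help. - Jeff LaFay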

1

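This is silly/annoying, but ProtectionLevel.EncryptAndSign seems to be the problem. I found the error message on Google, only rarely, in connection with bindings and Windows authentication. That led me to guess that the upstream communication wasn't working because of the binding's encryption or something along those lines. Setting it to ProtectionLevel.None suddenly allows the duplex channel to work for two-way methods (methods that return a value to the server).

I'm not saying that turning off the protection level is a good idea, but it is at least a significant clue. If you need the benefits of EncryptAndSign, you can investigate further from there.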
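
One way to run that experiment is to build the binding in one place and pass the protection level in, so the server and the client are always configured identically. The sketch below is an illustration only: `Bindings` and `CreateDuplexBinding` are hypothetical names, the quota and timeout settings from the question are left out for brevity, and the idea that both ends should use the same level is my assumption.

```csharp
using System.Net.Security;
using System.ServiceModel;

static class Bindings
{
    // Hypothetical factory shared by Server and Client so that both ends of
    // the duplex channel are created with the same security settings.
    public static NetTcpBinding CreateDuplexBinding(ProtectionLevel protectionLevel)
    {
        var binding = new NetTcpBinding(SecurityMode.Transport);
        binding.Security.Transport.ClientCredentialType = TcpClientCredentialType.Windows;

        // The setting under suspicion: EncryptAndSign in the question's
        // server code, ProtectionLevel.None for the diagnostic run.
        binding.Security.Transport.ProtectionLevel = protectionLevel;

        binding.ReliableSession.Enabled = true;
        binding.ReliableSession.Ordered = true;
        return binding;
    }
}
```

For the diagnostic run, both `Main` in Server.cs and the `CallbackSyncProxy` constructor would call `Bindings.CreateDuplexBinding(ProtectionLevel.None)`. As noted above, that is a clue for further investigation, not a configuration to ship.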


0

I've checked that sample, but it specifically uses [OperationContract(IsOneWay = true)], which isn't acceptable for this scenario. - Jason Kleban

0

Unfortunately, one-way operations are a prerequisite for duplex channels.

