Azure WebJob cannot send SignalR messages to Azure SignalR Service

6
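I have an ASP.NET Core 3.1 API running on Azure App Service and an Azure WebJob. Both need to send notifications; neither receives messages. Specifically:
  • I have a single instance of Azure SignalR Service in the cloud, and each app points to it.

  • The Hub class I created lives in a library that is referenced by both the API and the WebJob projects.

  • Clients connect to the Hub only through the API.

  • When the API sends a message to the connections everything works, but when the WebJob sends one it does not.

I really don't want to run the WebJob as a client, because then I would have to deal with authentication. I just want it to run as another instance of the same Hub and send messages to the same groups the API does. Also, this WebJob is not a candidate for an Azure Function because it runs too long.

What am I missing in how I configure the WebJob? It does not appear to connect to Azure SignalR.

When the WebJob tries to send a message, I get the following error (I haven't published to Azure yet, so all of this is happening on my local machine):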

Microsoft.Azure.SignalR.ServiceLifetimeManager[100] Failed to send message (null).
Microsoft.Azure.SignalR.Common.AzureSignalRNotConnectedException: Azure SignalR Service is not connected yet, please try again later.
   at Microsoft.Azure.SignalR.ServiceConnectionManager`1.WriteAsync(ServiceMessage serviceMessage)
   at Microsoft.Azure.SignalR.ServiceLifetimeManagerBase`1.<>c__DisplayClass22_0`1.<WriteAsync>b__0(T m)
   at Microsoft.Azure.SignalR.ServiceLifetimeManagerBase`1.WriteCoreAsync[T](T message, Func`2 task)

WebJob Main:

using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MyProject.Data;
using MyProject.Hub;   
using Microsoft.Azure.KeyVault;
using Microsoft.Azure.Services.AppAuthentication;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Configuration.AzureKeyVault;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class Program
{
    public static IConfigurationRoot Configuration { get; protected set; }

    static async Task Main(string[] args)
    {    
        string buildConfig = "Development";           
        
        var builder = new HostBuilder()  
            .ConfigureWebJobs(b =>
            {
                b.AddAzureStorageCoreServices();
                b.AddTimers();                                     
            })
            .ConfigureAppConfiguration(config =>
            {                    
                config.AddJsonFile($"appsettings.{buildConfig}.json", true, true);  
                Configuration = config.Build();
            })                
            .ConfigureServices((hostContext, services) =>
            {                    
                services.AddSignalR().AddAzureSignalR(options => 
                {
                    options.ConnectionString = Configuration["Azure:SignalR:ConnectionString"];  
                });            
                
                services.AddHostedService<ApplicationHostService>();                   
                services.AddDbContext<MyDbContext>(options => options.UseSqlServer(Configuration.GetConnectionString("MyDatabase")));
                services.AddScoped<MyDbContext>();     

                // The NotificationService expects an injected MyDbContext and IHubContext<NotificationHub>
                services.AddScoped<INotificationService, NotificationService>();
            })
            .ConfigureLogging((context, b) =>
            {
                b.AddConsole();
            });

        var host = builder.Build();
        using (host)
        {
            // Main is async, so await the host instead of blocking on Run().
            await host.RunAsync();
        }
    }
}

ApplicationHostService:

public class ApplicationHostService : IHostedService
{
    readonly ILogger<ApplicationHostService> _logger;
    readonly IConfiguration _configuration;
    readonly IHostingEnvironment _hostingEnvironment;
    public ApplicationHostService(
        ILogger<ApplicationHostService> logger,
        IConfiguration configuration,
        IHostingEnvironment hostingEnvironment
        )
    {
        _logger = logger;
        _configuration = configuration;
        _hostingEnvironment = hostingEnvironment;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await Task.CompletedTask;
        _logger.LogWarning("Application Host Service started.....");           
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {            
        _logger.LogWarning("Application Host Service stopped.....");
        await Task.CompletedTask;
    }
}

WebJob trigger code:

public class MyImportTrigger
{       
    private INotificationService _notificationService;        
  
    public MyImportTrigger(INotificationService notificationService)
    {
        _notificationService = notificationService;
    }

    
    public async Task Run([TimerTrigger("0 */1 * * * *" )] TimerInfo myTimer,  ILogger log)
    {
        // ... bunch of non-relevant code removed for brevity ...

        await _notificationService.CreateFundImportNotificationAsync(upload);

        // ... bunch of non-relevant code removed for brevity ...
    }
}

Notification service:

using ProjectName.Auth;
using ProjectName.Data;
using ProjectName.Utils;
using Microsoft.AspNetCore.SignalR;
using SignalR.Mvc;
using System;
using System.ComponentModel;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;


namespace ProjectName.Hub
{   
    public interface INotificationService
    {        
        FundPublishNotification CreateFundPublishNotification(short quarter, short year, string userName);        
    }

    public class NotificationService : INotificationService
    {
        MyDbContext _context;
        IHubContext<NotificationHub> _hubContext;

        public NotificationService(MyDbContext context, IHubContext<NotificationHub> hubContext)
        {
            _context = context;
            _hubContext = hubContext;
        }

      
        public FundPublishNotification CreateFundPublishNotification(short quarter, short year, string userName)
        {
            // ... removed: processing and db persistence that create an object called "notif" ...

            // THIS IS WHERE IT BOMBS WHEN CALLED FROM THE WEBJOB, BUT NOT WHEN CALLED FROM THE API.
            // The roles value is retrieved from the db and isn't dependent on a current user.
            _hubContext.Clients.Groups(roles).SendAsync("NewNotification", notif);

            return notif;
        }
    }
}

Hub class:

using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;
using MyProject.Auth;


namespace SignalR.Mvc
{
    [Authorize]    
    public class NotificationHub : Hub
    {

        public NotificationHub()
        { }

        public override async Task OnConnectedAsync()
        {
            await AddConnectionToGroups();
            await base.OnConnectedAsync();
        }

       
        public async Task AddConnectionToGroups()
        {
            var roles = Context.User.Roles();
            foreach (RoleValues role in roles)
            {
                await Groups.AddToGroupAsync(Context.ConnectionId, role.ToString());
            }
        }

       
        public override async Task OnDisconnectedAsync(Exception exception)
        {
            await RemoveConnectionToGroups();
            await base.OnDisconnectedAsync(exception);
        }

        public async Task RemoveConnectionToGroups()
        {
            var roles = Context.User.Roles();
            foreach (RoleValues role in roles)
            {
                await Groups.RemoveFromGroupAsync(Context.ConnectionId, role.ToString());
            }
        }
    }
}

appsettings.json:

{
 "Azure": {
    "SignalR": {
      "ConnectionString": "Endpoint=https://myproject-signalr-dev.service.signalr.net;AccessKey=removed-value-before-posting;Version=1.0;"
    }
  }
}

Package versions:

<PackageReference Include="Azure.Security.KeyVault.Keys" Version="4.1.0" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.6.0" />
<PackageReference Include="Microsoft.AspNetCore.SignalR" Version="1.1.0" />
<PackageReference Include="Microsoft.Azure.SignalR" Version="1.6.0" />
<PackageReference Include="Microsoft.Azure.SignalR.Management" Version="1.6.0" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.23" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="4.0.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Http" Version="3.0.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.SignalRService" Version="1.2.2" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="3.1.8" />
<PackageReference Include="Microsoft.Extensions.Configuration.AzureKeyVault" Version="3.1.8" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.8" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.8" />
2 Answers

2

Here are the relevant portions of the solution.

Use the ServiceManagerBuilder, which is available through the following package:

using Microsoft.Azure.SignalR.Management;

The triggered job:

public class ImportTrigger
    {       
        private INotificationService _notificationService;
        private IConfiguration _config;        
        private ILogger<ImportTrigger> _logger;       


        public ImportTrigger(INotificationService notificationService, IConfiguration configuration, ILoggerFactory loggerFactory) 
        {
            _notificationService = notificationService; 
            _config = configuration;           
            _logger = loggerFactory.CreateLogger<ImportTrigger>();           
        }

        
        public async Task Run([TimerTrigger("%CRONTIME%" )] TimerInfo myTimer)
        {            
            // ... bunch of removed code for brevity ...

             try
            {
                var (importNotif, roles) = _notificationService.CreateFundImportNotificationAsync(upload);
                using (var hubServiceManager = new ServiceManagerBuilder().WithOptions(option =>
                                {
                                    option.ConnectionString = _config["Azure:SignalR:ConnectionString"];
                                    option.ServiceTransportType = ServiceTransportType.Persistent;
                                }).Build())
                {
                    var hubContext = await hubServiceManager.CreateHubContextAsync("NotificationHub");
                    await hubContext.Clients.Groups(roles.Select(r => r.ToString()).ToImmutableList<string>()).SendAsync("NewNotification", importNotif.ToModel());
                }
            }
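            catch (Exception ex)
            {
                // Log the failure instead of silently swallowing it.
                _logger.LogError(ex, "Failed to send the import notification.");
            }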

            // ... bunch of removed code for brevity ...
        }
    }
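
The job above builds a new service manager and hub context on every timer run. If that per-run connection setup turns out to be a cost, one option is to build the hub context once and reuse it. The following is only a sketch under assumptions: NotificationHubContextProvider and GetAsync are hypothetical names that do not appear in the answer, and the builder calls are the same ones the answer uses, so they should be checked against the installed Microsoft.Azure.SignalR.Management version.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.SignalR.Management;
using Microsoft.Extensions.Configuration;

// Hypothetical helper (not part of the answer): builds the hub context on first use
// and hands the same instance to every later timer run.
public sealed class NotificationHubContextProvider
{
    private readonly Lazy<Task<IServiceHubContext>> _hubContext;

    public NotificationHubContextProvider(IConfiguration config)
    {
        _hubContext = new Lazy<Task<IServiceHubContext>>(() =>
        {
            // Same builder calls as in the answer. The manager is deliberately not
            // disposed here, so the persistent connection can stay open between runs.
            var serviceManager = new ServiceManagerBuilder()
                .WithOptions(option =>
                {
                    option.ConnectionString = config["Azure:SignalR:ConnectionString"];
                    option.ServiceTransportType = ServiceTransportType.Persistent;
                })
                .Build();

            return serviceManager.CreateHubContextAsync("NotificationHub");
        });
    }

    // Returns the shared hub context, creating it on the first call.
    public Task<IServiceHubContext> GetAsync() => _hubContext.Value;
}
```

Registered with services.AddSingleton<NotificationHubContextProvider>(), the trigger could take it as a constructor parameter and call await provider.GetAsync() in place of the using block. One caveat of this design: if the first creation attempt fails, the Lazy keeps the faulted task, so a real implementation would need some retry handling.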

0
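I have exactly the same setup and ran into the same behavior/error. I also wanted the WebJob to reuse the business services that the WebApi already uses. Those business services get the hub context injected, like this: IHubContext<CustomerChatHub, ICustomerChatClient> customerChatHub. In my case I have two hubs, so I kept SignalRContextProvider and HubContextAdapter generic and can simply register the hubs I need in the bootstrap code.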

// ...

// Register service that injects IHubContext<CustomerChatHub, ICustomerChatClient> customerChatHub
// This service is used by the WebApi and also by the WebJob.
services.AddSingleton<IConversationService, ConversationService>();

// ...

// SignalR from WebJob Setup
services
    // Hosted services that will be initialized asynchronously on startup
    .AddSingleton<IHostedService>(sp => sp.GetRequiredService<SignalRContextProvider<ConsultantChatHub, IConsultantChatClient>>())
    .AddSingleton<IHostedService>(sp => sp.GetRequiredService<SignalRContextProvider<CustomerChatHub, ICustomerChatClient>>())
    // Hosted service implementations which will create hub contexts 
    .AddSingleton<SignalRContextProvider<ConsultantChatHub, IConsultantChatClient>>()
    .AddSingleton<SignalRContextProvider<CustomerChatHub, ICustomerChatClient>>()
    // Ensures, that we still can inject the hub contexts into our services, but use the slightly different model of ServiceHubContext
    .AddSingleton<IHubContext<ConsultantChatHub, IConsultantChatClient>, HubContextAdapter<ConsultantChatHub, IConsultantChatClient>>()
    .AddSingleton<IHubContext<CustomerChatHub, ICustomerChatClient>, HubContextAdapter<CustomerChatHub, ICustomerChatClient>>();
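
The point of these registrations is that a shared business service can keep asking for the typed IHubContext and never needs to know whether it runs in the WebApi or the WebJob. As a rough illustration only: the answer does not show ICustomerChatClient, CustomerChatHub, or ConversationService, so the shapes and the ReceiveMessage and PostMessageAsync members below are hypothetical.

```csharp
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;

// Hypothetical client contract; the real ICustomerChatClient is not shown in the answer.
public interface ICustomerChatClient
{
    Task ReceiveMessage(string conversationId, string text);
}

// Strongly typed hub: calls on Clients are checked against ICustomerChatClient.
public class CustomerChatHub : Hub<ICustomerChatClient> { }

// Hypothetical service contract, for illustration only.
public interface IConversationService
{
    Task PostMessageAsync(string conversationId, string text);
}

public class ConversationService : IConversationService
{
    private readonly IHubContext<CustomerChatHub, ICustomerChatClient> _customerChatHub;

    // In the WebApi this resolves to the framework's hub context;
    // in the WebJob it resolves to the HubContextAdapter registered above.
    public ConversationService(IHubContext<CustomerChatHub, ICustomerChatClient> customerChatHub)
    {
        _customerChatHub = customerChatHub;
    }

    // Sends to every connection in the group named after the conversation.
    public Task PostMessageAsync(string conversationId, string text) =>
        _customerChatHub.Clients.Group(conversationId).ReceiveMessage(conversationId, text);
}
```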

/// <summary>
/// Provides and initializes a hub context.
/// </summary>
/// <typeparam name="T">SignalR Hub.</typeparam>
/// <typeparam name="I">The type of client.</typeparam>
public class SignalRContextProvider<T, I> : IHostedService, IAsyncDisposable where T : class where I : class
{
    private readonly IConfiguration _configuration;
    private readonly ILoggerFactory _loggerFactory;

    public SignalRContextProvider(IConfiguration configuration, ILoggerFactory loggerFactory)
    {
        _configuration = configuration;
        _loggerFactory = loggerFactory;
    }

    /// <summary>
    /// Typed chat hub context.
    /// </summary>
    public ServiceHubContext<I> ChatHubContext { get; private set; }

    /// <summary>
    /// Initializes the chat hub context on start.
    /// </summary>
    /// <param name="cancellationToken">Used to abort the start of the hosted service.</param>
    async Task IHostedService.StartAsync(CancellationToken cancellationToken)
    {
        using var serviceManager = new ServiceManagerBuilder()
            .WithConfiguration(_configuration)
            .WithNewtonsoftJson(config => config.PayloadSerializerSettings.ContractResolver = new CamelCasePropertyNamesContractResolver())
            .WithLoggerFactory(_loggerFactory)
            .BuildServiceManager();

        ChatHubContext = await serviceManager.CreateHubContextAsync<I>(typeof(T).Name, cancellationToken);
    }

    /// <summary>
    /// Disposes the context on shutdown.
    /// </summary>
    /// <param name="cancellationToken">Indicates that the shutdown process should no longer be graceful.</param>
    async Task IHostedService.StopAsync(CancellationToken cancellationToken) => await DisposeAsync();

    /// <summary>
    /// Disposes the context on shutdown.
    /// </summary>
    /// <returns>Awaitable result of an asynchronous operation.</returns>
    public ValueTask DisposeAsync()
    {
        if (ChatHubContext == null)
            return ValueTask.CompletedTask;

        return ChatHubContext.DisposeAsync();
    }
}

/// <summary>
/// Adapter to make a ServiceHubContext compatible with a IHubContext<THub,T>.
/// Configure this adapter in a WebJob for example to resolve typed IHubContext<MyHub, IMyHubTyped> injected into a 
/// business service which can be shared with a WebApi project for example.
/// => means both the WebApi and a WebJob can use the same service to deliver SignalR messages.
/// </summary>
/// <typeparam name="T">SignalR Hub.</typeparam>
/// <typeparam name="I">The type of client.</typeparam>
public class HubContextAdapter<T, I> : IHubContext<T, I> where T : Hub<I> where I : class
{
    private readonly ServiceHubContext<I> _serviceHubContext;

    public HubContextAdapter(SignalRContextProvider<T, I> provider)
    {
        _serviceHubContext = provider.ChatHubContext;
    }

    /// <summary>
    /// Gets a <see cref="IHubClients{T}"/> that can be used to invoke methods on clients connected to the hub.
    /// </summary>
    public IHubClients<I> Clients => _serviceHubContext.Clients;

    /// <summary>
    /// Gets a <see cref="GroupManager"/> that can be used to add and remove connections to named groups.
    /// </summary>
    public IGroupManager Groups => _serviceHubContext.Groups;
}
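
The provider never sets a connection string explicitly, so it has to come from the IConfiguration passed to WithConfiguration. Assuming the SDK reads the same Azure:SignalR:ConnectionString key that the question's appsettings.json uses (worth verifying against the SDK version in use), the WebJob's configuration would look roughly like this, with both placeholders replaced by real values:

```json
{
  "Azure": {
    "SignalR": {
      "ConnectionString": "Endpoint=https://<your-resource>.service.signalr.net;AccessKey=<your-access-key>;Version=1.0;"
    }
  }
}
```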
