如何向IEventProcessor实现传递参数

8

我正在忙于为Azure EventBus客户端实现一个EventProcessorHost客户端。

我有一个实现了IEventProcessor接口的类,如下所示:

 public class MyEventProcessor : IEventProcessor
    {
        Stopwatch checkpointStopWatch;            

        //TODO: get provider id from parent class     


        public async Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Debug.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
            if (reason == CloseReason.Shutdown)
            {
                await context.CheckpointAsync();
            }
        }

        public Task OpenAsync(PartitionContext context)
        {
            Debug.WriteLine("SimpleEventProcessor initialized.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
            eventHandler = new MyEventHandler();
            this.checkpointStopWatch = new Stopwatch();
            this.checkpointStopWatch.Start();
            return Task.FromResult<object>(null);
        }

        async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (EventData eventData in messages)
            {
                string data = Encoding.UTF8.GetString(eventData.GetBytes());              
                Debug.WriteLine(data);       
            }
            //Call checkpoint every 5 minutes, so that worker can resume processing from the 5 minutes back if it restarts.
            if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
            {
                await context.CheckpointAsync();
                this.checkpointStopWatch.Restart();
            }
        }
    }

我接下来会这样调用:

然后我这样调用:

 EventProcessorHost _eventProcessorHost = new EventProcessorHost(eventProcessorHostName, EndpointName, EventHubConsumerGroup.DefaultGroupName, ConnectionString, storageConnectionString, "messages-events");
 await _eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>();

我需要向 EventProcessorHost 创建的 MyEventProcessor 实例传递一个参数。我该如何做到这一点?

2个回答

22

您只需使用RegisterEventProcessorFactoryAsync来传递工厂实例。该工厂类可以通过在工厂方法中将参数传递到工厂中,或使工厂变化行为的方式传递任何适当的参数。在下面草绘的代码中,您可以看到两个参数被传递给IEventProcessor。其中一个来自工厂的参数,另一个是计数器,用于记录工厂已被调用的次数。

class AzureStreamProcessor : IEventProcessor
{
     ....
}

class AzureStreamProcessorFactory : IEventProcessorFactory
{
    public AzureStreamProcessorFactory(string str)
    {
         this.randomString = str;
    }

    private string randomString;
    private int numCreated = 0;
    IEventProcessor IEventProcessorFactory.CreateEventProcessor(PartitionContext context)
    {
        return new AzureStreamProcessor(context, randomString, Interlocked.Increment(ref numCreated));
    }
}

host.RegisterEventProcessorFactoryAsync(new AzureStreamProcessorFactory("a parameter"), options);

这是正确的方式! - Jose Parra

1

可以尝试对MyEventProcessor类进行构造函数依赖注入,参数类似于以下内容。

     public class MyEventProcessor : IEventProcessor
    {
        Stopwatch checkpointStopWatch;            

        //TODO: get provider id from parent class     
    IParameters _parameter;
    public MyEventProcessor (IParameters param)
    {
      this._parameter  = param;
     }

        public async Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Debug.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
            if (reason == CloseReason.Shutdown)
            {
                await context.CheckpointAsync();
            }
        }.....

使用_parameter来检索所需内容。
以下是如何为您的IParameters注册依赖项。
这里我使用Ninject依赖项解析器。
//Bind the class that implements IParameter.
 var parameters = new Parameter();
paramters.Property = "my data"

 kernel.Bind<IParameters>().ToConstant(parameters);

希望有所帮助。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接