重复触发现有文件的Azure BlobStorage函数触发器

3

我刚刚上传了数十GB的文件到Azure CloudStorage中。每个文件都应该被一个FunctionApp捕获和处理,以响应触发器:

[FunctionName(nameof(ImportDataFile))]
public async Task ImportDataFile(
    // Raw JSON Text file containing data updates in expected schema
    [BlobTrigger("%AzureStorage:DataFileBlobContainer%/{fileName}", Connection = "AzureStorage:ConnectionString")]
    Stream blobStream,
    string fileName)
{
    //...
}

这通常可以正常工作,但愚蠢的是,在将所有文件上传到我们的UAT系统之前,我没有对该函数进行最终测试......而且上传时出现了问题 :(
由于CoViD-19,上传需要几天时间(通过我的国内互联网),因此我真的不想必须重新做一遍。
有没有一种“重放”BlobUpload触发器的方法?这样,函数会再次触发,就像我刚刚重新上传文件一样...而不必再传输任何数据!
5个回答

6
根据链接,Azure Functions会将 Blob 接收记录存储在 Azure 存储帐户的名为 azure-webjobs-hosts 的容器中(由应用程序设置 AzureWebJobsStorage 定义)。要强制重新处理 Blob,请手动从 azure-webjobs-hosts 容器中删除该 Blob 的接收记录。虽然重新处理可能不会立即发生,但保证会在以后的某个时间点发生。要立即重新处理,可以更新 azure-webjobs-hosts/blobscaninfo 中的 scaninfo Blob。任何最后修改时间戳在 LatestScan 属性之后的 Blob 都将再次进行扫描

1
太好了,谢谢!我浏览了那个页面,但是没有找到那个部分! - undefined
你能试试看吗?我自己还没试过。 - undefined
哦,你的意思是编辑答案并包括一个参考你的评论,说明它实际上并不起作用? - undefined
哦,你是说要编辑答案并在评论中提到它实际上不起作用吗? - undefined
你可能还想在毒药队列中检查相关的 Blob。 https://learn.microsoft.com/zh-cn/azure/azure-functions/functions-bindings-storage-blob-trigger?tabs=csharp#poison-blobs - undefined
显示剩余2条评论

5

我发现了一个不太优雅的解决方法,它重新处理了现有文件:

如果您向Blob添加元数据,似乎会重新触发BlobStorage函数触发器。

在Azure Storage Explorer中访问,但是右键单击Blob > 属性 > 添加元数据。

我设置了键:“ForceRefresh”,值为“test”。


1
我在编程中遇到了有关Blob处理的问题,导致在webjobs-blobtrigger-poison队列中出现了大量消息。我不得不将它们移回到azure-webjobs-blobtrigger-name-of-function-app。在进行上述步骤之前,删除Blob收据并调整scaninfo Blob是行不通的。
幸运的是,Azure Storage Explorer有一个菜单选项可以将消息从一个队列移动到另一个队列:

enter image description here


0

我找到了一个解决办法,如果你对文件名没有特别要求:

Azure Storage Explorer在顶部工具栏上有一个“使用新名称克隆”按钮,它将添加一个新文件(并触发函数),而无需通过本地机器传输数据。

请注意,“复制”后跟“粘贴”也会重新触发blob,但似乎会将数据传输到您的机器,然后再次上传...非常慢!


请注意,执行此操作似乎不会更改“LastModifiedStamp”,所以谁知道它将如何与ScanInfo交互。 - undefined

0
其他的答案对我来说要么不起作用,要么是一个太昂贵的操作,不可行。由于一次故障,我不得不对10万个以上的blob进行此操作,这是我编写的管理此功能的应用程序。需要两个包: <PackageReference Include="Azure.Storage.Blobs" Version="12.19.0" /> <PackageReference Include="System.Linq.Async" Version="6.0.1" />
public class Program
{
    private static string _blobStorageConnectionString = "YOURCONNECTIONSTRING";
    private static DateTime _filterAfterDate = new DateTime(2023, 10, 17);

    public static async Task Main(string[] args)
    {
        BlobServiceClient blobServiceClient = new BlobServiceClient(_blobStorageConnectionString);
        BlobContainerClient blobContainerClient = blobServiceClient.GetBlobContainerClient("YOUR_CONTAINER_HERE");
        long iterator = 0;
        await foreach (var blob in blobContainerClient.GetBlobsAsync(BlobTraits.Metadata).Where(x => x.Properties.LastModified > _filterAfterDate/*OR ANY OTHER FILTERS*/))
        {
            BlobClient blobClient = blobContainerClient.GetBlobClient(blob.Name);
            blobClient.SetMetadata(blob.Metadata.ToDictionary(x => x.Key, y => y.Value));

            Console.WriteLine($"{++iterator} : Resubmitted blob {blob.Name}\nLastModifiedDate {blob.Properties.LastModified}\n\n");
        }
    }
}

这实际上不需要下载或重置任何的数据块,并且保留元数据。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接