How to batch process all Azure Storage Queue messages into a Blob with Azure Functions?

I want to read all messages from an Azure Storage Queue and write them into a blob, ideally in batches of 10,000 or more per write.

I am using Azure Functions with the Queue Storage binding for input and the Blob Storage binding for output, but I can't seem to find an API or a configuration option that would let me read more than one message at a time. Does anyone know of such an API?

Butler answered 18/2, 2018 at 19:54

The official documentation doesn't mention any support for processing Storage Queue messages in batches within a single execution of an Azure Function, and there is an open issue about it in the WebJobs SDK. So, it's not supported.

If you are flexible about which service to use as messaging middleware, you could switch to Event Hubs. The Event Hub trigger supports (and encourages) processing messages in batches. It probably won't be 10,000 at a time, though: the batch size is limited to 256 KB of data.
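
For illustration, here is a minimal sketch of a batch-receiving Event Hub triggered function in JavaScript. The hub name, the connection setting name, and what you do with the batch are assumptions, not part of this question:

// function.json (sketch; "my-event-hub" and "EventHubConnection" are placeholders):
// {
//   "bindings": [
//     {
//       "type": "eventHubTrigger",
//       "direction": "in",
//       "name": "eventHubMessages",
//       "eventHubName": "my-event-hub",
//       "cardinality": "many",
//       "connection": "EventHubConnection"
//     }
//   ]
// }

module.exports = function (context, eventHubMessages) {
  // With cardinality "many", the runtime delivers an array of events per invocation.
  context.log('received a batch of ' + eventHubMessages.length + ' messages')
  // ... transform the whole batch and write it out to a blob here ...
  context.done()
}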

To process Storage Queue messages in batches you'd have to move away from queue-triggered Functions (e.g. run a function on a timer and connect to the storage queue to process all the messages, have a custom polling WebJob, or use the WebJobs SDK with a custom trigger), as sketched below.
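
As a sketch of the timer-trigger variant (the queue name "my-queue" and reusing the AzureWebJobsStorage connection string are assumptions):

const storage = require('azure-storage')
const async = require('async')

module.exports = function (context, myTimer) {
  const queueService = storage.createQueueService(process.env.AzureWebJobsStorage)
  // The Queue Storage API returns at most 32 messages per call, so a real
  // implementation would keep calling getMessages until the queue is drained.
  queueService.getMessages('my-queue', { numOfMessages: 32 }, (err, messages) => {
    if (err) return context.done(err)
    async.each(
      messages,
      (msg, cb) => {
        context.log('processing message', msg.messageId)
        // ... write msg.messageText to a blob here, then delete the message
        // so it does not reappear after its visibility timeout expires ...
        queueService.deleteMessage('my-queue', msg.messageId, msg.popReceipt, cb)
      },
      err => context.done(err)
    )
  })
}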

Ass answered 18/2, 2018 at 20:23
You can run a function on a timer trigger (every minute) and connect to the storage queue to process all the messages? – Timberhead
Thanks for your advice. @thomas, I did it in a similar way: I used a timer trigger and the queue storage API to read all messages into a buffer, and then wrote them to a blob via the blob binding. I will try to describe my solution below... – Butler
@Butler You can just edit my answer if it's close to your solution. – Ass
@Butler Ok, this actually looks like a different answer; better add it below. – Ass

I finally found a solution I am perfectly happy with. Using buffers was not scalable: execution could easily exceed the 5-minute limit imposed by the Azure Functions runtime, there was an obvious memory consumption issue, and because I had to use a timer trigger I would have needed to somehow make sure all relevant messages were in the queue at a certain time.

What I did instead is use a normal queue trigger binding to acquire a message, and the Node.js Storage SDK to implement a sort of "fake" streaming into an Append Blob. So every message is converted one by one into a CSV line and appended to the respective blob.
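
The csvTransform helper module is not shown in this answer; purely as an illustration of its assumed shape (the message fields timestamp, id, and value are invented for the example), it might look something like this:

// lib/csvTransform.js - a sketch only; the real message fields are not
// shown in this answer, so timestamp/id/value are assumptions.
module.exports = {
  // Derive a 'YYYY-MM-DD' day string from the message, used as the blob name.
  determineDayFromMessage: function (message) {
    return new Date(message.timestamp).toISOString().slice(0, 10)
  },
  // Turn one message into a CSV line; prepend a header row for a brand-new blob.
  transformMessageToCSV: function (message, includeHeader) {
    const line = [message.timestamp, message.id, message.value].join(',') + '\n'
    return includeHeader ? 'timestamp,id,value\n' + line : line
  }
}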

Here is my code for that function:

const config = require('./config/config.js')
const storage = require('azure-storage')
const csvTransformer = require('./lib/csvTransform')
const async = require('async')

module.exports = function (context, myQueueItem) {
  context.log(
    'JavaScript queue trigger function processed work item',
    myQueueItem
  )

  let blobService = storage.createBlobService(config.targetBlobConnection)
  let messageDayString = csvTransformer.determineDayFromMessage(myQueueItem)
  let blobName = messageDayString + '.csv'
  let csvMessage
  async.waterfall(
    [
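      // Step 1: make sure the target container exists before writing.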
      function (callback) {
        blobService.createContainerIfNotExists(
          config.targetBlobContainer,
          { publicAccessLevel: 'blob' },
          err => {
            callback(err)
          }
        )
      },
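      // Step 2: check whether the blob for this day already exists.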
      function (callback) {
        blobService.doesBlobExist(
          config.targetBlobContainer,
          blobName,
          null,
          (err, blobResult) => {
            context.log('got blobResult: ', blobResult)
            callback(err, blobResult)
          }
        )
      },
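      // Step 3: append the CSV line to the existing blob, or create a new append blob.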
      function (blobResult, callback) {
        if (blobResult && blobResult.exists) {
          csvMessage = csvTransformer.transformMessageToCSV(myQueueItem, false)
          blobService.appendFromText(
            config.targetBlobContainer,
            blobName,
            csvMessage,
            null,
            (err, appendedBlobResult) => {
              context.log('appended to existing blob: ', appendedBlobResult)
              callback(err, appendedBlobResult)
            }
          )
        } else {
          csvMessage = csvTransformer.transformMessageToCSV(myQueueItem, true)
          blobService.createAppendBlobFromText(
            config.targetBlobContainer,
            blobName,
            csvMessage,
            null,
            (err, createdBlobResult) => {
              context.log('created new blob: ', createdBlobResult)
              callback(err, createdBlobResult)
            }
          )
        }
      }
    ],
    function (err, result) {
      if (err) {
        context.log.error('Error happened!')
        context.log.error(err)
        context.done(err)
      } else {
        context.log('appended CSV message to blob')
        context.bindings.outputQueueItem = csvMessage
        context.done()
      }
    }
  )
}
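
For completeness, the bindings this function relies on might look roughly like the following function.json. The queue names and the connection setting are hypothetical; only the binding names myQueueItem and outputQueueItem come from the code above:

{
  "bindings": [
    {
      "type": "queueTrigger",
      "direction": "in",
      "name": "myQueueItem",
      "queueName": "incoming-messages",
      "connection": "AzureWebJobsStorage"
    },
    {
      "type": "queue",
      "direction": "out",
      "name": "outputQueueItem",
      "queueName": "csv-lines",
      "connection": "AzureWebJobsStorage"
    }
  ]
}
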
Butler answered 26/3, 2018 at 9:17
