I am a C# developer interested in learning F#. I have a simple AWS Lambda function that is triggered when a user uploads a new file (or files) to S3 storage. Each file is then parsed and its contents are sent to an API gateway.

The code is basically functional, but I am struggling to chain all the asynchronous functions together. So far, I have been (mis)using Async.RunSynchronously to get a proof of concept. Here is the code of the main function:

namespace MyProject

open Amazon.Lambda.Core
open Amazon
open Amazon.S3
open Amazon.S3.Util
open System.IO
open Amazon.S3.Model
open Amazon.SecretsManager.Extensions.Caching


[<assembly: LambdaSerializer(typeof<Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer>)>]
()

type Function() =    
    member __.FunctionHandler (input: S3EventNotification) (_: ILambdaContext) =        
        async {
            use client = new AmazonS3Client(RegionEndpoint.EUWest1)
            use secretsCache = new SecretsManagerCache()

            // ApiClient.authenticate: SecretsManagerCache -> Async<string>
            // Sends a POST request to the API in order to obtain an authentication token
            let! token = ApiClient.authenticate secretsCache

            // ApiClient.getExistingIds: SecretsManagerCache -> Async<string[]>
            // Gets a list of already existing IDs from the API
            let! existingIds = ApiClient.getExistingIds secretsCache

            // input.Records is a C# List<S3EventNotificationRecord>
            for record in input.Records do
                // MyParser.processFile: AmazonS3Client -> S3EventNotificationRecord -> Async<MyJsonModel list>
                // Downloads the actual contents of the file specified in the S3EventNotification
                // and parses it using an FSharp.Data.JsonProvider into individual items
                let! json = MyParser.processFile client record

                // Split the items into a list that should be updated and a list that should be created
                let (putList, postList) = json
                                |> List.partition (fun item ->
                                  Array.contains item.Id existingIds)
                
                for item in putList do
                    // ApiClient.putLocation: string -> SecretsManagerCache -> MyJsonModel -> Async<unit>
                    // Tries to PUT an item and writes the result into logs
                    ApiClient.putLocation token secretsCache item
                    |> ignore

                for item in postList do
                    // ApiClient.postLocation: string -> SecretsManagerCache -> MyJsonModel -> Async<unit>
                    // Tries to POST an item and writes the result into logs
                    ApiClient.postLocation token secretsCache item
                    |> ignore
        } //??? What to put here? Async.RunSynchronously?

To put the code into words:

  1. Firstly, I need to obtain an authentication token for the API.
  2. Then, I need to get a list of already existing items (or their IDs) from the API.
  3. Next, I load and parse each of the uploaded files. This can be done in parallel (probably using Async.Parallel)
  4. Each file produces a list of items, which is then split into a list for updating and a list for creating.
  5. All the items from the putList and the postList are then sent to the API. The result of each request is logged. Both lists can also be processed in parallel.

The thing that I struggle most with is how to "attach" POSTing and PUTting of the parsed items to the actual parsing, if all operations are done using Async.Parallel. Also, do I need to add a single Async.RunSynchronously to the very end of my FunctionHandler, or will it be executed even without this statement?

Lastly, I am already calling a couple of Async.AwaitTask functions to convert the C# Task<T> objects provided by the AmazonS3Client and SecretsManagerCache objects. Does Async.AwaitTask simply convert Task<T> to Async<T>, or does it somehow change the flow of the asynchronous computation?

  • Unrelated to the question, but you need to move the two use lines inside of async, otherwise those objects will be destroyed before async even starts. Commented Jul 30, 2020 at 13:15
  • Thanks for the warning :) I will update my code. Commented Jul 30, 2020 at 13:35

1 Answer

Your function should have an asynchronous signature, since it does long-running processing, as per the AWS documentation:

member __.FunctionHandler ... : Threading.Tasks.Task<'T>

You can do so by finishing your async with

async {
   return true
}
|> Async.StartAsTask
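
As a minimal sketch of that shape, the script below builds an async block and hands it back as a Task. The `handle` function and its string input are hypothetical stand-ins for the real handler and its S3 event. It also shows `Async.AwaitTask` going the other way: it wraps an existing Task<'T> as an Async<'T> that completes when the task does.

```fsharp
open System.Threading.Tasks

// Hypothetical handler body; the real one would do the S3 and API work.
let handle (input: string) : Task<bool> =
    async {
        // Async.AwaitTask: Task<'T> -> Async<'T>
        let! length = Task.FromResult input.Length |> Async.AwaitTask
        return length > 0
    }
    // Async.StartAsTask: Async<'T> -> Task<'T>; this starts the computation
    |> Async.StartAsTask

// Blocking on .Result is only for this demonstration script.
printfn "%b" (handle "bucket/key").Result
```

With a Task-returning handler there should be no need for Async.RunSynchronously inside the function; the caller awaits the returned task.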

Inside your function you can use Async.Parallel every time you have a seq<Async<'T>> and need an Async<'T[]>. In your case you have a seq<Async<unit>>, so Async.Parallel gives you an Async<unit[]>, and you can discard the result with Async.Ignore.

async {
    do! putList
        |> Seq.map (ApiClient.putLocation token secretsCache)
        |> Async.Parallel
        |> Async.Ignore

    ...
}
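
To try this pattern in isolation, here is a small script version. The `putLocation` below is a hypothetical stand-in for ApiClient.putLocation that records each item instead of sending a request, and the items are plain integers purely for illustration.

```fsharp
open System.Collections.Concurrent

// Hypothetical stand-in for ApiClient.putLocation:
// it records the item instead of sending a PUT request.
let sent = ConcurrentBag<int>()

let putLocation (token: string) (item: int) : Async<unit> =
    async {
        do! Async.Sleep 10   // simulate network latency
        sent.Add item
    }

let sendAll (token: string) (putList: int list) : Async<unit> =
    async {
        do! putList
            |> Seq.map (putLocation token)
            |> Async.Parallel   // seq<Async<unit>> -> Async<unit[]>
            |> Async.Ignore     // Async<unit[]>   -> Async<unit>
    }

// RunSynchronously is used here only to drive the script to completion.
sendAll "token" [ 1; 2; 3 ] |> Async.RunSynchronously
printfn "%d items sent" sent.Count
```

Note that nothing runs until the combined Async is bound with do! (or started); piping an Async<unit> into `ignore` only discards the description of the work.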

The processing of input records can also be parallelized if you wrap all the processing for a single record into a function that returns Async<unit>, just like I've shown above for putList and putLocation.
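
A rough sketch of that idea follows. Every name in it (`Item`, `processFile`, `putLocation`, `postLocation`, `processRecord`, `processAll`) is a hypothetical stand-in for the functions in the question, with records reduced to strings and the API calls replaced by a log. Parsing and sending are chained inside one async block per record, and the per-record blocks are then run in parallel.

```fsharp
open System.Collections.Concurrent

type Item = { Id: string }

// Hypothetical stand-ins: the log replaces real PUT/POST requests.
let log = ConcurrentBag<string>()

let processFile (record: string) : Async<Item list> =
    async { return [ { Id = record + "-a" }; { Id = record + "-b" } ] }

let putLocation (item: Item) : Async<unit> = async { log.Add ("PUT " + item.Id) }
let postLocation (item: Item) : Async<unit> = async { log.Add ("POST " + item.Id) }

// One Async<unit> per record: parse first, then send both lists in parallel.
let processRecord (existingIds: string[]) (record: string) : Async<unit> =
    async {
        let! items = processFile record
        let putList, postList =
            items |> List.partition (fun item -> Array.contains item.Id existingIds)
        do! Seq.append (Seq.map putLocation putList) (Seq.map postLocation postList)
            |> Async.Parallel
            |> Async.Ignore
    }

// All records are processed in parallel as well.
let processAll (existingIds: string[]) (records: string list) : Async<unit> =
    records
    |> Seq.map (processRecord existingIds)
    |> Async.Parallel
    |> Async.Ignore

processAll [| "file1-a" |] [ "file1"; "file2" ] |> Async.RunSynchronously
log |> Seq.sort |> Seq.iter (printfn "%s")
```

In the real handler, the call to `processAll` would be bound with do! inside the outer async block, after the token and existing IDs have been fetched.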
