I have a blob store with 1000 JSON files, and I want to port the code that loads them from C# to Rust. I was surprised that my Rust version was 3-6 times slower: C# consistently took 2 seconds, while Rust took anywhere between 6 and 12 seconds, on the same machine of course. Both were compiled in release mode.

Is that because C# is also from Microsoft and its package is simply more optimized, or am I doing it wrong?

C# function:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Newtonsoft.Json;
using OToo.Models;
using DotNetEnv;
using Azure;
using System.Collections.Concurrent;

namespace OToo.Services
{
    /// <summary>
    /// Loads trade JSON documents from an Azure Blob Storage container using
    /// client-secret (service principal) authentication. Connection settings are
    /// read from environment variables after loading a local <c>.env</c> file.
    /// </summary>
    public class AzureBlobService
    {
        private readonly string? _clientId;
        private readonly string? _tenantId;
        private readonly string? _clientSecret;
        private readonly string? _accountName;
        private readonly string? _containerName;

        // Built once on first use and shared by every call, so all prefixes reuse
        // the same credential instead of creating a new one per folder.
        private readonly Lazy<BlobContainerClient> _containerClient;

        /// <summary>
        /// Reads the Azure settings from the environment.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Any of the required environment variables is missing or empty.
        /// </exception>
        public AzureBlobService()
        {
            Env.Load();

            _clientId = Environment.GetEnvironmentVariable("AZURE_CLIENT_ID");
            _tenantId = Environment.GetEnvironmentVariable("AZURE_TENANT_ID");
            _clientSecret = Environment.GetEnvironmentVariable("AZURE_CLIENT_SECRET");
            _accountName = Environment.GetEnvironmentVariable("AZURE_ACCOUNT_NAME");
            _containerName = Environment.GetEnvironmentVariable("AZURE_CONTAINER_NAME");

            if (string.IsNullOrEmpty(_clientId) || string.IsNullOrEmpty(_tenantId) ||
                string.IsNullOrEmpty(_clientSecret) || string.IsNullOrEmpty(_accountName) ||
                string.IsNullOrEmpty(_containerName))
            {
                throw new InvalidOperationException("Missing required Azure credentials in environment variables.");
            }

            // Deferred so that client construction still happens on first use,
            // exactly where it happened before, rather than in the constructor.
            _containerClient = new Lazy<BlobContainerClient>(CreateBlobContainerClient);
        }

        /// <summary>Builds the container client from the configured settings.</summary>
        private BlobContainerClient CreateBlobContainerClient()
        {
            var credential = new ClientSecretCredential(_tenantId, _clientId, _clientSecret);
            var blobServiceClient = new BlobServiceClient(new Uri($"https://{_accountName}.blob.core.windows.net"), credential);
            return blobServiceClient.GetBlobContainerClient(_containerName);
        }

        /// <summary>Returns the shared container client, creating it on first use.</summary>
        private BlobContainerClient GetBlobContainerClient() => _containerClient.Value;

        /// <summary>
        /// Lists every blob under <paramref name="prefix"/>, downloads them in
        /// parallel and deserializes each one into an <c>AzureRawTradeData</c>.
        /// </summary>
        /// <param name="prefix">Blob name prefix (folder) to load; null or empty yields an empty list.</param>
        /// <returns>
        /// The successfully parsed trades, in no particular order. Blobs that fail to
        /// download or parse are reported on the console and skipped.
        /// </returns>
        public async Task<List<AzureRawTradeData>> LoadTradesFromFolder(string? prefix)
        {
            if (string.IsNullOrEmpty(prefix))
            {
                return new List<AzureRawTradeData>();
            }

            var trades = new ConcurrentBag<AzureRawTradeData>();
            var containerClient = GetBlobContainerClient();
            var blobItems = new List<BlobItem>();

            // Listing is completed first so the download loop below knows its full work set.
            await foreach (var blobItem in containerClient.GetBlobsAsync(prefix: prefix))
            {
                blobItems.Add(blobItem);
            }

            var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

            await Parallel.ForEachAsync(blobItems, parallelOptions, async (blobItem, cancellationToken) =>
            {
                var blobClient = containerClient.GetBlobClient(blobItem.Name);

                try
                {
                    // Flow the loop's token so in-flight downloads stop once the loop is cancelled.
                    var response = await blobClient.DownloadContentAsync(cancellationToken);

                    // BinaryData.ToString() decodes the payload as UTF-8 without the
                    // intermediate byte[] copy that ToArray() would allocate.
                    var jsonString = response.Value.Content.ToString();
                    var trade = JsonConvert.DeserializeObject<AzureRawTradeData>(jsonString);

                    if (trade != null)
                    {
                        trades.Add(trade);
                    }
                }
                catch (JsonException ex)
                {
                    Console.WriteLine($"Failed to parse JSON from {blobItem.Name}: {ex.Message}");
                }
                catch (RequestFailedException ex)
                {
                    Console.WriteLine($"Failed to download {blobItem.Name}: {ex.Message}");
                }
            });

            return trades.ToList();
        }

        /// <summary>
        /// Loads the trades of several prefixes concurrently and concatenates the results.
        /// </summary>
        /// <param name="prefixes">Prefixes to load; null or empty entries are ignored.</param>
        /// <returns>All trades from all prefixes, grouped in the order of <paramref name="prefixes"/>.</returns>
        public async Task<List<AzureRawTradeData>> FetchAllTrades(List<string?> prefixes)
        {
            var tradeTasks = prefixes.Where(p => !string.IsNullOrEmpty(p))
                                     .Select(prefix => LoadTradesFromFolder(prefix!));
            var tradeLists = await Task.WhenAll(tradeTasks);

            return tradeLists.SelectMany(tradeList => tradeList).ToList();
        }
    }
}

And here is the Rust code (two attempts, though they perform similarly):

use azure_storage::StorageCredentials;
use azure_storage_blobs::prelude::*;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use futures::stream::{StreamExt, FuturesUnordered};
use std::env;

use crate::TradeData;

/// Form body sent to the token endpoint by `get_oauth_token`.
///
/// The field names are serialized as-is, so they are part of the request
/// format and must not be renamed.
#[derive(Serialize)]
struct TokenRequest<'a> {
    grant_type: &'a str,
    client_id: &'a str,
    client_secret: &'a str,
    scope: &'a str,
}

/// Subset of the token endpoint's JSON reply that this module reads.
#[derive(Deserialize)]
struct TokenResponse {
    // The only field consumed; it is returned to the caller of `get_oauth_token`.
    access_token: String,
}

/// Fetches every trade stored under the two configured blob prefixes.
///
/// Reads the Azure settings from the environment, obtains a bearer token,
/// then runs `process_blobs` for the `AZURE_PREFIX_OPT` and `AZURE_PREFIX_FUT`
/// prefixes concurrently and concatenates the results (OPT first, then FUT).
/// Timing for each phase is printed to stdout.
///
/// # Errors
///
/// Returns the stringified error when a required environment variable is
/// missing or when the token request fails.
pub async fn fetch_trades_from_azure() -> Result<Vec<TradeData>, String> {
    let start = std::time::Instant::now();
    println!("[LOG] [Azure] Starting fetch_trades_from_azure");

    let tenant_id = env::var("AZURE_TENANT_ID").map_err(|e| e.to_string())?;
    let client_id = env::var("AZURE_CLIENT_ID").map_err(|e| e.to_string())?;
    let client_secret = env::var("AZURE_CLIENT_SECRET").map_err(|e| e.to_string())?;
    let storage_account = env::var("AZURE_ACCOUNT_NAME").map_err(|e| e.to_string())?;
    let container_name = env::var("AZURE_CONTAINER_NAME").map_err(|e| e.to_string())?;
    let prefix_opt = env::var("AZURE_PREFIX_OPT").map_err(|e| e.to_string())?;
    let prefix_fut = env::var("AZURE_PREFIX_FUT").map_err(|e| e.to_string())?;

    let auth_start = std::time::Instant::now();
    println!("[LOG] [Azure] Authentication step started");
    let token = get_oauth_token(&tenant_id, &client_id, &client_secret).await.map_err(|e| e.to_string())?;
    println!("[LOG] [Azure] Authentication took {:.2} seconds", auth_start.elapsed().as_secs_f32());

    let credentials = StorageCredentials::bearer_token(token);
    let blob_service = BlobServiceClient::new(storage_account, credentials);
    let container_client = blob_service.container_client(container_name);

    // `process_blobs` both lists and downloads, so this timer covers the two
    // phases together; labelling it "listing" would misattribute the time.
    let fetch_start = std::time::Instant::now();
    println!("[LOG] [Azure] List+Download step started");
    let (trades_opt, trades_fut) = futures::join!(
        process_blobs(&container_client, prefix_opt),
        process_blobs(&container_client, prefix_fut)
    );
    println!("[LOG] [Azure] List+Download took {:.2} seconds", fetch_start.elapsed().as_secs_f32());

    // Everything is already in memory here; this step only drops the blob
    // names and concatenates the two result sets.
    let merge_start = std::time::Instant::now();
    println!("[LOG] [Azure] Merging results step started");
    let trades: Vec<TradeData> = trades_opt.into_iter().chain(trades_fut).map(|(_name, trade)| trade).collect();
    println!("[LOG] [Azure] Merging results took {:.2} seconds", merge_start.elapsed().as_secs_f32());

    println!("[LOG] [Azure] Total fetch_trades_from_azure time: {:.2} seconds", start.elapsed().as_secs_f32());
    Ok(trades)
}


/// Requests an access token for Azure Storage using the client-credentials grant.
///
/// Posts a form-encoded `TokenRequest` to the tenant's v2.0 token endpoint and
/// returns the `access_token` field of the JSON reply.
///
/// # Errors
///
/// Fails when the request cannot be sent, when the endpoint answers with a
/// non-success status (the body is included in the message), or when the body
/// cannot be decoded as a `TokenResponse` (the raw body is included for debugging).
async fn get_oauth_token(tenant_id: &str, client_id: &str, client_secret: &str) -> Result<String, Box<dyn std::error::Error>> {
    let endpoint = format!("https://login.microsoftonline.com/{}/oauth2/v2.0/token", tenant_id);
    let form_body = TokenRequest {
        grant_type: "client_credentials",
        client_id,
        client_secret,
        scope: "https://storage.azure.com/.default",
    };

    let reply = Client::new()
        .post(&endpoint)
        .form(&form_body)
        .send()
        .await?;

    let status = reply.status();
    if !status.is_success() {
        // Reading the error body is best-effort; a read failure is folded into the message.
        let body_text = match reply.text().await {
            Ok(text) => text,
            Err(e) => format!("Failed to read error body: {e}"),
        };
        return Err(format!("token endpoint returned non-success status: {status}, body: {body_text}").into());
    }

    // Read the body as text first so it can be echoed back if decoding fails.
    let body_text = reply.text().await?;
    serde_json::from_str::<TokenResponse>(&body_text)
        .map(|parsed| parsed.access_token)
        .map_err(|e| format!("Failed to decode token response. Error: {e}. Raw response body: {body_text}").into())
}

/// Lists the blobs under `prefix` and downloads every `.json` blob, parsing
/// each one into a `TradeData`.
///
/// Listing is paged: for each page, all matching blobs of that page are
/// downloaded concurrently, and the next page is requested only after the
/// current page's downloads have finished.
///
/// Returns `(blob name, trade)` pairs for the blobs that were downloaded and
/// parsed successfully. Failures are reported on stderr and skipped, so one
/// bad blob or page does not abort the whole run.
async fn process_blobs(container_client: &ContainerClient, prefix: String) -> Vec<(String, TradeData)> {
    let mut trades = Vec::new();
    // `prefix` is not used again, so it is moved into the request rather than cloned.
    let mut stream = container_client.list_blobs().prefix(prefix).into_stream();
    while let Some(response) = stream.next().await {
        match response {
            Ok(response) => {
                let mut futures = FuturesUnordered::new();
                for blob in response.blobs.blobs() {
                    if blob.name.ends_with(".json") {
                        let blob_client = container_client.blob_client(&blob.name);
                        let blob_name = blob.name.clone();
                        futures.push(async move {
                            let data = match blob_client.get_content().await {
                                Ok(data) => data,
                                Err(e) => {
                                    eprintln!("[LOG] [Azure] Failed to download {blob_name}: {e}");
                                    return None;
                                }
                            };
                            match serde_json::from_slice::<TradeData>(&data) {
                                Ok(trade_data) => Some((blob_name, trade_data)),
                                Err(e) => {
                                    eprintln!("[LOG] [Azure] Failed to parse JSON from {blob_name}: {e}");
                                    None
                                }
                            }
                        });
                    }
                }
                // Drain this page's downloads in completion order; failed ones yield `None`.
                while let Some(result) = futures.next().await {
                    if let Some((name, trade)) = result {
                        trades.push((name, trade));
                    }
                }
            }
            // A failed page is reported instead of silently dropped; the loop then
            // continues with whatever the stream yields next, as before.
            Err(e) => {
                eprintln!("[LOG] [Azure] Failed to list blobs: {e}");
            }
        }
    }
    trades
}

// ============================================================================
// OPTIMIZED VERSION - Keeps the baseline's per-page download approach but
// processes both prefixes concurrently to reduce total time
// ============================================================================

/// Fetches every trade stored under the two configured blob prefixes,
/// running `process_blobs` for both prefixes concurrently.
///
/// Settings come from the environment; a bearer token is obtained first and
/// used for all storage requests. Progress and timings are printed to stdout.
/// The returned vector holds the `AZURE_PREFIX_OPT` trades followed by the
/// `AZURE_PREFIX_FUT` trades.
///
/// # Errors
///
/// Returns the stringified error when a required environment variable is
/// missing or when the token request fails.
pub async fn fetch_trades_from_azure_optimized() -> Result<Vec<TradeData>, String> {
    let total_timer = std::time::Instant::now();
    println!("[LOG] [Azure OPTIMIZED] Starting fetch_trades_from_azure_optimized");

    // Every setting is mandatory; the first missing one aborts with its error text.
    let required_var = |key: &str| env::var(key).map_err(|e| e.to_string());
    let tenant_id = required_var("AZURE_TENANT_ID")?;
    let client_id = required_var("AZURE_CLIENT_ID")?;
    let client_secret = required_var("AZURE_CLIENT_SECRET")?;
    let storage_account = required_var("AZURE_ACCOUNT_NAME")?;
    let container_name = required_var("AZURE_CONTAINER_NAME")?;
    let prefix_opt = required_var("AZURE_PREFIX_OPT")?;
    let prefix_fut = required_var("AZURE_PREFIX_FUT")?;

    let auth_timer = std::time::Instant::now();
    println!("[LOG] [Azure OPTIMIZED] Authentication step started");
    let token = get_oauth_token(&tenant_id, &client_id, &client_secret)
        .await
        .map_err(|e| e.to_string())?;
    println!("[LOG] [Azure OPTIMIZED] Authentication took {:.2} seconds", auth_timer.elapsed().as_secs_f32());

    let container_client = BlobServiceClient::new(storage_account, StorageCredentials::bearer_token(token))
        .container_client(container_name);

    let fetch_timer = std::time::Instant::now();
    println!("[LOG] [Azure OPTIMIZED] List+Download (parallel prefixes) started");

    // Both prefixes are driven concurrently on this task.
    let (trades_opt, trades_fut) = futures::join!(
        process_blobs(&container_client, prefix_opt),
        process_blobs(&container_client, prefix_fut)
    );

    println!("[LOG] [Azure OPTIMIZED] List+Download took {:.2} seconds", fetch_timer.elapsed().as_secs_f32());

    // Drop the blob names and concatenate: OPT results first, then FUT.
    let mut trades: Vec<TradeData> = Vec::with_capacity(trades_opt.len() + trades_fut.len());
    trades.extend(trades_opt.into_iter().map(|(_, trade)| trade));
    trades.extend(trades_fut.into_iter().map(|(_, trade)| trade));

    println!("[LOG] [Azure OPTIMIZED] Total: {} trades in {:.2} seconds", trades.len(), total_timer.elapsed().as_secs_f32());
    Ok(trades)
}

2 Replies 2

Did you compile as release? Often the simplest of mistakes.

Thanks for the advice, but that's not it; I built both in release mode.

Your Reply

By clicking “Post Your Reply”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.