I have a blob store with 1,000 JSON files and I'm porting code from C# to Rust. I was surprised to find the Rust version is 3-6 times slower: C# consistently takes about 2 seconds, while Rust takes anywhere between 6 and 12 seconds, on the same machine and with both built in release mode.
Is this because the C# SDK is also from Microsoft and simply better optimized, or am I doing something wrong?
The C# code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Newtonsoft.Json;
using OToo.Models;
using DotNetEnv;
using Azure;
using System.Collections.Concurrent;

namespace OToo.Services
{
    public class AzureBlobService
    {
        private readonly string? _clientId;
        private readonly string? _tenantId;
        private readonly string? _clientSecret;
        private readonly string? _accountName;
        private readonly string? _containerName;

        public AzureBlobService()
        {
            Env.Load();
            _clientId = Environment.GetEnvironmentVariable("AZURE_CLIENT_ID");
            _tenantId = Environment.GetEnvironmentVariable("AZURE_TENANT_ID");
            _clientSecret = Environment.GetEnvironmentVariable("AZURE_CLIENT_SECRET");
            _accountName = Environment.GetEnvironmentVariable("AZURE_ACCOUNT_NAME");
            _containerName = Environment.GetEnvironmentVariable("AZURE_CONTAINER_NAME");
            if (string.IsNullOrEmpty(_clientId) || string.IsNullOrEmpty(_tenantId) ||
                string.IsNullOrEmpty(_clientSecret) || string.IsNullOrEmpty(_accountName) ||
                string.IsNullOrEmpty(_containerName))
            {
                throw new InvalidOperationException("Missing required Azure credentials in environment variables.");
            }
        }

        private BlobContainerClient GetBlobContainerClient()
        {
            var credential = new ClientSecretCredential(_tenantId, _clientId, _clientSecret);
            var blobServiceClient = new BlobServiceClient(new Uri($"https://{_accountName}.blob.core.windows.net"), credential);
            return blobServiceClient.GetBlobContainerClient(_containerName);
        }

        public async Task<List<AzureRawTradeData>> LoadTradesFromFolder(string? prefix)
        {
            if (string.IsNullOrEmpty(prefix))
            {
                return new List<AzureRawTradeData>();
            }
            var trades = new ConcurrentBag<AzureRawTradeData>();
            var containerClient = GetBlobContainerClient();
            var blobItems = new List<BlobItem>();
            await foreach (var blobItem in containerClient.GetBlobsAsync(prefix: prefix))
            {
                blobItems.Add(blobItem);
            }
            await Parallel.ForEachAsync(blobItems, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }, async (blobItem, _) =>
            {
                var blobClient = containerClient.GetBlobClient(blobItem.Name);
                try
                {
                    var response = await blobClient.DownloadContentAsync();
                    var jsonString = Encoding.UTF8.GetString(response.Value.Content.ToArray());
                    var trade = JsonConvert.DeserializeObject<AzureRawTradeData>(jsonString);
                    if (trade != null)
                    {
                        trades.Add(trade);
                    }
                }
                catch (JsonException ex)
                {
                    Console.WriteLine($"Failed to parse JSON from {blobItem.Name}: {ex.Message}");
                }
                catch (RequestFailedException ex)
                {
                    Console.WriteLine($"Failed to download {blobItem.Name}: {ex.Message}");
                }
            });
            return trades.ToList();
        }

        public async Task<List<AzureRawTradeData>> FetchAllTrades(List<string?> prefixes)
        {
            var allTrades = new List<AzureRawTradeData>();
            var tradeTasks = prefixes.Where(p => !string.IsNullOrEmpty(p))
                .Select(prefix => LoadTradesFromFolder(prefix!));
            var tradeLists = await Task.WhenAll(tradeTasks);
            foreach (var tradeList in tradeLists)
            {
                allTrades.AddRange(tradeList);
            }
            return allTrades;
        }
    }
}
And here is the Rust code (two attempts, though they perform about the same):
use azure_storage::StorageCredentials;
use azure_storage_blobs::prelude::*;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use futures::stream::{StreamExt, FuturesUnordered};
use std::env;
use crate::TradeData;
#[derive(Serialize)]
struct TokenRequest<'a> {
    grant_type: &'a str,
    client_id: &'a str,
    client_secret: &'a str,
    scope: &'a str,
}

#[derive(Deserialize)]
struct TokenResponse {
    access_token: String,
}
pub async fn fetch_trades_from_azure() -> Result<Vec<TradeData>, String> {
    let start = std::time::Instant::now();
    println!("[LOG] [Azure] Starting fetch_trades_from_azure");
    let tenant_id = env::var("AZURE_TENANT_ID").map_err(|e| e.to_string())?;
    let client_id = env::var("AZURE_CLIENT_ID").map_err(|e| e.to_string())?;
    let client_secret = env::var("AZURE_CLIENT_SECRET").map_err(|e| e.to_string())?;
    let storage_account = env::var("AZURE_ACCOUNT_NAME").map_err(|e| e.to_string())?;
    let container_name = env::var("AZURE_CONTAINER_NAME").map_err(|e| e.to_string())?;
    let prefix_opt = env::var("AZURE_PREFIX_OPT").map_err(|e| e.to_string())?;
    let prefix_fut = env::var("AZURE_PREFIX_FUT").map_err(|e| e.to_string())?;

    let auth_start = std::time::Instant::now();
    println!("[LOG] [Azure] Authentication step started");
    let token = get_oauth_token(&tenant_id, &client_id, &client_secret).await.map_err(|e| e.to_string())?;
    println!("[LOG] [Azure] Authentication took {:.2} seconds", auth_start.elapsed().as_secs_f32());

    let credentials = StorageCredentials::bearer_token(token);
    let blob_service = BlobServiceClient::new(storage_account, credentials);
    let container_client = blob_service.container_client(container_name);

    let list_start = std::time::Instant::now();
    println!("[LOG] [Azure] Listing blobs step started");
    // Note: process_blobs both lists and downloads, so this timer covers the downloads too.
    let (trades_opt, trades_fut) = futures::join!(
        process_blobs(&container_client, prefix_opt),
        process_blobs(&container_client, prefix_fut)
    );
    println!("[LOG] [Azure] Listing blobs took {:.2} seconds", list_start.elapsed().as_secs_f32());

    let download_start = std::time::Instant::now();
    println!("[LOG] [Azure] Downloading blobs step started");
    let trades: Vec<TradeData> = trades_opt.into_iter().chain(trades_fut).map(|(_name, trade)| trade).collect();
    println!("[LOG] [Azure] Downloading blobs took {:.2} seconds", download_start.elapsed().as_secs_f32());
    println!("[LOG] [Azure] Total fetch_trades_from_azure time: {:.2} seconds", start.elapsed().as_secs_f32());
    Ok(trades)
}
async fn get_oauth_token(tenant_id: &str, client_id: &str, client_secret: &str) -> Result<String, Box<dyn std::error::Error>> {
    let token_url = format!("https://login.microsoftonline.com/{}/oauth2/v2.0/token", tenant_id);
    let params = TokenRequest {
        grant_type: "client_credentials",
        client_id,
        client_secret,
        scope: "https://storage.azure.com/.default",
    };
    let client = Client::new();
    let response = client.post(&token_url)
        .form(&params)
        .send()
        .await?;
    let status = response.status();
    if !status.is_success() {
        let body_text = response.text().await.unwrap_or_else(|e| format!("Failed to read error body: {e}"));
        return Err(format!("token endpoint returned non-success status: {status}, body: {body_text}").into());
    }
    // Try to parse as TokenResponse, but if it fails, provide the raw text for debugging.
    let body_text = response.text().await?;
    match serde_json::from_str::<TokenResponse>(&body_text) {
        Ok(token_response) => Ok(token_response.access_token),
        Err(e) => {
            Err(format!("Failed to decode token response. Error: {e}. Raw response body: {body_text}").into())
        }
    }
}
async fn process_blobs(container_client: &ContainerClient, prefix: String) -> Vec<(String, TradeData)> {
    let mut trades = Vec::new();
    let mut stream = container_client.list_blobs().prefix(prefix.clone()).into_stream();
    while let Some(response) = stream.next().await {
        match response {
            Ok(response) => {
                // Download every .json blob in this listing page concurrently,
                // then wait for the whole page to finish before fetching the next page.
                let mut futures = FuturesUnordered::new();
                for blob in response.blobs.blobs() {
                    if blob.name.ends_with(".json") {
                        let blob_client = container_client.blob_client(&blob.name);
                        let blob_name = blob.name.clone();
                        futures.push(async move {
                            match blob_client.get_content().await {
                                Ok(data) => match serde_json::from_slice::<TradeData>(&data) {
                                    Ok(trade_data) => Some((blob_name, trade_data)),
                                    Err(_) => None,
                                },
                                Err(_) => None,
                            }
                        });
                    }
                }
                while let Some(result) = futures.next().await {
                    if let Some((name, trade)) = result {
                        trades.push((name, trade));
                    }
                }
            }
            Err(_) => {}
        }
    }
    trades
}
// ============================================================================
// OPTIMIZED VERSION - Keeps baseline's per-page download approach but
// processes both prefixes in parallel to reduce total time
// ============================================================================
pub async fn fetch_trades_from_azure_optimized() -> Result<Vec<TradeData>, String> {
    let start = std::time::Instant::now();
    println!("[LOG] [Azure OPTIMIZED] Starting fetch_trades_from_azure_optimized");
    let tenant_id = env::var("AZURE_TENANT_ID").map_err(|e| e.to_string())?;
    let client_id = env::var("AZURE_CLIENT_ID").map_err(|e| e.to_string())?;
    let client_secret = env::var("AZURE_CLIENT_SECRET").map_err(|e| e.to_string())?;
    let storage_account = env::var("AZURE_ACCOUNT_NAME").map_err(|e| e.to_string())?;
    let container_name = env::var("AZURE_CONTAINER_NAME").map_err(|e| e.to_string())?;
    let prefix_opt = env::var("AZURE_PREFIX_OPT").map_err(|e| e.to_string())?;
    let prefix_fut = env::var("AZURE_PREFIX_FUT").map_err(|e| e.to_string())?;

    let auth_start = std::time::Instant::now();
    println!("[LOG] [Azure OPTIMIZED] Authentication step started");
    let token = get_oauth_token(&tenant_id, &client_id, &client_secret).await.map_err(|e| e.to_string())?;
    println!("[LOG] [Azure OPTIMIZED] Authentication took {:.2} seconds", auth_start.elapsed().as_secs_f32());

    let credentials = StorageCredentials::bearer_token(token);
    let blob_service = BlobServiceClient::new(storage_account, credentials);
    let container_client = blob_service.container_client(container_name);

    let list_start = std::time::Instant::now();
    println!("[LOG] [Azure OPTIMIZED] List+Download (parallel prefixes) started");
    // Process both prefixes in parallel (same as baseline but concurrent)
    let (trades_opt, trades_fut) = futures::join!(
        process_blobs(&container_client, prefix_opt),
        process_blobs(&container_client, prefix_fut)
    );
    println!("[LOG] [Azure OPTIMIZED] List+Download took {:.2} seconds", list_start.elapsed().as_secs_f32());

    let trades: Vec<TradeData> = trades_opt.into_iter().chain(trades_fut).map(|(_name, trade)| trade).collect();
    println!("[LOG] [Azure OPTIMIZED] Total: {} trades in {:.2} seconds", trades.len(), start.elapsed().as_secs_f32());
    Ok(trades)
}
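
For completeness, here is a third variant I was sketching but haven't benchmarked yet. The idea is to get closer to what the C# version effectively does: collect the full blob list first, then push every download through one bounded pool of in-flight requests, instead of draining each listing page's batch before starting on the next page. The function name process_blobs_flat and the concurrency limit of 16 are just placeholders I made up, and I haven't verified it against my exact crate versions, so treat it as a sketch of the shape rather than working code:

async fn process_blobs_flat(container_client: &ContainerClient, prefix: String) -> Vec<(String, TradeData)> {
    // First pass: walk every listing page and collect the .json blob names.
    let mut names = Vec::new();
    let mut pages = container_client.list_blobs().prefix(prefix).into_stream();
    while let Some(page) = pages.next().await {
        if let Ok(page) = page {
            for blob in page.blobs.blobs() {
                if blob.name.ends_with(".json") {
                    names.push(blob.name.clone());
                }
            }
        }
        // (listing errors silently skipped, same as process_blobs above)
    }
    // Second pass: download and parse with at most 16 requests in flight,
    // rather than waiting for each page's batch to finish before the next.
    futures::stream::iter(names)
        .map(|name| {
            let blob_client = container_client.blob_client(name.clone());
            async move {
                let data = blob_client.get_content().await.ok()?;
                let trade = serde_json::from_slice::<TradeData>(&data).ok()?;
                Some((name, trade))
            }
        })
        .buffer_unordered(16)
        .filter_map(|x| async move { x })
        .collect()
        .await
}

Is the per-page draining in process_blobs a plausible explanation for the gap, or should I be looking somewhere else entirely?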