
I am trying to read the InputStream of a large blob from an Azure container and store it in another cloud storage of my own.

For small files, it works perfectly. But for large blobs (around 3 GB), the Azure connection times out after a few minutes.

I have also tried setting all the timeout values to their maximum. That didn't help either.

Code snippet:

HttpClient httpClient = new NettyAsyncHttpClientBuilder()
                           .readTimeout(Duration.ofDays(365))
                           .responseTimeout(Duration.ofDays(365))
                           .connectTimeout(Duration.ofDays(365))
                           .build();

RequestRetryOptions retryOptions = new RequestRetryOptions(
                RetryPolicyType.EXPONENTIAL,
                3,
                Duration.ofSeconds(30),
                null,
                null,
                null
            );

HttpPipelinePolicy timeoutPolicy = new TimeoutPolicy(Duration.ofDays(365));
        
HttpClientOptions httpclientoptions = new HttpClientOptions()
                             .setReadTimeout(Duration.ofDays(365))
                             .setResponseTimeout(Duration.ofDays(365))
                             .setConnectionIdleTimeout(Duration.ofDays(365))
                             .setConnectTimeout(Duration.ofDays(365))
                             .setWriteTimeout(Duration.ofDays(365));


// Building the Azure Client
BlobServiceClient azureClient = new BlobServiceClientBuilder()
                .connectionString("My_Connection_String_For_Authentication")
                .httpClient(httpClient)
                .addPolicy(timeoutPolicy)
                .clientOptions(httpclientoptions)
                .retryOptions(retryOptions)
                .buildClient();

BlobContainerClient containerClient = azureClient.getBlobContainerClient("{azureContainerName}");

PagedResponse<BlobItem> pagedResponse = containerClient.listBlobs(options, continuationToken, null).iterableByPage().iterator().next();

List<BlobItem> pageItems = pagedResponse.getValue();
for (BlobItem blob : pageItems) {

    // Opening the InputStream of the blob
    try (InputStream blobInputStream = containerClient.getBlobVersionClient(key, versionId).openInputStream();
         BufferedInputStream bufferedStream = new BufferedInputStream(blobInputStream)) {

            // reading the bufferedStream and piping it to my own cloud storage
    }
}

When reading the bytes from the InputStream, I get the following error after a few minutes.

java.net.SocketException: Broken pipe (Write failed)
    at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
    at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
    at java.base/sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:346)
    at java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1213)
    at okio.OutputStreamSink.write(JvmOkio.kt:56)
    at okio.AsyncTimeout$sink$1.write(AsyncTimeout.kt:102)
    at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
    at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
    at okhttp3.internal.http1.Http1ExchangeCodec$ChunkedSink.write(Http1ExchangeCodec.kt:311)
    at okio.ForwardingSink.write(ForwardingSink.kt:29)
    at okhttp3.internal.connection.Exchange$RequestBodySink.write(Exchange.kt:223)
    at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
    at okio.RealBufferedSink.writeAll(RealBufferedSink.kt:195)
    at com.zoho.nebula.requests.okhttp3.Okhttp3HttpClient$2.writeTo(Okhttp3HttpClient.java:284)
    at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.kt:62)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
    at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.kt:34)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
    at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.kt:95)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
    at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.kt:83)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
    at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:76)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
    at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201)
    at okhttp3.internal.connection.RealCall.execute(RealCall.kt:154)
    at com.zoho.nebula.requests.okhttp3.Okhttp3HttpClient.execute(Okhttp3HttpClient.java:89)
    ... 36 more
    Suppressed: java.net.SocketException: Operation timed out (Read failed)
        at java.base/java.net.SocketInputStream.socketRead0(Native Method)
        at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
        at java.base/java.net.SocketInputStream.read(SocketInputStream.java:168)
        at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
        at java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:478)
        at java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:472)
        at java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
        at java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1333)
        at java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:976)
        at okio.InputStreamSource.read(JvmOkio.kt:93)
        at okio.AsyncTimeout$source$1.read(AsyncTimeout.kt:128)
        at okio.RealBufferedSource.indexOf(RealBufferedSource.kt:430)
        at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.kt:323)
        at okhttp3.internal.http1.HeadersReader.readLine(HeadersReader.kt:29)
        at okhttp3.internal.http1.Http1ExchangeCodec.readResponseHeaders(Http1ExchangeCodec.kt:180)
        at okhttp3.internal.connection.Exchange.readResponseHeaders(Exchange.kt:110)
        at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.kt:93)

Please assist me on how to fix this and explain what I am doing wrong.

Dependencies used:

azure-storage-blob-12.22.3.jar
azure-storage-common-12.21.2.jar
azure-core-1.40.0.jar
azure-core-http-netty-1.13.4.jar
reactor-netty-http-1.0.31.jar
reactor-netty-core-1.0.31.jar
netty-resolver-dns-4.1.89.Final.jar
azure-json-1.0.1.jar

1 Answer

trying to read the InputStream of a large blob from an Azure container and store it in another cloud storage of my own.

You can use the code below to read large files from Azure Blob Storage in chunks using the Azure Java SDK.

Code:

 private static final String CONNE_STRING = "xxxx";
 private static final String CONTAINER_NAME = "xxxx";
 private static final String BLOB_NAME = "xxxxx";

 public static void main(String[] args) {
     com.azure.core.http.HttpClient httpClient = new NettyAsyncHttpClientBuilder()
             .readTimeout(Duration.ofDays(365))
             .responseTimeout(Duration.ofDays(365))
             .connectTimeout(Duration.ofDays(365))
             .build();

     RequestRetryOptions retryOptions = new RequestRetryOptions(
             RetryPolicyType.EXPONENTIAL,
             3,
             Duration.ofSeconds(30),
             null,
             null,
             null
     );

     HttpPipelinePolicy timeoutPolicy = new TimeoutPolicy(Duration.ofDays(365));
     
     BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
             .connectionString(CONNE_STRING)
             .httpClient(httpClient)
             .addPolicy(timeoutPolicy)
             .retryOptions(retryOptions)
             .buildClient();

     BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(CONTAINER_NAME);
     BlobClient blobClient = containerClient.getBlobClient(BLOB_NAME);

     readBlobInChunks(blobClient);
 }

 private static void readBlobInChunks(BlobClient blobClient) {
     final long chunkSize = 8 * 1024 * 1024; // 8MB chunks

     try {
         BlobProperties properties = blobClient.getProperties();
         long blobSize = properties.getBlobSize();
         for (long offset = 0; offset < blobSize; offset += chunkSize) {
             long count = Math.min(chunkSize, blobSize - offset);
             try (InputStream blobInputStream = blobClient.openInputStream(new BlobRange(offset, count), null);
                  BufferedInputStream bufferedStream = new BufferedInputStream(blobInputStream)) {
                 processChunk(bufferedStream, count);
             }
         }
         System.out.println("Blob read successfully in chunks.");
     } catch (IOException e) {
         e.printStackTrace();
     }
 }

 private static void processChunk(InputStream inputStream, long count) throws IOException {
     byte[] buffer = new byte[8192];
     int bytesRead;
     long totalBytesRead = 0;
     while (totalBytesRead < count && (bytesRead = inputStream.read(buffer)) != -1) {
         totalBytesRead += bytesRead;
         System.out.write(buffer, 0, bytesRead);
     }
 }

The `readBlobInChunks` method reads a large file from Azure Blob Storage in 8 MB chunks and processes each chunk over a fresh, short-lived ranged download, so no single connection has to stay open for the whole 3 GB transfer. Currently, the `processChunk` method prints the data to the console.

You can change the processChunk method to upload the data to your own cloud storage instead of printing it. You can also modify the method that reads the chunks to upload each chunk to your cloud storage in parallel.
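Because each chunk is opened as its own short-lived stream, a transient network failure can be handled by simply reopening and re-reading that one chunk instead of restarting the whole 3 GB transfer. Below is a minimal, Azure-free sketch of such a retry-and-copy helper; the `Supplier<InputStream>` stands in for a call like `blobClient.openInputStream(new BlobRange(offset, count), null)`, and the class name `ChunkCopy` and method names are illustrative, not part of the SDK.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.function.Supplier;

public class ChunkCopy {

    // Reads one chunk from a freshly opened stream and writes it to `out`,
    // retrying the whole chunk up to `maxAttempts` times on IOException.
    // The supplier must return a NEW stream on every call, so a retry
    // reopens the ranged download from the beginning of the chunk.
    static long copyChunkWithRetry(Supplier<InputStream> open,
                                   OutputStream out,
                                   int maxAttempts) throws IOException {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try (InputStream in = new BufferedInputStream(open.get())) {
                // Buffer the chunk fully before writing, so a mid-chunk
                // failure never leaves partial bytes in the destination.
                // This is fine for 8 MB chunks; don't do it for whole blobs.
                ByteArrayOutputStream chunk = new ByteArrayOutputStream();
                byte[] buf = new byte[8192];
                int n;
                while ((n = in.read(buf)) != -1) {
                    chunk.write(buf, 0, n);
                }
                chunk.writeTo(out);
                return chunk.size();
            } catch (IOException e) {
                last = e; // transient failure: loop back, reopen, retry chunk
            }
        }
        throw last;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a ranged blob download: an in-memory stream.
        byte[] data = "hello azure chunk".getBytes();
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long copied = copyChunkWithRetry(() -> new ByteArrayInputStream(data), sink, 3);
        System.out.println(copied + " bytes copied");
    }
}
```

In the chunk loop above you would call this helper once per `BlobRange`, passing `() -> blobClient.openInputStream(new BlobRange(offset, count), null)` as the supplier and your own cloud storage's upload stream as `out`.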

Output:

Blob read successfully in chunks.



7 Comments

I tried the above solution. I still get the 'Broken pipe' exception when reading a blob of size 3.62 GB. I get the exception when reading the 11th chunk, i.e. blobClient.openInputStream(new BlobRange(75497472, 8388608)). The first 10 chunks are read without any problem. Please help, and kindly let me know if any other information is required.
Try to use the Azure REST API, and refer to stackoverflow.com/questions/76672924/…
As you suggested, I tried using the Azure REST API too, with SharedKey authorization. I'm still facing the 'Broken pipe' error when the file size is large ☹️. Also, the link you attached above is about uploading a blob to Azure, but my case is downloading a large blob from an Azure container.
Thanks. Can you kindly point me to the exact snippet, as I'm not familiar with .NET?
