Nikolas Knickrehm

Transferring Large Files in TypeScript using Streams

We recently tried to optimize a Lambda function that transfers files between two systems via their REST APIs. It downloaded a file from one system, split it into chunks of a few MB, and then uploaded those chunks to the destination system.

We noticed that this implementation was far from ideal: the Lambda had to keep the complete binary in memory during the transfer, and the upload could only start after the download had completed. For large files we experienced a lot of timeouts, and we decided that throwing more resources and longer timeouts at the problem was not a sustainable solution.

One idea proposed by a colleague was to use streams to process the file chunk by chunk, continuously uploading parts of the file while they were still being downloaded from the source system. The first part of the implementation was rather trivial, as downloading a binary file as a stream is easy to do in TypeScript / JavaScript via Axios:

const response = await axios.get('<fileUrl>', {
  maxContentLength: Infinity,
  responseType: 'stream',
})

// With responseType 'stream' the actual readable stream lives on response.data
const downloadStream = response.data

downloadStream.on('data', (chunk: Buffer) => {
  // upload chunk logic
})

downloadStream.on('end', () => {
  // complete transfer logic
})

Unfortunately, we ran into a problem while uploading the chunks one by one: we needed to request a new upload URL for every chunk, and uploading the chunks was far slower than downloading new ones. Since Axios delivered chunks of only a few KB in size, we soon ran into a classic backpressure problem, meaning we could not process the incoming data fast enough. This created a critical bottleneck that crashed the whole Lambda when it handled larger files.

To fix this issue we buffered the stream, collecting incoming chunks until we had around 500 KB of data before uploading that as a single chunk to the destination system. Since we like abstraction, and Node.js offers abstract stream classes that are very easy to implement, we created the following two classes:

import { Transform } from 'stream'

// Buffers many small incoming chunks and emits them as larger chunks of
// roughly the desired size
export default class MemoryStream extends Transform {
  private memory: Buffer

  constructor(private readonly desiredChunkSize: number) {
    super()
    this.memory = Buffer.alloc(0)
  }

  _transform(chunk: Buffer, _encoding: string, cb: () => void): void {
    // Once the buffered data plus the new chunk would reach the desired size,
    // emit what has been collected so far and start over with an empty buffer
    if (Buffer.byteLength(this.memory) + Buffer.byteLength(chunk) >= this.desiredChunkSize) {
      this.push(this.memory)
      this.memory = Buffer.alloc(0)
    }
    this.memory = Buffer.concat([this.memory, chunk])
    cb()
  }

  _flush(cb: () => void): void {
    // The incoming stream has ended: emit whatever is left in the buffer
    this.push(this.memory)
    cb()
  }
}
A class that transforms a stream with many small chunks into one with larger chunks

import { Transform } from 'stream'

// Uploads a file chunk by chunk: each incoming chunk is handed to the onData
// callback, and once the stream has ended onEnd is called with all chunk identifiers
export default class UploadStream extends Transform {
  processedChunks: Promise<string>[]

  constructor(
    private readonly onData: (chunk: Buffer) => Promise<string>,
    private readonly onEnd: (chunkIds: string[]) => Promise<void>,
  ) {
    super()
    this.processedChunks = []
  }

  _transform(chunk: Buffer, _encoding: string, cb: () => void): void {
    // Start the upload but do not wait for it to finish; only keep its promise
    this.processedChunks.push(this.onData(chunk))
    cb()
  }

  async _flush(cb: () => void): Promise<void> {
    // Wait for all chunk uploads to finish, then complete the transfer
    await this.onEnd(await Promise.all(this.processedChunks))
    cb()
  }
}
A class for uploading a file chunk by chunk via a stream

MemoryStream acts as a buffer for the incoming download stream. It extends Transform from the stream module, meaning that it transforms one stream into another. The two methods _transform and _flush are used to process incoming chunks and to flush the remaining data once the incoming stream has ended.

The class is initialized with a desired chunk size for the output stream, and it keeps a memory buffer to which the incoming chunks are concatenated. When a new chunk arrives, MemoryStream checks whether the size of the existing memory plus the size of the new chunk would reach the desiredChunkSize. If so, memory is sent to the output stream using push() and then cleared. In either case, the new chunk is appended to memory afterwards. When the incoming stream has ended, the current memory is pushed as one final chunk to the output stream.
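
To get a feel for its behavior, MemoryStream can be tried in isolation. The following snippet is a small sketch that is not part of our Lambda: it feeds one hundred 1 KB chunks into a MemoryStream configured for 10 KB output chunks and logs the size of everything that comes out on the other side.

import { Readable } from 'stream'

// One hundred 1 KB chunks as a readable source stream
const source = Readable.from(
  Array.from({ length: 100 }, () => Buffer.alloc(1024, 'a')),
)

const memoryStream = new MemoryStream(10 * 1024)

memoryStream.on('data', (chunk: Buffer) => {
  // Each emitted chunk is now close to 10 KB instead of 1 KB
  console.log(`received ${Buffer.byteLength(chunk)} bytes`)
})

source.pipe(memoryStream)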

Our second stream class UploadStream is used to actually upload the file to the destination system. It is initialized by providing two callbacks:

  1. What should be done with each chunk. The assumption is that your asynchronous callback eventually returns a string, but you can change this to whatever you need. In our case a new upload URL is requested and a POST request containing the chunk data is made to it. The callback returns an identifier for the chunk that we will need later.
  2. What should be done when the stream is closed. In our case the transfer is completed by making another POST request containing an ordered list of all chunk identifiers. A sketch of what both callbacks could look like follows below.
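
Here is a rough sketch of the two callbacks. The endpoints ('<destinationApi>/upload-urls' and '<destinationApi>/complete-transfer') and the response fields are made up for illustration; the API of your destination system will look different.

import axios from 'axios'

// Hypothetical first callback: request an upload URL for the chunk,
// POST the chunk data to it and return the chunk identifier
const uploadChunk = async (chunk: Buffer): Promise<string> => {
  const { data } = await axios.post('<destinationApi>/upload-urls')
  await axios.post(data.uploadUrl, chunk, {
    headers: { 'Content-Type': 'application/octet-stream' },
  })
  return data.chunkId
}

// Hypothetical second callback: complete the transfer by sending the
// ordered list of chunk identifiers to the destination system
const completeTransfer = async (chunkIds: string[]): Promise<void> => {
  await axios.post('<destinationApi>/complete-transfer', { chunkIds })
}

const uploadStream = new UploadStream(uploadChunk, completeTransfer)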

Since _transform should not wait for each upload to finish before accepting the next chunk, processedChunks is an array of Promise<string>, meaning that we do not yet await the actual chunk uploads. That happens in _flush, which is executed automatically when the incoming stream has ended. Once all chunk uploads have completed, the second callback is executed.

Here is the final logic for transferring a file using our two new stream classes:

const response = await axios.get('<fileUrl>', {
  maxContentLength: Infinity,
  responseType: 'stream',
})
const downloadStream = response.data

const memoryStream = new MemoryStream(500000)

const uploadStream = new UploadStream(
  (chunk: Buffer) => (
    new Promise<string>((resolve) => {
      // upload chunk, then resolve(chunkId)
    })
  ),
  async (chunkIds: string[]) => {
    // complete transfer
  },
)

downloadStream.pipe(memoryStream).pipe(uploadStream)
Putting it all together

All three streams are initialized and a pipeline is set up:

  1. A file is downloaded via Axios as a stream
  2. Each chunk of this downloadStream is piped into the memoryStream
  3. When memoryStream has collected enough data, or the downloadStream has ended, it pushes a new chunk of aggregated data into the uploadStream
  4. uploadStream uploads all chunks that it receives and keeps an ordered list of chunk identifiers
  5. When uploadStream has received the final chunk, it waits for all chunk uploads to complete before executing the logic that ends the file transfer, providing all chunk identifiers to the destination system (see the sketch below for how the Lambda handler can wait for this)
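
One detail the snippet above glosses over: in a Lambda the handler must not return before the transfer has finished, and pipe() alone gives us nothing to await. One way to handle this, sketched below with the hypothetical uploadChunk and completeTransfer callbacks from earlier, is to replace the UploadStream construction and the pipe() call with a promise that resolves once the completion callback has run and rejects if any of the three streams emits an error.

// A sketch of how the Lambda handler could wait for the whole transfer.
// uploadChunk and completeTransfer are the hypothetical callbacks from above.
await new Promise<void>((resolve, reject) => {
  const uploadStream = new UploadStream(uploadChunk, async (chunkIds) => {
    await completeTransfer(chunkIds)
    resolve()
  })

  // Errors do not propagate across pipe(), so each stream gets its own handler
  downloadStream.on('error', reject)
  memoryStream.on('error', reject)
  uploadStream.on('error', reject)

  downloadStream.pipe(memoryStream).pipe(uploadStream)
})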

It was very interesting for me to implement some stream classes in Node.js, since in the past I had only touched the topic briefly using premade classes. Our new transfer logic using streams easily outperforms the old one for all file sizes. Previously we could not handle files bigger than 30 MB; now we can easily transfer files of around 100 MB without increasing the timeout of the Lambda function.

The desiredChunkSize of MemoryStream is something you can play around with: setting it too low will cause backpressure in the stream pipeline and might crash your service, while setting it too high can negatively impact the execution time. The logic within the two callbacks of the UploadStream constructor can be adapted to many different use cases.
