Processing Large S3 Files With AWS Lambda

Kyle79 2020. 8. 18. 14:00

https://medium.com/swlh/processing-large-s3-files-with-aws-lambda-2c5840ae5c91

Despite having a runtime limit of 15 minutes, AWS Lambda can still be used to process large files: the function below reads a CSV in ranged chunks and, when its time budget runs low, asynchronously re-invokes itself with the byte offset at which to resume.

## Accompanying code for - Processing large S3 files with AWS Lambda
## https://medium.com/swlh/processing-large-s3-files-with-aws-lambda-2c5840ae5c91
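## Example initial event (values are placeholders):
##   {"bucket": "my-bucket", "object_key": "data/large.csv"}
## The handler adds 'offset' and 'fieldnames' itself when it re-invokes
## the function to resume where it left off.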

import csv
import json
import os

import boto3
import botocore.response

# Stop processing early once less than this much execution time remains (ms).
MINIMUM_REMAINING_TIME_MS = int(os.getenv('MINIMUM_REMAINING_TIME_MS', '10000'))


def handler(event, context):
    bucket_name = event['bucket']
    object_key = event['object_key']
    offset = event.get('offset', 0)
    fieldnames = event.get('fieldnames', None)

    s3_resource = boto3.resource('s3')
    s3_object = s3_resource.Object(bucket_name=bucket_name, key=object_key)
    bodylines = get_object_bodylines(s3_object, offset)

    # On the first run fieldnames is None and DictReader consumes the first
    # row as the header; resumed runs pass the saved fieldnames instead.
    csv_reader = csv.DictReader(bodylines.iter_lines(), fieldnames=fieldnames)
    for row in csv_reader:
        ## process and do work
        # Stop early while there is still time left to schedule the next run.
        if context.get_remaining_time_in_millis() < MINIMUM_REMAINING_TIME_MS:
            # Remember the header row so the next invocation does not
            # mistake a data row for it.
            fieldnames = fieldnames or csv_reader.fieldnames
            break
    
    # bodylines.offset counts the bytes consumed from this ranged read, so
    # offset + bodylines.offset is the absolute byte position to resume at.
    # If the loop ran to completion, new_offset equals the object size and
    # the chain of invocations ends here.
    new_offset = offset + bodylines.offset
    if new_offset < s3_object.content_length:
        new_event = {
            **event,
            "offset": new_offset,
            "fieldnames": fieldnames
        }
        invoke_lambda(context.function_name, new_event)
    return
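
## A minimal way to kick off the first run (a sketch; the function name is
## a placeholder, and AWS CLI v2 may additionally need
## --cli-binary-format raw-in-base64-out):
##   aws lambda invoke --function-name process-large-csv \
##       --invocation-type Event \
##       --payload '{"bucket": "my-bucket", "object_key": "data/large.csv"}' \
##       /dev/stdout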


def invoke_lambda(function_name, event):
    payload = json.dumps(event).encode('utf-8')
    client = boto3.client('lambda')
    client.invoke(
        FunctionName=function_name,
        InvocationType='Event',
        Payload=payload
    )
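
## InvocationType='Event' makes this asynchronous (fire-and-forget), so the
## current run can return within its own time budget. The function's
## execution role must be allowed to invoke it, e.g. a statement like this
## (sketch; region, account id and function name are placeholders):
##   {"Effect": "Allow", "Action": "lambda:InvokeFunction",
##    "Resource": "arn:aws:lambda:us-east-1:123456789012:function:process-large-csv"}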


def get_object_bodylines(s3_object, offset):
    resp = s3_object.get(Range=f'bytes={offset}-')
    body: botocore.response.StreamingBody = resp['Body']
    return BodyLines(body)
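
## S3 honors open-ended HTTP Range requests: 'bytes=<offset>-' returns
## everything from that byte to the end of the object, which is how a
## resumed invocation skips the part already processed.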


class BodyLines:
    def __init__(self, body: botocore.response.StreamingBody, initial_offset=0):
        self.body = body
        self.offset = initial_offset

    def iter_lines(self, chunk_size=1024):
        """Return an iterator to yield lines from the raw stream.
        This is achieved by reading chunk of bytes (of size chunk_size) at a
        time from the raw stream, and then yielding lines from there.
        """
        pending = b''
        for chunk in self.body.iter_chunks(chunk_size):
            lines = (pending + chunk).splitlines(True)
            for line in lines[:-1]:
                self.offset += len(line)
                yield line.decode('utf-8')
            pending = lines[-1]
        if pending:
            self.offset += len(pending)
            yield pending.decode('utf-8')
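
A quick local sanity check of BodyLines (a sketch: io.BytesIO stands in for the S3 stream, and constructing botocore's StreamingBody directly is fine for a test):

import io
from botocore.response import StreamingBody

data = b"id,name\n1,alpha\n2,beta\n"
bodylines = BodyLines(StreamingBody(io.BytesIO(data), len(data)))
print(list(bodylines.iter_lines(chunk_size=8)))
# -> ['id,name\n', '1,alpha\n', '2,beta\n']
assert bodylines.offset == len(data)  # every byte accounted for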
