Processing Large S3 Files With AWS Lambda
https://medium.com/swlh/processing-large-s3-files-with-aws-lambda-2c5840ae5c91

Despite having a runtime limit of 15 minutes, AWS Lambda can still be used to process large files: the handler reads the object in ranged chunks and, when it is about to run out of time, asynchronously re-invokes itself with a byte offset so the next invocation picks up where the last one stopped.
```python
## Accompanying code for - Processing large S3 files with AWS Lambda
## https://medium.com/swlh/processing-large-s3-files-with-aws-lambda-2c5840ae5c91
import csv
import json
import os

import boto3
import botocore.response

# Stop processing rows once less than this much execution time (in ms) remains;
# defaults to 10 seconds when the environment variable is unset.
MINIMUM_REMAINING_TIME_MS = int(os.getenv('MINIMUM_REMAINING_TIME_MS', 10000))


def handler(event, context):
    bucket_name = event['bucket']
    object_key = event['object_key']
    # Resume from the byte offset reached by a previous invocation (0 on the first run).
    offset = event.get('offset', 0)
    # The CSV header row only exists at the start of the file, so follow-up
    # invocations receive the field names through the event instead.
    fieldnames = event.get('fieldnames', None)

    s3_resource = boto3.resource('s3')
    s3_object = s3_resource.Object(bucket_name=bucket_name, key=object_key)

    bodylines = get_object_bodylines(s3_object, offset)
    csv_reader = csv.DictReader(bodylines.iter_lines(), fieldnames=fieldnames)

    for row in csv_reader:
        ## process and do work
        if context.get_remaining_time_in_millis() < MINIMUM_REMAINING_TIME_MS:
            fieldnames = fieldnames or csv_reader.fieldnames
            break

    # If the object has not been fully consumed, hand the remainder
    # to a fresh invocation of this same function.
    new_offset = offset + bodylines.offset
    if new_offset < s3_object.content_length:
        new_event = {
            **event,
            "offset": new_offset,
            "fieldnames": fieldnames
        }
        invoke_lambda(context.function_name, new_event)
    return
```
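To kick off the processing chain, the initial event only needs the bucket and key; subsequent chunks are scheduled by the handler itself. A minimal sketch of that first invocation, where the function name, bucket, and object key are placeholders rather than values from the article:

```python
# Kick off the first invocation; later chunks are self-scheduled by the handler.
import json
import boto3

client = boto3.client('lambda')
client.invoke(
    FunctionName='process-large-csv',   # hypothetical function name
    InvocationType='Event',             # asynchronous, same as the self-invocations
    Payload=json.dumps({
        'bucket': 'my-bucket',          # hypothetical bucket
        'object_key': 'data/large.csv'  # hypothetical object key
    }).encode('utf-8'),
)
```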
```python
def invoke_lambda(function_name, event):
    payload = json.dumps(event).encode('utf-8')
    client = boto3.client('lambda')
    # 'Event' makes the invocation asynchronous: fire and forget the next chunk.
    client.invoke(
        FunctionName=function_name,
        InvocationType='Event',
        Payload=payload
    )
```
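For the self-invocation to succeed, the function's execution role must be allowed to call lambda:InvokeFunction on the function itself. A sketch of that policy statement written as a Python dict, where the region, account ID, and function name in the ARN are placeholders:

```python
# Hypothetical IAM policy statement for the function's execution role.
SELF_INVOKE_POLICY_STATEMENT = {
    "Effect": "Allow",
    "Action": "lambda:InvokeFunction",
    "Resource": "arn:aws:lambda:us-east-1:123456789012:function:process-large-csv",
}
```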
```python
def get_object_bodylines(s3_object, offset):
    # A ranged GET starts the stream `offset` bytes into the object,
    # so each invocation downloads only the part it has not seen yet.
    resp = s3_object.get(Range=f'bytes={offset}-')
    body: botocore.response.StreamingBody = resp['Body']
    return BodyLines(body)


class BodyLines:
    def __init__(self, body: botocore.response.StreamingBody, initial_offset=0):
        self.body = body
        self.offset = initial_offset

    def iter_lines(self, chunk_size=1024):
        """Return an iterator to yield lines from the raw stream.

        This is achieved by reading a chunk of bytes (of size chunk_size) at a
        time from the raw stream, and then yielding lines from there.
        """
        pending = b''
        for chunk in self.body.iter_chunks(chunk_size):
            lines = (pending + chunk).splitlines(True)
            for line in lines[:-1]:
                self.offset += len(line)
                yield line.decode('utf-8')
            pending = lines[-1]
        if pending:
            self.offset += len(pending)
            yield pending.decode('utf-8')
```
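Because BodyLines depends only on the StreamingBody interface, it can be exercised locally without touching S3. A small smoke test, wrapping an in-memory buffer in botocore's StreamingBody:

```python
# Local smoke test for BodyLines: no AWS calls, just an in-memory stream.
import io
from botocore.response import StreamingBody

data = b"name,score\nalice,10\nbob,20\n"
bodylines = BodyLines(StreamingBody(io.BytesIO(data), len(data)))

for line in bodylines.iter_lines(chunk_size=8):
    print(line, end='')                   # lines keep their trailing newline
assert bodylines.offset == len(data)      # offset tracked the full stream
```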