Descomprimir y Gzip archivos S3 entrantes con AWS Lambda
Gzip y Descomprimir – Más fácil, más rápido y mejor…
Hace un tiempo me encontré con un escenario en el que los archivos S3 entrantes estaban comprimidos. Cada archivo comprimido contenía cinco archivos de texto o CSV. Sin embargo, para su posterior procesamiento, necesitaba extraer el contenido comprimido y convertirlo al formato comprimido con Gzip. Dado que hubo una gran afluencia de archivos, no parecía posible descomprimir y gzipear manualmente.
Automatización
La mejor manera de automatizar el proceso parecía utilizar las funciones de AWS Lambda. Si se dirige a la pestaña Propiedades de su depósito S3, puede configurar una Notificación de evento para todos los eventos de “creación” de objetos (o solo eventos PutObject). Como destino, puede seleccionar la función Lambda donde escribirá su código para descomprimir y gzip archivos.
Ahora, cada vez que se agregue un nuevo archivo .zip a su depósito S3, se activará la función lambda. También puede agregar un prefijo a la configuración de notificación de eventos, por ejemplo, si solo desea ejecutar la función lambda cuando los archivos se cargan en una carpeta específica dentro del depósito S3.
La función lambda puede verse así:
import logging import zipfile from io import BytesIO from boto3 import resource import gzip import io logger = logging.getLogger() logger.setLevel(logging.INFO) def unzip_and_gzip_files(filekey, sourcebucketname, destinationbucket): try: zipped_file = s3_resource.Object(bucket_name=sourcebucketname, key=filekey) buffer = BytesIO(zipped_file.get()["Body"].read()) zipped = zipfile.ZipFile(buffer) for file in zipped.namelist(): logger.info(f'current file in zipfile: {file}') final_file_path = file + '.gzip' with zipped.open(file, "r") as f_in: gzipped_content = gzip.compress(f_in.read()) destinationbucket.upload_fileobj(io.BytesIO(gzipped_content), final_file_path, ExtraArgs={"ContentType": "text/plain"} ) except Exception as e: logger.info(f'Error: Unable to gzip & upload file: {e}') def lambda_handler(event, context): global s3_resource s3_resource = resource('s3') sourcebucketname = 'incoming-bucket' destination_bucket = s3_resource.Bucket('final-bucket') key = event['Records'][0]['s3']['object']['key'] unzip_and_gzip_files(key, sourcebucketname, destination_bucket) return { 'statusCode': 200 }
Cuando un evento activa esta función lambda, la función extraerá la clave de archivo que provocó la activación. Usando la clave de archivo, cargaremos el archivo zip entrante en un búfer, lo descomprimiremos y leeremos cada archivo individualmente.
Dentro del ciclo, cada archivo individual dentro de la carpeta comprimida se comprimirá por separado en un archivo de formato gzip y luego se cargará en el depósito S3 de destino.
Puede actualizar el parámetro final_file_path
si desea cargar los archivos en una carpeta específica. De manera similar, puede actualizar parámetros como sourcebucketname
y destination_bucket
según sus requisitos. También puede cargar los archivos comprimidos con gzip en el mismo depósito de origen.
Otra forma de hacer lo mismo podría ser leer primero el archivo S3 en la carpeta /tmp y luego descomprimirlo para su posterior procesamiento. Sin embargo, este método puede bloquearse si comienza a recibir varios archivos en su depósito S3 al mismo tiempo.
Configuración lambda
Tenga en cuenta que, de manera predeterminada, Lambda tiene un tiempo de espera de tres segundos y una memoria de solo 128 MB. Si tiene varios archivos en su depósito S3, debe cambiar estos parámetros a sus valores máximos:
- Tiempo de espera = 900
- Tamaño_memoria = 10240
Permisos de AWS
El rol de AWS que está utilizando para ejecutar su función Lambda requerirá ciertos permisos. En primer lugar, requeriría acceso a S3 para leer y escribir archivos. Las siguientes políticas son las principales:
“s3: ListBucket”
“s3:ObjetoCabeza”
“s3:ObtenerObjeto”
“s3:ObtenerVersiónObjeto”
“s3:PonerObjeto”
También debe tener acceso a CloudWatch para poder iniciar sesión y depurar su código si es necesario.
“registros: Crear grupo de registros”
“registros: Crear flujo de registro”
“registros:PutLogEvents”
La replicación de un flujo de trabajo similar a través de Terraform también es bastante sencilla.