Je travaille dans l'écosystème AWS EMR.

Je recherche un moyen intelligent de repartitionner la sortie aws firehose:

S3: // compartiment / AAAA / MM / JJ / HH

Au format de partition de la ruche

S3: // compartiment / dt = AA-MM-JJ-HH

Aucune suggestion?

Merci, Omid

3
Omid Vahdaty 26 avril 2017 à 17:20

3 réponses

Meilleure réponse

Ajout de la même réponse dans Boto3 (pour correspondre à l'empaquetage par défaut lambda actuel)

import re
import boto3

##set buckets:
source_bucket='walla-anagog-us-east-1'
destination_bucket='walla-anagog-eu-west-1'

## regex from from YYYY/MM/DD/HH to dt=YYYY-MM-DD   
##replaced_file = re.sub(r'(\d{4})\/(\d{2})\/(\d{2})\/(\d{2})', r'dt=\1-\2-\3' , file)

client = boto3.client('s3')
s3 = boto3.resource('s3')
mybucket = s3.Bucket(source_bucket)

for object in mybucket.objects.all():
    replaced_key = re.sub(r'(\d{4})\/(\d{2})\/(\d{2})\/(\d{2})', r'dt=\1-\2-\3' , object.key)
    print(object.key)
    client.copy_object(Bucket=destination_bucket, CopySource=source_bucket+"/"+object.key, Key=replaced_key, ServerSideEncryption='AES256')
    client.delete_object(Bucket=source_bucket, Key=object.key)
0
Omid Vahdaty 30 avril 2017 à 11:47

Nous avons résolu ce problème en utilisant S3DistCp. Nous effectuons des agrégations horaires des données, des regroupements selon un modèle et des sorties vers des répertoires correctement préfixés.

C'est certainement une fonctionnalité qui manque à Firehose, et il n'existe actuellement aucun moyen de le faire en utilisant uniquement Firehose.

http://docs.aws.amazon.com/emr/latest/ReleaseGuide/UsingEMR_s3distcp.html

1
John Groves 26 avril 2017 à 21:25

J'ai utilisé python et boto pour déplacer les fichiers et les repartitionner. J'ai appliqué une expression régulière pour renommer la clé de YYYY / MM / DD / HH To dt = YY-MM-DD-HH

Extrait de code (notez que la clé src est supprimée):

from boto.s3.connection import S3Connection
import re

conn = S3Connection('xxx','yyy')

##get buckets:
source_bucket='srcBucketName'
destination_bucket='dstBucketName'

src = conn.get_bucket(source_bucket)
dst = conn.get_bucket(destination_bucket)

##Iterate
for key in src.list():
     #print key.name.encode('utf-8')
     file = key.name.encode('utf-8')    

     replaced_file = re.sub(r'(\d{4})\/(\d{2})\/(\d{2})\/(\d{2})', r'dt=\1-\2-\3-\4' , file)
     #print replaced_file

     #actual copy    
     dst.copy_key(replaced_file,src.name,file,encrypt_key=True )
     key.delete()
0
Omid Vahdaty 27 avril 2017 à 09:56