How to make MSCK REPAIR TABLE execute automatically

Posted 2019-01-25 09:41

I have a spark batch job which is executed hourly. Each run generates and stores new data in S3 with the directory naming pattern DATA/YEAR=?/MONTH=?/DATE=?/datafile.
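
For reference, a minimal PySpark sketch of a write that produces that layout might look like the following; the bucket name, input path, and the YEAR/MONTH/DATE columns are placeholders rather than the actual job:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hourly-batch").getOrCreate()

# Hypothetical input; the real job computes the YEAR, MONTH, and DATE columns itself.
df = spark.read.json("s3://some_bucket/incoming/")

(df.write
   .mode("append")
   .partitionBy("YEAR", "MONTH", "DATE")  # yields DATA/YEAR=2019/MONTH=01/DATE=25/...
   .parquet("s3://some_bucket/DATA/"))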

After uploading the data to S3, I want to investigate them using Athena. More, I would like to visualize them in QuickSight by connecting to Athena as a data source.

The problem is that, after each run of my Spark batch, the newly generated data stored in S3 will not be discovered by Athena unless I manually run the query MSCK REPAIR TABLE.

Is there a way to make Athena update the data automatically, so that I can create a fully automatic data visualization pipeline?

2 Answers
做个烂人
#2 · 2019-01-25 10:01

You should be running ADD PARTITION instead:

aws athena start-query-execution --query-string "ALTER TABLE ADD PARTITION..."

This adds the newly created partition from your S3 location. Athena leverages Hive for partitioning data. To create a table with partitions, you must define them in the CREATE TABLE statement: use PARTITIONED BY to define the keys by which to partition the data.
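
A fuller sketch of that call, using boto3 and the DATA/YEAR=?/MONTH=?/DATE=? layout from the question, might look like the following; the database, table, bucket, and partition column names are assumptions:

import boto3

athena = boto3.client('athena')

# Assumed names -- replace with your own database, table, and bucket.
year, month, date = '2019', '01', '25'
ddl = (
    "ALTER TABLE some_database.some_table ADD IF NOT EXISTS "
    "PARTITION (`year`='{y}', `month`='{m}', `date`='{d}') "
    "LOCATION 's3://some_bucket/DATA/YEAR={y}/MONTH={m}/DATE={d}/'"
).format(y=year, m=month, d=date)

athena.start_query_execution(
    QueryString=ddl,
    QueryExecutionContext={'Database': 'some_database'},
    ResultConfiguration={'OutputLocation': 's3://some_bucket/athena-results/'},
)

ADD IF NOT EXISTS makes the statement idempotent, so it is safe to run after every batch.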

不美不萌又怎样
#3 · 2019-01-25 10:08

There are a number of ways to schedule this task. How do you schedule your workflows? Do you use a system like Airflow, Luigi, Azkaban, cron, or AWS Data Pipeline?

From any of these, you should be able to fire off the following CLI command.

$ aws athena start-query-execution --query-string "MSCK REPAIR TABLE some_database.some_table" --result-configuration "OutputLocation=s3://SOMEPLACE"
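
Note that start-query-execution only submits the query, so the repair may still be running when the command returns. If a later step in the pipeline (for example a QuickSight refresh) depends on the new partitions, a small wrapper that waits for completion can help. Here is a sketch using boto3, with the database, table, and output location assumed:

import time
import boto3

athena = boto3.client('athena')

# Assumed names -- replace with your own database, table, and results bucket.
response = athena.start_query_execution(
    QueryString='MSCK REPAIR TABLE some_database.some_table',
    QueryExecutionContext={'Database': 'some_database'},
    ResultConfiguration={'OutputLocation': 's3://SOMEPLACE'},
)
query_id = response['QueryExecutionId']

# Poll until the repair finishes so downstream steps see the new partitions.
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(5)

if state != 'SUCCEEDED':
    raise RuntimeError('MSCK REPAIR TABLE finished with state ' + state)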

Another option would be AWS Lambda. You could have a function that calls MSCK REPAIR TABLE some_database.some_table in response to a new upload to S3.

An example Lambda function could be written as follows:

import boto3

def lambda_handler(event, context):
    bucket_name = 'some_bucket'

    client = boto3.client('athena')

    # Where Athena writes the query's result files
    config = {
        'OutputLocation': 's3://' + bucket_name + '/',
        'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
    }

    # Query execution parameters
    sql = 'MSCK REPAIR TABLE some_database.some_table'
    query_context = {'Database': 'some_database'}  # renamed so it does not shadow the Lambda `context` argument

    client.start_query_execution(QueryString=sql,
                                 QueryExecutionContext=query_context,
                                 ResultConfiguration=config)

You would then configure a trigger to execute your Lambda function when new data are added under the DATA/ prefix in your bucket.
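
If you prefer to set that trigger up programmatically rather than in the console, a boto3 sketch might look like the following; the bucket, Lambda ARN, and account details are assumptions, and the function also needs a resource policy that allows S3 to invoke it (lambda add-permission):

import boto3

s3 = boto3.client('s3')

# Assumed bucket and Lambda ARN -- replace with your own values.
s3.put_bucket_notification_configuration(
    Bucket='some_bucket',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:repair_athena_table',
            'Events': ['s3:ObjectCreated:*'],
            # Only fire for objects under the DATA/ prefix.
            'Filter': {'Key': {'FilterRules': [{'Name': 'prefix', 'Value': 'DATA/'}]}},
        }]
    },
)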

Ultimately, explicitly rebuilding the partitions from a job scheduler after your Spark job runs has the advantage of being self-documenting. On the other hand, AWS Lambda is convenient for jobs like this one.
