How to rsync local directory with google cloud storage on the development server

Posted 2019-09-10 06:34

There is a way to sync files from a local folder to a Google Cloud Storage bucket with the gsutil command-line tool, for example:

gsutil rsync -r <src> gs://<bucket>

Is there any way to do the same on the development server?

2 answers
手持菜刀,她持情操
#2 · 2019-09-10 06:56

I'm replying to myself, as I couldn't find any other solution, so I've implemented my own. I'm not sure if that's the proper way of doing this, but it does what I want. Maybe someone else will find it useful as well.

I've created an upload handler for webapp2 that allows me to upload multipart-encoded files via an HTTP POST request.

import cloudstorage as gcs
from webapp2 import uri_for
from google.appengine.ext import blobstore
from google.appengine.ext.webapp import blobstore_handlers


class Upload(blobstore_handlers.BlobstoreUploadHandler):
    def post(self):
        """Copy uploaded files to provided bucket destination"""
        fileinfo = self.get_file_infos()[0]
        destpath = '/{}/{}'.format(
            self.request.get('bucket'),
            self.request.get('dest'))

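        # gs_object_name is of the form '/gs/<bucket>/<object>'; strip the
        # '/gs' prefix to get the path format the cloudstorage library expects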
        gcs.copy2(fileinfo.gs_object_name[3:], destpath)
        gcs.delete(fileinfo.gs_object_name[3:])
        self.response.content_type = 'text/plain'
        self.response.write('File created: {}'.format(destpath))

    def get(self):
        """Returns URL to open upload session

        when the `bucket` parameter is provided, the blob will be uploaded
        to that Google Cloud Storage bucket
        """
        bucket = self.request.get('bucket')
        self.response.content_type = 'text/plain'
        self.response.write(_create_upload_url(bucket))


def _create_upload_url(bucket):
    """Returns open upload session URL"""
    if bucket:
        bucket = '{}/'.format(bucket)

    return blobstore.create_upload_url(
        success_path=uri_for('upload'),
        gs_bucket_name=bucket
    )
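
Not shown in the answer is the route registration that uri_for('upload') relies on. Here's a minimal sketch of the wiring, assuming everything lives in one module (the URL path itself is arbitrary):

import webapp2

# Assumed wiring, not part of the original answer: the route must be named
# 'upload' so that uri_for('upload') in _create_upload_url() resolves.
# Note that the CLI script below also sends HTTP DELETE requests to this same
# endpoint, which would need a matching `delete` method on the handler
# (not shown here).
app = webapp2.WSGIApplication([
    webapp2.Route('/upload', handler=Upload, name='upload'),
], debug=True)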

Then I've created a CLI task that can be used to upload files from the console:

#!/usr/bin/env python
from urlparse import urlparse
from xml.dom.minidom import parseString
import argparse
import hashlib
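# note: `magic` below is the third-party python-magic package (wraps libmagic)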
import magic
import os
import requests
import subprocess
import sys
import urllib2


def _sync(file, endpoint):
    """Upload file to given endpoint

    - on success returns: None
    - on failure returns: error message
    """
    # local paths look like '<bucket>/<object path>'; the handler above
    # expects them as separate 'bucket' and 'dest' parameters
    bucket, _, dest = file.partition('/')

    r = requests.get(endpoint, params={'bucket': bucket})
    if r.status_code != 200:
        return "[{}] Can't retrieve upload url".format(r.status_code)
    upload_url = r.text
    mime_type = _get_mime_type(file)

    r = requests.post(
        upload_url,
        data={'bucket': bucket, 'dest': dest},
        files={
          'file': ('file.tmp', open(file, 'rb'), mime_type)
        })

    if r.status_code != 200:
        return "[{}] Can't upload file".format(r.status_code)


def _delete(file, endpoint):
    """Delete file from given endpoint

    - on success returns: None
    - on failure returns: error message
    """
    r = requests.delete(
        endpoint,
        params={
          'path': file
        })

    if r.status_code != 200:
        return "[{}] Can't delete file".format(r.status_code)


def _get_mime_type(path):
    """Returns mime type of the file"""
    mime = magic.Magic(mime=True)
    return mime.from_file(path)


def _etag(path):
    """Returns ETag for a given file"""
    hash_md5 = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()


def _get_bucket_state(url, bucket, result=None, marker=None):
    """Returns the current state (file list) of a bucket in GAE local storage"""
    if result is None:
        result = {}
    o = urlparse(url)
    gcsurl = "{}://{}/_ah/gcs/{}".format(
            o.scheme, o.netloc, bucket)

    if marker:
        gcsurl += '?marker={}'.format(marker)

    print "Fetching files from bucket: {}".format(gcsurl)

    root = parseString(urllib2.urlopen(gcsurl).read())

    for content in root.getElementsByTagName('Contents'):
        key = content.getElementsByTagName('Key')[0].childNodes[0].data
        size = content.getElementsByTagName('Size')[0].childNodes[0].data
        etag = content.getElementsByTagName('ETag')[0].childNodes[0].data
        lm = content.getElementsByTagName('LastModified')[0].childNodes[0].data
        result['{}/{}'.format(bucket, key)] = {
                'etag': etag,
                'size': size,
                'last-modified': lm,
                }

    print "found {} files so far...".format(len(result))
    nextMarker = root.getElementsByTagName('NextMarker')
    if nextMarker:
        _get_bucket_state(
                url, bucket, result, nextMarker[0].childNodes[0].data)

    return result


parser = argparse.ArgumentParser(description="""
Synchronize data with local Google Cloud Storage bucket

Usage example:
  % ./sync_local <bucket_dir> http://localhost:8080/upload
""", formatter_class=argparse.RawTextHelpFormatter)

parser.add_argument(
    'bucket',
    help='Source directory; its name will be used as the destination bucket name',
    nargs=1
)

parser.add_argument(
    'url',
    help='upload URL required for the local environment',
    nargs=1,
    default='http://localhost:8080/upload'
)

parser.add_argument(
    '--dry-run',
    help="show what will be done but don't send any data",
    action='store_true'
)

args = parser.parse_args()

url = args.url[0]
bucket = args.bucket[0].rstrip('/')
dry_run = args.dry_run

# Start sync
print "Building sync state..."
current_state = _get_bucket_state(url, bucket)
print "Getting local files list..."
ls = subprocess.check_output(
        'find {} -type f'.format(bucket),
        stderr=subprocess.STDOUT,
        shell=True).split("\n")[:-1]
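
# A portable alternative to shelling out to `find` (just a sketch, assuming
# the same list of '<bucket>/<relative path>' entries is wanted) would be to
# walk the tree in Python:
#
#   ls = []
#   for root, _, files in os.walk(bucket):
#       for name in files:
#           ls.append(os.path.join(root, name))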

to_update = []
the_same = []

for file in ls:
    if file:
        if file in current_state:
            if current_state[file]['etag'] == _etag(file):
                the_same.append(file)
            else:
                to_update.append(file)
        else:
            to_update.append(file)

to_delete = set(current_state.keys()) - set(to_update) - set(the_same)

print "Files to sync: {}".format(len(ls))
print "Current state: {}".format(len(current_state))
print "Same: {}, To udpate: {}, To delete: {}".format(
        len(the_same), len(to_update), len(to_delete))

if len(to_update) or len(to_delete):
    var = raw_input("Do you want to sync data? [yn]: ")
    if var.strip() != 'y':
        sys.exit()
else:
    print "Already up-to-date"

for file in to_update:
    if dry_run:
        print 'WILL UPDATE: {}'.format(file)
        continue
    else:
        result = _sync(file, url)
        if result:
            print 'ERROR: {} {}'.format(result, file)
        else:
            print 'UPDATED: {}'.format(file)

for file in to_delete:
    if dry_run:
        print 'WILL DELETE: {}'.format(file)
        continue
    else:
        result = _delete(file, url)
        if result:
            print 'ERROR: {} {}'.format(result, file)
        else:
            print 'DELETED: {}'.format(file)

When the dev server is running I can simply upload files from a specified disk location to the desired local storage bucket, preserving the source file names:

./sync_local <dir> http://localhost:8080/upload

where <dir> has the same name as the bucket to which you want to send the files.

When the files are uploaded you can list the bucket contents by going to:

http://localhost:8080/_ah/gcs/<bucket_name>
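
For a quick sanity check outside the sync script, the same XML listing that _get_bucket_state() parses can also be fetched ad hoc, for example (the bucket name below is just a placeholder):

import urllib2
from xml.dom.minidom import parseString

# the dev server exposes an S3-style XML listing for each emulated bucket;
# 'my-bucket' is a placeholder bucket name
listing = urllib2.urlopen('http://localhost:8080/_ah/gcs/my-bucket').read()
for key in parseString(listing).getElementsByTagName('Key'):
    print key.childNodes[0].data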
Root(大扎)
#3 · 2019-09-10 07:03

Since the development server emulates Cloud Storage via a local directory, specified by the --storage_path= parameter when running dev_appserver.py, you can run the rsync using regular Linux tools, i.e. something like this (if you sync files between different servers):

rsync -a ~/dir1 username@remote_host:destination_directory

Or, if you're syncing files on the local system, use something like this:

rsync -r dir1/ dir2 