-
Notifications
You must be signed in to change notification settings - Fork 0
/
uploader_gcs.py
92 lines (73 loc) · 3.46 KB
/
uploader_gcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# import sys
import os
import time
import argparse
import google.auth
import urllib.request as rqlib
from google.cloud import storage
from google.oauth2 import service_account
def upload_img_gcs(GCP_PROJECT, GCP_BUCKET):
    """Return the GCS bucket named GCP_BUCKET, creating it if it is missing.

    Despite its name, this function uploads nothing: it builds an
    authenticated storage client and resolves the target bucket. The caller
    creates the blob and performs the upload.

    Authentication uses a service-account key file. The current working
    directory is scanned for a ``.json`` file, which is loaded as the key.
    Keep exactly one ``.json`` file there; if several are present, the last
    one returned by ``os.listdir()`` is used.

    Args:
        GCP_PROJECT: ID of the Google Cloud project the client is bound to.
        GCP_BUCKET: Name of the bucket to fetch or create.

    Returns:
        The bucket object for GCP_BUCKET.

    Raises:
        FileNotFoundError: If the working directory holds no ``.json`` file.
    """
    key_files = [entry for entry in os.listdir() if entry.endswith('.json')]
    if not key_files:
        # Fail with a clear message instead of an UnboundLocalError later on.
        raise FileNotFoundError(
            'No service-account key (.json) file found in the current directory')
    GCP_CREDS = key_files[-1]

    CREDS_AUTH = service_account.Credentials.from_service_account_file(GCP_CREDS)

    # To use Application Default Credentials instead of the service-account
    # key, drop the `credentials` argument (see google.auth.default()).
    storage_client = storage.Client(project=GCP_PROJECT, credentials=CREDS_AUTH)

    # lookup_bucket() returns None when the bucket does not exist. This is an
    # exact-name check; the previous `bc.name in GCP_BUCKET` test was a
    # substring match and could mistake bucket "my" for "my-bucket".
    bucket = storage_client.lookup_bucket(GCP_BUCKET)
    if bucket is None:
        bucket = storage_client.create_bucket(GCP_BUCKET)
    return bucket
def args_p(argv=None):
    """Parse the uploader's command-line arguments.

    Args:
        argv: Optional list of argument strings to parse. When left as
            ``None`` the arguments are taken from the command line, as
            before.

    Returns:
        argparse.Namespace with the attributes ``project_id``,
        ``bucket_name``, ``file_url`` and ``file_name``.
    """
    parser = argparse.ArgumentParser(description='> File Uploader to Google Cloud Storage <')
    parser.add_argument('project_id', type=str,
                        metavar='PROJECT_GCP',
                        help='Type in your GCP project id (existed)')
    parser.add_argument('bucket_name', type=str,
                        metavar='BUCKET_GCP',
                        help='Type your GCP bucket name (existed or create new)')
    parser.add_argument('-i', '--file_url', type=str,
                        metavar='FILE_URL',
                        help='Type your online file URL',
                        default='https://images.unsplash.com/photo-1553550319-d8d5393e1c80?ixlib=rb-4.0.3&ixid=MnwxMjA3fDB8MHxzZWFyY2h8Mnx8c25vdyUyMGhvdXNlfGVufDB8fDB8fA%3D%3D&w=1000&q=80.jpg')
    # "https://wordpress.org/plugins/about/readme.txt" ~ to test for non-image

    # Local-time stamp (yymmddHHMMSS) so the default object name differs
    # between runs.
    current_time = time.strftime("%y%m%d%H%M%S")
    parser.add_argument('-n', '--file_name', type=str,
                        metavar='afile_NAME',
                        help='Type your online file name for uploaded to the GCS',
                        default=f'snow_house_{current_time}.jpg')
    return parser.parse_args(argv)
def get_img_online(URL_file):
    """Open the resource at URL_file and hand back the response object.

    The content is not read here; the caller reads from the returned
    object, which this function leaves open.
    """
    return rqlib.urlopen(URL_file)
if __name__ == '__main__':
    # Script entry point: download the file at --file_url and store it in
    # the requested bucket under --file_name.
    try:
        prgs = args_p()
        # Close the HTTP response as soon as the payload has been read.
        with get_img_online(prgs.file_url) as online_file:
            payload = online_file.read()
            # Use the media type reported by the server; get_content_type()
            # falls back to 'text/plain' when no Content-Type header is sent.
            content_type = online_file.headers.get_content_type()
        buckt = upload_img_gcs(prgs.project_id, prgs.bucket_name)
        blob = buckt.blob(prgs.file_name)
        blob.upload_from_string(payload, content_type=content_type)
        print('File Uploaded v')
    except Exception as err:
        # `except Exception` (not a bare except) lets SystemExit from
        # argparse (--help, usage errors) and Ctrl-C pass through instead of
        # being reported as an upload failure.
        print('Failed x')
        print(f'  reason: {type(err).__name__}: {err}')
    finally:
        print('~ Thank You for using this Script ~')