Request Kochava to get file names and then copy data from files to Redshift using Python [on hold]
I am working on a requirement where I need to copy data from some CSV reports into Redshift.
These CSV reports live in an S3 bucket, and to fetch them I first need to know the file names. So I make a first request to the Kochava API to get a report token for each file, then a second request with each token to get the file name. In the last step I copy the file's data into Redshift.
For this requirement I have written the following code in Python 3.6:
import configparser
import logging
import os
import sys
import time

import psycopg2
import requests

# API endpoints
API_ENDPOINT = "https://reporting.api.kochava.com/v1.4/detail"
progress_url = "https://reporting.api.kochava.com/v1.4/progress"

FORMAT = '%(asctime)s [%(levelname)s] %(filename)s:%(lineno)s %(funcName)s(): %(message)s'
DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

logger = logging.getLogger()
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(logging.Formatter(FORMAT, DATETIME_FORMAT))
logger.addHandler(ch)
def get_conn():
    try:
        conn = psycopg2.connect("dbname=" + config.get("DATABASE", "DBNAME")
                                + " user=" + config.get("DATABASE", "USER")
                                + " password=" + config.get("DATABASE", "PASSWORD")
                                + " port=" + config.get("DATABASE", "PORT")
                                + " host=" + config.get("DATABASE", "HOST"))
        conn.set_session(autocommit=True)
        return conn, conn.cursor()
    except Exception as ex:
        logger.error(ex)
def get_detail_report(start_time, end_time, appslist, events):
    # get a database connection
    conn, cursor = get_conn()
    apps_dict = {}
    for app in appslist:
        logger.info("Working for app: {}".format(app))
        token_dict = {}
        for event in events:
            params = {'api_key': 'MY_API_KEY',
                      'app_guid': '{}'.format(app),
                      'time_start': '{}'.format(start_time),
                      'time_end': '{}'.format(end_time),
                      'traffic': ['{}'.format(event)],
                      'traffic_filtering': {},
                      'time_zone': 'Asia/Singapore',
                      'delivery_method': [
                          'S3bucket',
                          config.get("S3_DETAILS", "REGION"),
                          config.get("S3_DETAILS", "BUCKET_NAME"),
                          config.get("S3_DETAILS", "ACCESS_KEY"),
                          config.get("S3_DETAILS", "SECRET_KEY"),
                      ],
                      'delivery_format': 'csv',
                      'notify': ['admin@example.com']}
            try:
                r = requests.post(url=API_ENDPOINT, json=params)
                token_dict[event] = r.json()["report_token"]
            except Exception:
                logger.error("Error occurred for App:{0}, Event:{1}".format(app, event))
        apps_dict[app] = token_dict
    # give the reports some time to generate before polling
    time.sleep(60)
    for app in appslist:
        app_tokens = apps_dict[app]
        for event, token in app_tokens.items():
            filename = get_file_name(app, token, event)
            if filename[0:5] == "Error":
                logger.error(filename)
            else:
                # table where the data is to be saved
                tablename = "cmp_uat.Kochava_" + event + "_data"
                query_string = """COPY {0} from 's3://{1}/{2}'
                CREDENTIALS 'aws_access_key_id={3};aws_secret_access_key={4}'
                delimiter ',' IGNOREHEADER 1 FILLRECORD ACCEPTINVCHARS CSV;""".format(
                    tablename,
                    config.get("S3_DETAILS", "BUCKET_NAME"),
                    filename,
                    config.get("S3_DETAILS", "ACCESS_KEY"),
                    config.get("S3_DETAILS", "SECRET_KEY"))
                try:
                    logger.info("Inserting data for App: {0} with Token: {1} for Event: {2} in Table: {3}".format(
                        app, token, event, tablename))
                    cursor.execute(query_string)
                    logger.info("Data inserted for app: {0} with token: {1} for event: {2} in table: {3}".format(
                        app, token, event, tablename))
                except Exception as ex:
                    logger.error(ex)
def get_file_name(app, token, event):
    progress_body = {
        "api_key": "2088542A-067B-4B76-B9DB-E80B4321C4AA",
        "app_guid": "{}".format(app),
        "token": token
    }
    r = requests.post(url=progress_url, json=progress_body)
    status = r.json()["status"].lower()
    if status != "completed" and status != "error":
        logger.info("Report for App: {0} on Event: {1} with Token: {2} is in status {3}\nTrying again in 30 sec.".format(
            app, event, token, status.upper()))
        time.sleep(30)
        return get_file_name(app, token, event)
    elif status == "error":
        return "Error for App:{0} on Event {1} with Token {2} resulted in error with response text {3}".format(
            app, event, token, r.text)
    else:
        filen = r.json()["report"].split("?")[0].split("//")[1].split("/")[1]
        logger.info("Filename found for App: {0} on Event: {1} with Token {2}\nFilename: {3}".format(
            app, event, token, filen))
        return filen
if __name__ == "__main__":
    config = configparser.ConfigParser()
    config.read(os.path.join(os.getcwd(), 'config.ini'))
    get_detail_report(sys.argv[1], sys.argv[2], ["APP LIST"],
                      ["click", "impression", "event", "install",
                       "collected cost"])
One issue I have found with the above code is that if a report is in the queued or running state, all the other events wait for it to complete, which is why I thought of making this multiprocess.
I tried to add a multiprocessing pool to the code but was unable to get it right.
Could you please add multiprocessing to my code? The API credentials are not included here, so I will run the code myself and report back whether multiprocessing actually helps.
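For reference, this is roughly the structure I was attempting: a pool of workers that each poll one (app, event, token) combination, with the Redshift COPY kept in the main process, since sharing one psycopg2 connection across processes is not safe. It is only an untested sketch; poll_report, get_all_filenames, and the worker count are names I made up, and get_file_name has to stay a top-level function so it can be pickled.

from multiprocessing import Pool

def poll_report(args):
    # each worker polls one report until it completes (or errors)
    app, token, event = args
    return app, event, get_file_name(app, token, event)

def get_all_filenames(apps_dict, workers=5):
    jobs = [(app, token, event)
            for app, tokens in apps_dict.items()
            for event, token in tokens.items()]
    with Pool(processes=workers) as pool:
        # polls in parallel; the COPY statements would then run
        # sequentially in the main process over these results
        return pool.map(poll_report, jobs)

Since the polling is network-bound rather than CPU-bound, perhaps a thread pool (concurrent.futures.ThreadPoolExecutor) would work just as well here.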
If there is any other improvement, please point that out as well. I would be much obliged.
Thanks!
python-3.x multiprocessing
put on hold as off-topic by πάντα ῥεῖ, Sᴀᴍ Onᴇᴌᴀ, 200_success, Jamal♦ 2 days ago
This question appears to be off-topic. The users who voted to close gave this specific reason:
- "Code not implemented or not working as intended: Code Review is a community where programmers peer-review your working code to address issues such as security, maintainability, performance, and scalability. We require that the code be working correctly, to the best of the author's knowledge, before proceeding with a review." – πάντα ῥεῖ, Sᴀᴍ Onᴇᴌᴀ, 200_success, Jamal
If this question can be reworded to fit the rules in the help center, please edit the question.