Commit c9cb6355 authored by GiraldNet's avatar GiraldNet
Browse files

Cloned the project from GitHub; no need to bring over the old commit history.

parent d7536009
Pipeline #1926 canceled with stages
*.pyc
*.zip
*.gz
\ No newline at end of file
unzip $1 -d .
psql -d $3 -1 -f empty_tables.sql
psql -d $3 -1 -f load.sql
rm *.csv *html *.pdf load.sql setup.sql
mv $1 $2
# It will wait until 3 am, just to be sure
# issue the requests,
# success: memorize the id of the 2 requests
# Failure: log the wrong behaviour
# a missing request - try again every 10 minutes maximum until 10 times, then stop
# Check periodically untill those requests are all successful
# download the files,
# success: memorize the names of the files and update the table with the latest interval
# Failure: log the wrong behaviour and stop and delete all the downloaded documents
# Update database
from datetime import date, datetime, timedelta
import time
import sys
import tools
#import logging
#logger = logging.getLogger()
#logger.disabled = True
if len(sys.argv) != 2:
print 'Only one parameter required: the configuration JSON file'
sys.exit()
CONFIG_JSON = tools.load_config_json(sys.argv[1])
print CONFIG_JSON
TABLES_REQUEST, CLICKSTREAM_REQUEST, RANGE_LAST = tools.issue_requests(CONFIG_JSON)
TABLES_FILE, CLICKSTREAM_FILES, CONFIG_JSON = tools.download_requests(TABLES_REQUEST, CLICKSTREAM_REQUEST, CONFIG_JSON)
CONFIG_JSON = tools.update_database_tables(TABLES_FILE, CONFIG_JSON)
CONFIG_JSON = tools.update_database_clickstream(CLICKSTREAM_FILES, CONFIG_JSON)
tools.update_jsonfile(RANGE_LAST,sys.argv[1],CONFIG_JSON)
# to run the script that refreshes the materialized views
# sudo -u www-data /srv/apps/coursera/venv/bin/python /srv/apps/coursera/code/manage.py refreshviews
#gunzip *.gz
psql -1 -f setup_events.sql
#python load_events.py *.csv
#gzip *.csv
#mv *.csv $1
echo 'Hola'
unzip $1 -d .
psql -d $3 -1 -f setup_tables_unique.sql
psql -d $3 -1 -f load.sql
rm *.csv *html *.pdf load.sql setup.sql
mv $1 $2
{"first_time": "no", "initial_download_location": ".", "database_name": "sep-user", "table_export_location": "tables/", "last_clickstream_date": "2018-10-24", "fist_clickstream_date": "2018-07-01", "eit_digital_id": "253", "clickstream_purpose": "of the clickstream data.", "clickstream_export_location": "events/", "purpose_for_requests": "For self study and analysis ", "tables_purpose": "of the tables data.", "database_user": "sep-user"}
\ No newline at end of file
{"first_time": "no", "initial_download_location": ".", "database_name": "coursera", "table_export_location": "tables/", "last_clickstream_date": "2018-10-24", "fist_clickstream_date": "2018-07-01", "eit_digital_id": "253", "clickstream_purpose": "of the clickstream data.", "clickstream_export_location": "events/", "purpose_for_requests": "For self study and analysis ", "tables_purpose": "of the tables data.", "database_user": "www-data"}
\ No newline at end of file
This diff is collapsed.
import sys
import psycopg2
def load_events(list_of_events):
conn = psycopg2.connect("dbname=sep-user user=sep-user")
cur = conn.cursor()
for event_file in list_of_events:
print event_file
cur.copy_expert("COPY clickstream_events FROM STDIN WITH CSV DELIMITER ',' QUOTE '\"' ESCAPE '\\'",open(event_file,'r'))
conn.commit()
cur.close()
conn.close()
#gunzip *.gz
psql -d $1 -1 -f setup_events.sql
#python load_events.py *.csv
#gzip *.csv
#mv *.csv $1
DROP TABLE IF EXISTS clickstream_events;
CREATE TABLE clickstream_events (
hashed_user_id varchar,
hashed_session_cookie_id varchar,
server_timestamp timestamp,
hashed_ip varchar,
user_agent varchar,
url varchar,
initial_referrer_url varchar,
browser_language varchar,
course_id varchar,
country_cd varchar,
region_cd varchar,
timezone varchar,
os varchar,
browser varchar,
key varchar,
value varchar
);
This diff is collapsed.
from datetime import date, datetime, timedelta
from courseraresearchexports.exports import api, utils
from courseraresearchexports.models import ExportRequest
from courseraresearchexports.constants.api_constants import ANONYMITY_LEVEL_COORDINATOR, EXPORT_TYPE_TABLES,EXPORT_TYPE_CLICKSTREAM, SCHEMA_NAMES
range_first = str(date.today() - timedelta(days=5))
range_last = str(date.today() - timedelta(days=1))
CLICKSTREAM_ISSUE_REQUEST = ExportRequest.ExportRequest(partner_id = 253, statement_of_purpose = "purpose_for_requests", export_type = EXPORT_TYPE_CLICKSTREAM, anonymity_level = ANONYMITY_LEVEL_COORDINATOR, interval= [range_first,range_last])
clickstream_response = api.post(CLICKSTREAM_ISSUE_REQUEST)
print 'THIS IS THE RESPONSE OF THE POST' + str(clickstream_response[0])
registered_clickstream_request = api.get(clickstream_response[0].to_json()["id"])
print 'THIS IS THE REGISTERED REQUEST ' + str(registered_clickstream_request[0])
from courseraresearchexports.exports import api
from datetime import date, datetime
def show_all_requests():
"""
This method checks for requests issued in the same day and returns the list of requests if all are successful.
It may return an empty list if not all the requests of today are ready for download.
Returned requests are request objects.
"""
all_requests = api.get_all()
for single_request in all_requests:
print single_request.to_json()
def get_today_requests():
all_requests = api.get_all()
today_requests = []
today_is = date.today()
for single_request in all_requests:
created_at = date.fromtimestamp(single_request.to_json()['metadata']['createdAt'] / 1e3)
#print 'THE REQUEST WAS CREATED AT: ' + str(created_at) + ' AND TODAY IS: ' + str(today_is)
if today_is == created_at:
print_list = []
print_list += [single_request.to_json()['exportType']]
print_list += [single_request.to_json()['status']]
if single_request.to_json()['exportType'] == 'RESEARCH_EVENTING':
print_list += [str(single_request.to_json()['interval'])]
print_list += [str(datetime.fromtimestamp(single_request.to_json()['metadata']['createdAt'] / 1e3))]
if single_request.to_json()['status'] == 'SUCCESSFUL' :
print_list += [str(datetime.fromtimestamp(single_request.to_json()['metadata']['completedAt'] / 1e3))]
print_list += [single_request.to_json()['id']]
print print_list
#show_all_requests()
get_today_requests()
\ No newline at end of file
import json
import sys
import time
import os
import psycopg2
from datetime import date, datetime, timedelta
from courseraresearchexports.models import ExportRequest
from courseraresearchexports.exports import api, utils
from courseraresearchexports.constants.api_constants import ANONYMITY_LEVEL_COORDINATOR, EXPORT_TYPE_TABLES,EXPORT_TYPE_CLICKSTREAM, SCHEMA_NAMES
def load_config_json(file_name):
try:
config_json_file = open(file_name, 'r')
try:
config_json = json.load(config_json_file)
except:
print 'ERROR LOADING CONFIGURATION JSON'
sys.exit()
config_json_file.close()
return config_json
except:
print 'ERROR OPENING FILE'
sys.exit()
def get_table_request(today_requests, tab_req):
if today_requests==[]:
return None
else:
for r in today_requests:
#print 'THIS IS THE EXISTING TABLE REQUEST: ' +str(r.to_json())
#print 'THIS IS THE DESIRED TABLE REQUEST: ' +str(tab_req.to_json())
if r.to_json()['exportType'] == tab_req.to_json()['exportType'] and r.to_json()['anonymityLevel'] == tab_req.to_json()['anonymityLevel'] and sorted(r.to_json()['schemaNames']) == sorted(tab_req.to_json()['schemaNames']) and sorted(r.to_json()['scope'].items()) == sorted(tab_req.to_json()['scope'].items()):
print 'WE FOUND A MATCH!'
return r
print 'No similar table request'
return None
def get_clickstream_request(today_requests, click_req):
print "ENTERING METHOD: get_clickstream_request"
if today_requests==[]:
return None
else:
for r in today_requests:
if r.to_json()['exportType'] == click_req.to_json()['exportType'] and r.to_json()['anonymityLevel'] == click_req.to_json()['anonymityLevel'] and sorted(r.to_json()['scope'].items()) == sorted(click_req.to_json()['scope'].items()) and sorted(r.to_json()['interval'].items()) == sorted(click_req.to_json()['interval'].items()):
print 'WE FOUND A MATCH!'
return r
print 'No similar clickstream request'
return None
def get_today_requests():
print "ENTERING METHOD: get_today_requests"
all_requests = api.get_all()
print all_requests
today_requests = []
today_is = date.today()
for single_request in all_requests:
created_at = date.fromtimestamp(single_request.to_json()['metadata']['createdAt'] / 1e3)
#print 'THE REQUEST WAS CREATED AT: ' + str(created_at) + ' AND TODAY IS: ' + str(today_is)
if today_is == created_at:
today_requests += [single_request]
return today_requests
def get_from_todays(tab_req, click_req):
print "ENTERING METHOD: get_from_todays"
today_requests = get_today_requests()
#print today_requests
return_tab_req = get_table_request(today_requests, tab_req)
return_clickstream_req = get_clickstream_request(today_requests, click_req)
return return_tab_req, return_clickstream_req
'''
def get_latest_table_request():
print "ENTERING METHOD: get_latest_table_request"
today_requests = get_today_requests()
latest = None
for r in today_requests:
if r.to_json()['exportType'] == 'RESEARCH_WITH_SCHEMAS':
latest = r
break
return latest
'''
def issue_requests(config_json):
range_first = config_json["fist_clickstream_date"] if config_json["first_time"]=="yes" else config_json["last_clickstream_date"]
range_last = str(date.today() - timedelta(days=1)) if datetime.today().hour > 3 else str(date.today() - timedelta(days=2))
print (range_first, range_last)
TABLES_ISSUE_REQUEST = ExportRequest.ExportRequest(partner_id = int(config_json["eit_digital_id"]), statement_of_purpose = config_json["purpose_for_requests"] + config_json["tables_purpose"], export_type = EXPORT_TYPE_TABLES, anonymity_level = ANONYMITY_LEVEL_COORDINATOR, schema_names = SCHEMA_NAMES)
CLICKSTREAM_ISSUE_REQUEST = ExportRequest.ExportRequest(partner_id = int(config_json["eit_digital_id"]), statement_of_purpose = config_json["purpose_for_requests"] + config_json["clickstream_purpose"], export_type = EXPORT_TYPE_CLICKSTREAM, anonymity_level = ANONYMITY_LEVEL_COORDINATOR, interval= [range_first,range_last])
registered_tables_request = None
registered_clickstream_request = None
exit_with_error = False
error_message = 'Error requesting the '
registered_tables_request, registered_clickstream_request = get_from_todays(TABLES_ISSUE_REQUEST,CLICKSTREAM_ISSUE_REQUEST)
print registered_tables_request, registered_clickstream_request
while(registered_tables_request==None or registered_clickstream_request==None):
try:
# try to get the table request
# table_request_id
if registered_tables_request==None:
print 'TIME TO REQUEST TABLES'
tables_response = api.post(TABLES_ISSUE_REQUEST)
print 'THIS IS THE RESPONSE OF THE TABLE-POST' + str(tables_response[0])
registered_tables_request = api.get(tables_response[0].to_json()["id"])[0]
print 'THIS IS THE REGISTERED TABLE-REQUEST ' + str(registered_tables_request)
except Exception as e:
print 'THIS IS THE MESSAGE: '
print e.message
print 'THAT WAS THE MESSAGE'
if int(str(e.message.split(' ')[0])) != 429 :
#time to wait
#print e.message
#print 'TIME TO WAIT for the latest request! ' + str(datetime.fromtimestamp(get_latest_table_request().to_json()['metadata']['startedAt']/ 1e3))
#print 'MIGHT NEED TO WAIT UNTIL: ' + str(datetime.fromtimestamp(get_latest_table_request().to_json()['metadata']['startedAt']/ 1e3) + timedelta(hours=1))
#print 'ONLY ' + str((datetime.fromtimestamp(get_latest_table_request().to_json()['metadata']['startedAt']/ 1e3) + timedelta(hours=1) - datetime.today()).seconds)
exit_with_error = True
error_message += 'tables: ' + e.message
break
else:
print 'YOU NEED TO WAIT FOR TABLES'
try:
# try to get the clickstream request
if registered_clickstream_request==None:
print 'ABOUT TO REQUEST CLICKSTREAM'
print str(CLICKSTREAM_ISSUE_REQUEST.to_json())
clickstream_response = api.post(CLICKSTREAM_ISSUE_REQUEST)
print 'THIS IS THE RESPONSE OF THE CLICKSTREAM-POST: ' + str(clickstream_response)
print 'THIS IS THE RESPONSE OF THE CLICKSTREAM-POST: ' + str(clickstream_response[0])
print type(clickstream_response[0])
registered_clickstream_request = api.get(clickstream_response[0].to_json()["id"])[0]
print 'THIS IS THE REGISTERED CLICKSTREAM-REQUEST ' + str(registered_clickstream_request)
except Exception as e:
print 'THIS IS THE MESSAGE: '
print e.message
print 'THAT WAS THE MESSAGE: '
if int(e.message.split(' ')[0]) != 429:
exit_with_error = True
error_message += 'clickstream: ' + e.message
break
else:
print 'YOU NEED TO WAIT FOR EVENTS'
#wait for the necessary time
print registered_tables_request,registered_clickstream_request
if (registered_tables_request==None or registered_clickstream_request==None):
time.sleep(60)
if exit_with_error :
print error_message
sys.exit()
else:
return registered_tables_request, registered_clickstream_request, range_last
def download_requests(tables_request, clickstream_request, config_json):
print 'ENTERING METHOD DOWNLOAD REQUEST'
downloaded_tables = False
downloaded_clickstream = False
exit_with_error = False
error_message = 'Error downloading the '
table_file = ''
clickstream_files = []
tables_registered_req = None
clickstream_registered_req = None
while (not downloaded_tables or not downloaded_clickstream):
if not downloaded_tables:
try:
print type(tables_request)
print type(tables_request.to_json())
tables_registered_req = api.get(tables_request.to_json()['id'])
except Exception as e:
print e.message
exit_with_error = True
error_message += 'tables, at updating request'
break
if tables_registered_req[0].to_json()['status'] == 'SUCCESSFUL':
try:
# try to download the table request
table_file = utils.download(tables_registered_req[0],config_json['initial_download_location'])
downloaded_tables = True
except Exception as e:
exit_with_error = True
error_message += 'tables'
break
else:
print 'MUST WAIT FOR THE TABLES'
if not downloaded_clickstream:
try:
clickstream_registered_req = api.get(clickstream_request.to_json()['id'])
except:
exit_with_error = True
error_message += 'tables, at updating request'
break
print str(clickstream_registered_req)
if clickstream_registered_req[0].to_json()['status'] == 'SUCCESSFUL':
try:
# try to download the clickstream request
clickstream_files = utils.download(clickstream_registered_req[0],config_json['initial_download_location'])
downloaded_clickstream = True
except Exception as e:
exit_with_error = True
error_message += 'clickstream, at downloading'
break
else:
print 'MUST WAIT FOR THE CLICKSTREAM'
#wait for the necessary time
if (not downloaded_tables or not downloaded_clickstream):
time.sleep(60)
if exit_with_error :
print error_message
sys.exit()
else:
return table_file, clickstream_files, config_json
def update_database_tables(tables_file, config_json):
print os.getcwd()
try:
if config_json['first_time'] == 'yes':
command = './automatic_setup_and_load_tables.sh '+ tables_file[0].split('/')[-1] + ' ' + config_json['table_export_location'] + ' ' + config_json['database_name']
print command
else:
command = './automatic_empty_and_load_tables.sh '+ tables_file[0].split('/')[-1] + ' ' + config_json['table_export_location'] + ' ' + config_json['database_name']
print command
os.system(command)
# try to run the setup_and_load.sh
except Exception as e:
print "error at setting up and loading the database TABLES"
sys.exit()
# config_json is updated?
return config_json
def load_events(list_of_event_files, db_name, db_user_name):
conn = psycopg2.connect("dbname="+db_name+" user="+db_user_name)
cur = conn.cursor()
for event_file in list_of_event_files:
print event_file[:-3]
cur.copy_expert("COPY clickstream_events FROM STDIN WITH CSV DELIMITER ',' QUOTE '\"' ESCAPE '\\'",open(event_file[:-3],'r'))
conn.commit()
cur.close()
conn.close()
def update_database_clickstream(clickstream_files, config_json):
try:
# try to run the setup_and_load.sh
if config_json['first_time']=='yes':
print 'ITS THE FIRST TIME!'
command = './setup_events.sh '+config_json['database_name']
print command
os.system(command)
config_json['first_time']='no'
if config_json['first_time']=='no':
os.system('gunzip *.gz')
print clickstream_files
load_events(clickstream_files, config_json["database_name"],config_json["database_user"])
os.system('gzip *.csv')
os.system('mv *.gz '+ config_json['clickstream_export_location'])
except Exception as e:
print "error at setting up and loading the database CLICKSTREAM"
sys.exit()
# config_json is updated?
return config_json
def update_jsonfile(range_last, file_name, config_json):
print file_name
config_json['last_clickstream_date'] = range_last
aux = open(file_name,'w+')
json.dump(config_json, aux)
#aux.write(str(config_json))
aux.close()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment