--- reddit_persona/__init__.py ---
"""This module uses APIs from Reddit and Indico.io to provide insights
into a Redditor's personality attributes.

Warning: this is just for fun; do not take the results seriously. It is
an experiment in combining social media data with machine learning APIs.
"""
__version__ = "1.2.1"

from reddit_persona import reddit_get, io_helper, insights, keycheck
from reddit_persona.go import go
from reddit_persona.keycheck import new_key, test_key, get_key

__all__ = ['reddit_get', 'io_helper', 'insights', 'go', 'keycheck']

mod_path = __file__  # location of this package on disk

--- reddit_persona/__main__.py ---
import sys

import reddit_persona as rp

if len(sys.argv) < 2:
    sys.exit("usage: python -m reddit_persona /u/<name>")
rp.go(sys.argv[1])

--- reddit_persona/go.py ---
from reddit_persona import insights
from reddit_persona import io_helper
from reddit_persona import keycheck
from reddit_persona import reddit_get


def show(USERNAME, target):
    # Print the cached report for this account.
    fpath = io_helper.out_path(USERNAME, target)
    with open(fpath, 'r') as f:
        print(f.read())


def go(target, refresh=60 * 60 * 24):
    # `target` looks like '/u/<name>' (redditor) or '/r/<name>' (subreddit).
    parse = target.split('/')
    if len(parse) < 3 or parse[1] not in ('u', 'r'):
        raise ValueError("target must look like '/u/<name>' or '/r/<name>'")
    USERNAME = parse[2]
    target = 'user' if parse[1] == 'u' else 'sub'

    # Serve the cached report when it is younger than `refresh` seconds.
    if not io_helper.check_time(USERNAME, target, refresh):
        show(USERNAME, target)
        return

    if target == 'user':
        reddit_get.user_text(user=USERNAME, refresh=refresh)
    else:
        reddit_get.user_text(sub=USERNAME, refresh=refresh)

    # Without a valid Indico key the raw text is cached but not analyzed;
    # test_key() prints instructions for entering one.
    if not keycheck.test_key():
        return
    insights.execute(USERNAME, target, refresh)
    show(USERNAME, target)

# TODO Add subreddits or descriptions

--- reddit_persona/indicoKey.txt ---
42fd1c521599079dab79ef889bc9c676
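Usage sketch (not a file in the package; the username comes from the
package's own test.py, and a valid Indico key in indicoKey.txt plus Reddit
credentials in reddit_creds.json are assumed):

    import reddit_persona as rp

    # Analyze a redditor; go() caches raw text and the finished report
    # under reddit_persona/cache/ and prints the report.
    rp.go('/u/GovSchwarzenegger')

    # Pass refresh=0 to ignore any cached result and re-fetch.
    rp.go('/u/GovSchwarzenegger', refresh=0)

    # CLI equivalent:
    #   python -m reddit_persona /u/GovSchwarzenegger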
--- reddit_persona/insights.py ---
import sys

import indicoio

from reddit_persona import io_helper
from reddit_persona import keycheck

# Module-level accumulator used when merging list-of-dict API results.
meta_dict = {}


def execute(USERNAME, target, refresh):
    r_data = io_helper.read_raw(USERNAME, target)
    og = sys.stdout  # remember the real stdout; restored once the report is written
    fpath = io_helper.out_path(USERNAME, target)

    def analysis(raw='', limit=5, text='', percent=True):
        global meta_dict
        # If input is a list of non-dicts, print the items.
        # If input is a list of dicts, merge them and recurse.
        if isinstance(raw, list):
            for item in raw:
                if not isinstance(item, dict):
                    print(item)
                else:
                    create_meta_dict(item)
            analysis(meta_dict, limit, text, percent)

        # If input is a dict: print k, v pairs, highest value first.
        # Optional args set the row limit and the heading text.
        if isinstance(raw, dict):
            print(text)
            ct = 0
            for v in sorted(raw, key=raw.get, reverse=True):
                ct += 1
                if ct > limit:
                    break
                if isinstance(raw[v], float):
                    per = '%' if percent else ''
                    print(" " + v, str(round(raw[v] * 100, 2)) + per)
                else:
                    print(v, raw[v])
            print()

    def create_meta_dict(item):
        # Merge one {'text': ..., 'confidence': ...} result into the master dict.
        global meta_dict
        meta_dict[item['text']] = item['confidence']
        return meta_dict

    # Un-escape the raw text: drop backslash escapes (keeping escaped quotes)
    # and replace asterisks with spaces. Note that r_data[i - 1] wraps to the
    # last character when i == 0, which is harmless here.
    rClean = ''
    for i in range(len(r_data)):
        if r_data[i - 1] == '\\':
            rClean = rClean[:-1]
            if r_data[i] != "'":
                continue
        if r_data[i] == '*':
            rClean += ' '
        else:
            rClean += r_data[i]
    r_data = rClean
    del rClean

    indicoio.config.api_key = keycheck.get_key()

    # Big 5
    big5 = {
        'text': "Big 5 personality inventory matches: ",
        'payload': indicoio.personality(r_data)
    }

    # Myers-Briggs
    mbtiLabels = indicoio.personas(r_data)
    mbti_dict = {
        'architect': 'intj', 'logician': 'intp', 'commander': 'entj',
        'debater': 'entp', 'advocate': 'infj', 'mediator': 'infp',
        'protagonist': 'enfj', 'campaigner': 'enfp', 'logistician': 'istj',
        'defender': 'isfj', 'executive': 'estj', 'consul': 'esfj',
        'virtuoso': 'istp', 'adventurer': 'isfp', 'entrepreneur': 'estp',
        'entertainer': 'esfp'
    }
    # Re-key the persona scores by their four-letter MBTI codes.
    payload = {mbti_dict[k]: v for k, v in mbtiLabels.items()}
    mbti = {
        'text': "Most likely personality styles: ",
        'payload': payload,
        'ct': 5,
        'percent': True
    }

    # Political
    pol = {
        'text': "Political alignments: ",
        'payload': indicoio.political(r_data, version=1)
    }

    # Sentiment
    sen = {
        'text': "Sentiment: ",
        'payload': {'Percent positive': indicoio.sentiment(r_data)},
        'ct': 3
    }

    # Emotion
    emo = {
        'text': "Predominant emotions: ",
        'payload': indicoio.emotion(r_data),
        'ct': 5
    }

    # Keywords
    kw = {'text': "Keywords: ", 'payload': indicoio.keywords(r_data), 'ct': 5}

    # Text tags
    tt = {'text': "Text tags: ", 'payload': indicoio.text_tags(r_data), 'ct': 10}

    # Place
    pla = {
        'text': "Key locations: ",
        'payload': indicoio.places(r_data, version=2),
        'ct': 3,
        'percent': True
    }

    def Karma(USERNAME):
        # Karma-by-subreddit summary, updated to the PRAW 5 API already used
        # in reddit_get.py. Still disabled at the call site below.
        import json
        from os import path, getcwd
        import praw

        base_dir = path.join(getcwd(), path.dirname(__file__))
        with open(path.join(base_dir, 'reddit_creds.json')) as c:
            r = praw.Reddit(**json.load(c))
        karma_by_subreddit = {}
        for thing in r.redditor(USERNAME).submissions.top(limit=100):
            subreddit = thing.subreddit.display_name
            karma_by_subreddit[subreddit] = (
                karma_by_subreddit.get(subreddit, 0) + thing.score)
        kList = ['Karma by Sub']
        for w in sorted(karma_by_subreddit, key=karma_by_subreddit.get,
                        reverse=True):
            kList.append(str(w) + ': ' + str(karma_by_subreddit[w]))
        print("\n\t".join(kList[:10]))

    def show(results):
        # Accepts a list of result dicts, or a single dict.
        if not isinstance(results, dict):
            for X in results:
                show(X)
        else:
            if results == pla and pla['payload'] == []:
                print("Not enough information to infer place of origin")
                print()
            else:
                i = results
                analysis(
                    raw=i.get('payload', ''),
                    limit=i.get('ct', 5),
                    text=i.get('text', ''),
                    percent=i.get('percent', True)
                )

    # Redirect stdout into the cache file while the report is printed.
    with open(fpath, 'w') as outtie:
        sys.stdout = outtie
        print(target + USERNAME)
        print()
        show([kw, pla, big5, emo, sen, pol, mbti, tt])
        # Karma(USERNAME)
        sys.stdout = og
    return

--- reddit_persona/io_helper.py ---
import datetime
from os import path

p, f = path.split(path.abspath(__file__))
usr_path = path.join(p, 'cache')


def read_raw(USERNAME, target):
    # Return the cached raw text scraped for this account.
    fname = USERNAME + '_raw.txt'
    memoize = path.join(usr_path, target, fname)
    with open(memoize, 'r') as r_data:
        return r_data.read()


def out_path(USERNAME, target):
    # Path of the finished report for this account.
    fname = USERNAME + '.txt'
    return path.join(usr_path, target, fname)


def check_time(USERNAME, target, seconds_):
    # Return True if a new API request is needed, i.e. the cached report is
    # missing or older than `seconds_`; otherwise return False.
    memoize = out_path(USERNAME, target)
    try:
        ttime = path.getmtime(memoize)
    except OSError:
        return True
    mtime = datetime.datetime.fromtimestamp(ttime)
    ntime = datetime.datetime.now()
    dlta = ntime - mtime
    return abs(dlta.total_seconds()) > seconds_
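Sketch of the cache layout io_helper implies (not a file in the package;
'spez' is a hypothetical username used only for illustration):

    reddit_persona/cache/user/spez_raw.txt   <- raw scraped text (read_raw)
    reddit_persona/cache/user/spez.txt       <- finished report (out_path)

    from reddit_persona import io_helper

    # True when the cached report is missing or older than one day,
    # i.e. go() would re-fetch and re-analyze:
    io_helper.check_time('spez', 'user', 60 * 60 * 24)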
--- reddit_persona/keycheck.py ---
from os import path, getcwd

f = 'indicoKey.txt'
base_dir = path.join(getcwd(), path.dirname(__file__))
keyPath = path.join(base_dir, f)


def get_key():
    with open(keyPath, 'r') as c:
        return c.read()


def test_key():
    # Probe the stored Indico key with a minimal request; print guidance
    # if it is missing or invalid.
    try:
        import indicoio
        indicoio.config.api_key = get_key()
        indicoio.sentiment("I love writing code!")
        return True
    except Exception:
        print("Indico API key missing/invalid")
        print()
        print('Redditor text can be collected with reddit_persona.go(USERNAME), '
              'but it will not be analyzed')
        print()
        print('To enter your indico API key, use reddit_persona.new_key( )')
        print()
        return False


# Check the key once at import time so users get immediate feedback.
test_key()


def new_key(k=False):
    if k:
        with open(keyPath, 'w') as w:
            w.write(str(k))
        if test_key():
            print("Key validated and saved to disk. "
                  "You will not need to re-enter it.")
    return

--- reddit_persona/reddit_creds.json ---
{
    "client_id": "6ci4LeHCk4TV6Q",
    "client_secret": "1MVQvXr_pNKXjDLL3QYLix41Yhc",
    "password": "F8PQCgJ5xJ5a",
    "user_agent": "testscript by /u/persona_bot",
    "username": "persona_bot"
}

--- reddit_persona/reddit_get.py ---
import json
from os import path, getcwd

import praw

from reddit_persona import io_helper


class reddit:
    def __init__(self, name, target):
        self.name = name
        self.target = target
        base_dir = path.join(getcwd(), path.dirname(__file__))
        with open(path.join(base_dir, 'reddit_creds.json')) as c:
            creds = json.load(c)
        self.client = praw.Reddit(**creds)

    def get_user(self):
        # First line of the top 100 comments, plus title and body of the
        # top 100 submissions, joined into one block of text.
        redditor = self.client.redditor(self.name)

        def comments():
            return [c.body.split('\n', 1)[0]
                    for c in redditor.comments.top(limit=100)]

        def submissions():
            return ['\n\t'.join([s.title, s.selftext])
                    for s in redditor.submissions.top(limit=100)]

        return '\n\t'.join(comments() + submissions())

    '''
    TODO Update subreddit analysis for new API
    Outdated version below (should be a quick fix; the new API is streamlined)
    '''
    # def get_sub(self):
    #     sub_data = self.client.get_subreddit(self.name)
    #     top_posts = sub_data.get_top_from_year()

    #     def sub_comments():
    #         for post in top_posts:
    #             yield post
    #             for comment in post.comments:
    #                 if type(comment) != praw.objects.MoreComments:
    #                     yield comment

    #     y = list(sub_comments())
    #     self.reddit_data = ' '.join([str(d) for d in y]).encode('utf-8')


def report(username, target):
    payload = reddit(username, target).get_user()
    assert len(payload) > 1, "Error - Data not retrieved"
    return str(payload)


def user_text(refresh=False, user=False, sub=False):
    # `refresh` is accepted for symmetry with go() but is unused here.
    if user:
        accountname = user
        target = 'user'
    elif sub:
        # NOTE: pending the TODO above, subreddit targets are still routed
        # through get_user(), so '/r/...' analysis does not work yet.
        accountname = sub
        target = 'sub'
    else:
        raise ValueError("user_text() needs either user= or sub=")
    u = accountname + '_raw.txt'
    memoize = path.join(io_helper.usr_path, target, u)
    with open(memoize, 'w') as r_data:
        r_data.write(report(accountname, target))

--- reddit_persona/requirements.txt ---
IndicoIo==1.0.9
praw==5.0.1

--- reddit_persona/setup.py ---
from distutils.core import setup

setup(
    name='reddit_persona',
    version='1.2.1',  # keep in step with reddit_persona.__version__
    packages=['reddit_persona'],
)

--- reddit_persona/test.py ---
import reddit_persona as rp

# go() prints the report itself and returns None, so there is nothing
# useful to capture or print here.
rp.go('/u/GovSchwarzenegger')

--- reddit_persona/cache/user/.placeholder ---
(empty placeholder file)
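Sketch of first-run key entry through keycheck.py (not a file in the
package; the key shown is a made-up placeholder, not a working credential):

    import reddit_persona as rp

    # new_key() writes the key to indicoKey.txt, then test_key() probes it
    # with a one-sentence sentiment request and reports whether it validated.
    rp.new_key('0123456789abcdef0123456789abcdef')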