Commit b57b1099 authored by Cody Zacharias, committed by GitHub

1.1.3 Update

parent 6b3fcb69
from . import db, elasticsearch from . import db, elasticsearch, format, write
from .tweet import Tweet from .tweet import Tweet
from .user import User from .user import User
from bs4 import BeautifulSoup
from time import localtime, strftime
import asyncio
import csv
import datetime
import json
import os
import re
import sys
def write(entry, f):
    """Append ``entry`` as one line of UTF-8 text to the file at path ``f``.

    Args:
        entry: Object to write; it is rendered by ``print``.
        f: Path of the file to append to (opened in ``"a"`` mode).
    """
    # A context manager closes (and therefore flushes) the handle right away
    # instead of leaving an open file for the garbage collector to reclaim.
    with open(f, "a", encoding="utf-8") as out:
        print(entry, file=out)
def writeCSV(Tweet, config):
    """Append one tweet as a row of the CSV file at ``config.Output``.

    A header row is written first whenever the output file does not exist
    yet or exists but is still empty.

    Args:
        Tweet: Object exposing the tweet attributes read below (``id``,
            ``datestamp``, ``timestamp``, ...).
        config: Object exposing ``Custom_csv`` (an optional list of column
            names selecting and ordering the columns) and ``Output`` (the
            path of the CSV file).

    Raises:
        KeyError: If ``config.Custom_csv`` names a column that is not one of
            the columns listed below.
    """
    # Single source of truth for both the column order and the row values.
    columns = [
        ("id", Tweet.id),
        ("date", Tweet.datestamp),
        ("time", Tweet.timestamp),
        ("timezone", Tweet.timezone),
        ("user_id", Tweet.user_id),
        ("username", Tweet.username),
        ("tweet", Tweet.tweet),
        ("replies", Tweet.replies),
        ("retweets", Tweet.retweets),
        ("likes", Tweet.likes),
        ("location", Tweet.location),
        ("hashtags", Tweet.hashtags),
        ("link", Tweet.link),
        ("retweet", Tweet.is_retweet),
        ("user_rt", Tweet.user_rt),
        ("mentions", Tweet.mentions),
    ]
    data = dict(columns)

    if config.Custom_csv:
        fieldnames = config.Custom_csv
        row = {name: data[name] for name in fieldnames}
    else:
        fieldnames = [name for name, _ in columns]
        row = data

    path = config.Output
    # An existing but empty file still needs its header row.
    needs_header = not os.path.exists(path) or os.path.getsize(path) == 0

    # Open once: the header (if needed) and the row share the same handle.
    with open(path, "a", newline='', encoding="utf-8") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()
        writer.writerow(row)
def writeJSON(Tweet, file):
    """Append one tweet to ``file`` as a single line of JSON.

    Args:
        Tweet: Object exposing the tweet attributes read below; its
            ``hashtags`` and ``mentions`` are joined with commas into one
            string each.
        file: Path of the output file (opened in ``"a"`` mode).
    """
    record = {
        "id": Tweet.id,
        "date": Tweet.datestamp,
        "time": Tweet.timestamp,
        "timezone": Tweet.timezone,
        "user_id": Tweet.user_id,
        "username": Tweet.username,
        "tweet": Tweet.tweet,
        "replies": Tweet.replies,
        "retweets": Tweet.retweets,
        "likes": Tweet.likes,
        "location": Tweet.location,
        "hashtags": ",".join(Tweet.hashtags),
        "link": Tweet.link,
        "retweet": Tweet.is_retweet,
        "user_rt": Tweet.user_rt,
        "mentions": ",".join(Tweet.mentions)
    }

    # One JSON document per line, each terminated by a newline.
    with open(file, "a", newline='', encoding="utf-8") as json_file:
        json_file.write(json.dumps(record))
        json_file.write("\n")
def getText(tweet):
    """Return the text of the tweet's ``p.tweet-text`` element, tidied up.

    Newlines are removed, and a space is inserted in front of every
    ``http`` and ``pic.twitter`` occurrence.

    Args:
        tweet: Element exposing ``find(name, css_class)`` whose result has a
            ``.text`` attribute (presumably a BeautifulSoup tag; confirm
            against the caller).
    """
    cleaned = tweet.find("p", "tweet-text").text
    substitutions = (
        ("\n", ""),
        ("http", " http"),
        ("pic.twitter", " pic.twitter"),
    )
    # Order matters: the replacements are applied one after another.
    for needle, replacement in substitutions:
        cleaned = cleaned.replace(needle, replacement)
    return cleaned
def getHashtags(text):
    """Return every ``#word`` token found in ``text``, in order, as a list."""
    matcher = re.compile(r'(?i)\#\w+', flags=re.UNICODE)
    return matcher.findall(text)
def getStat(tweet, stat):
    """Return the ``data-tweet-stat-count`` value for the given action.

    Args:
        tweet: Element exposing ``find`` (presumably a BeautifulSoup tag;
            confirm against the caller).
        stat: Action name interpolated into the CSS class, e.g. ``"reply"``.
    """
    css_class = "ProfileTweet-action--{} u-hiddenVisually".format(stat)
    # The count lives on a <span> nested inside the action's hidden <span>.
    action_span = tweet.find("span", css_class)
    counter_span = action_span.find("span")
    return counter_span["data-tweet-stat-count"]
def getMentions(tweet):
    """Return the names listed in the tweet's ``data-mentions`` attribute.

    Args:
        tweet: Element exposing ``find(name, css_class)`` (presumably a
            BeautifulSoup tag; confirm against the caller).

    Returns:
        The attribute value split on single spaces, or the empty string
        ``""`` when the ``div.js-original-tweet`` element or its
        ``data-mentions`` attribute cannot be read.
    """
    try:
        return tweet.find("div", "js-original-tweet")["data-mentions"].split(" ")
    except (AttributeError, KeyError, TypeError):
        # Missing element (None is not subscriptable) or missing attribute.
        # Only lookup failures are handled here, so KeyboardInterrupt and
        # SystemExit are no longer swallowed. The "" fallback is kept as-is
        # because callers already rely on it.
        return ""
def textMentions(tweet, mentions, text):
    """Prepend ``@name`` to ``text`` for every mention it does not contain.

    Args:
        tweet: Not used by this function; kept so existing callers still
            work.
        mentions: Iterable of names without the leading ``@``. Empty names
            are skipped.
        text: The tweet text.

    Returns:
        ``text`` with each missing ``@name`` added at the front. When
        ``mentions`` or ``text`` has an unusable type, the text built so far
        is returned.
    """
    try:
        for name in mentions:
            if not name:
                # "".split(" ") yields [""]; skipping it avoids a stray "@".
                continue
            mention = "@{}".format(name)
            if mention not in text:
                text = "{} {}".format(mention, text)
    except TypeError:
        # Best effort, as before: non-iterable mentions or non-string text
        # leave the text as it is.
        pass
    return text
def datecheck(datestamp, config): def datecheck(datestamp, config):
if config.Since and config.Until: if config.Since and config.Until:
...@@ -134,81 +10,6 @@ def datecheck(datestamp, config): ...@@ -134,81 +10,6 @@ def datecheck(datestamp, config):
return False return False
return True return True
def retweet(config, tweet):
    """Return True when ``tweet`` counts as a retweet in profile mode.

    A tweet counts as a retweet when ``config.Profile`` is truthy and the
    lower-cased ``tweet.username`` differs from ``config.Username``.

    Always returns a real ``bool`` rather than ``True`` or an implicit
    ``None``.

    NOTE(review): only the tweet's username is lower-cased, so this assumes
    ``config.Username`` is already lower case; confirm where it is set.
    """
    return bool(config.Profile and tweet.username.lower() != config.Username)
def getTweet(tw, location, config):
    """Build a ``Tweet`` object from one scraped tweet element.

    Args:
        tw: Element exposing ``find``/``findAll`` (presumably a
            BeautifulSoup tag; confirm against the caller).
        location: Value stored unchanged on the result as ``location``.
        config: Run configuration; passed to ``retweet`` and read for
            ``Username`` when the tweet counts as a retweet.

    Returns:
        The populated ``Tweet`` instance.
    """
    parsed = Tweet()
    parsed.id = tw.find("div")["data-item-id"]

    # The posting time arrives as epoch seconds; date and time strings are
    # rendered in the local timezone of the machine running the scraper.
    parsed.datetime = int(tw.find("span", "_timestamp")["data-time"])
    posted_at = localtime(parsed.datetime)
    parsed.datestamp = strftime("%Y-%m-%d", posted_at)
    parsed.timestamp = strftime("%H:%M:%S", posted_at)

    profile_link = tw.find("a", "account-group js-account-group js-action-profile js-user-profile-link js-nav")
    parsed.user_id = profile_link["data-user-id"]
    parsed.username = tw.find("span", "username").text.replace("@", "")
    # Timezone name is taken from the current local time, not the tweet's.
    parsed.timezone = strftime("%Z", localtime())

    # Swap emoji images for "<label>" text before the tweet text is read.
    for emoji in tw.findAll("img", "Emoji Emoji--forText"):
        emoji.replaceWith("<{}>".format(emoji['aria-label']))

    parsed.mentions = getMentions(tw)
    parsed.tweet = textMentions(tw, parsed.mentions, getText(tw))
    parsed.location = location
    parsed.hashtags = getHashtags(parsed.tweet)
    parsed.replies = getStat(tw, "reply")
    parsed.retweets = getStat(tw, "retweet")
    parsed.likes = getStat(tw, "favorite")
    parsed.link = "https://twitter.com/{0.username}/status/{0.id}".format(parsed)

    if retweet(config, parsed):
        parsed.is_retweet = True
        parsed.user_rt = config.Username
    else:
        # is_retweet is left at whatever Tweet() initialised it to.
        parsed.user_rt = "None"
    return parsed
async def getUser(user):
    """Build a ``User`` whose ``name`` comes from the first ``<a>`` element.

    Args:
        user: Element exposing ``find`` (presumably a BeautifulSoup tag;
            confirm against the caller).
    """
    profile = User()
    anchor = user.find("a")
    profile.name = anchor["name"]
    return profile
def getOutput(Tweet, config, conn):
    """Render one tweet as a line of text for console or file output.

    With ``config.Format`` set, every known ``{placeholder}`` in the template
    is substituted in a single pass, so braces that appear inside a
    substituted value (for example a tweet whose text contains ``{likes}``)
    are not expanded again. Unknown placeholders are left untouched.

    Without a template, the default layout is
    ``id date time timezone [RT ]<username> tweet`` followed by the optional
    hashtag, stats and location sections enabled on ``config``.

    Args:
        Tweet: Object exposing the tweet attributes read below.
        config: Object exposing ``Format``, ``Show_hashtags``, ``Stats`` and
            ``Location`` (plus whatever ``retweet`` reads).
        conn: Not used by this function; kept for caller compatibility.

    Returns:
        The formatted string.
    """
    if config.Format:
        values = {
            "id": Tweet.id,
            "date": Tweet.datestamp,
            "time": Tweet.timestamp,
            "user_id": Tweet.user_id,
            "username": Tweet.username,
            "timezone": Tweet.timezone,
            "tweet": Tweet.tweet,
            "location": Tweet.location,
            "hashtags": str(Tweet.hashtags),
            "replies": Tweet.replies,
            "retweets": Tweet.retweets,
            "likes": Tweet.likes,
            "link": Tweet.link,
            "is_retweet": str(Tweet.is_retweet),
            "user_rt": Tweet.user_rt,
            "mentions": str(Tweet.mentions),
        }

        def _fill(match):
            # Unknown placeholders are returned exactly as written.
            return values.get(match.group(1), match.group(0))

        output = re.sub(r"\{(\w+)\}", _fill, config.Format)
    else:
        output = "{} {} {} {} ".format(Tweet.id, Tweet.datestamp,
                                       Tweet.timestamp, Tweet.timezone)
        if retweet(config, Tweet):
            output += "RT "
        output += "<{}> {}".format(Tweet.username, Tweet.tweet)
        if config.Show_hashtags:
            output += " {}".format(",".join(Tweet.hashtags))
        if config.Stats:
            output += " | {} replies {} retweets {} likes".format(
                Tweet.replies, Tweet.retweets, Tweet.likes)
        if config.Location:
            output += " | Location {}".format(Tweet.location)
    return output
def is_tweet(tw): def is_tweet(tw):
try: try:
tw.find("div")["data-item-id"] tw.find("div")["data-item-id"]
...@@ -216,27 +17,53 @@ def is_tweet(tw): ...@@ -216,27 +17,53 @@ def is_tweet(tw):
except: except:
return False return False
def _output(obj, output, config):
if config.Output != None:
if config.Store_csv:
write.Csv(obj, config)
elif config.Store_json:
write.Json(obj, config)
else:
write.Text(output, config.Output)
if config.Elasticsearch:
print(output, end=".", flush=True)
else:
print(output)
async def Tweets(tw, location, config, conn): async def Tweets(tw, location, config, conn):
copyright = tw.find("div", "StreamItemContent--withheld") copyright = tw.find("div", "StreamItemContent--withheld")
if copyright is None and is_tweet(tw): if copyright is None and is_tweet(tw):
Tweet = getTweet(tw, location, config) tweet = Tweet(tw, location, config)
if datecheck(Tweet.datestamp, config): if datecheck(tweet.datestamp, config):
output = getOutput(Tweet, config, conn) output = format.Tweet(config, tweet)
if config.Database: if config.Database:
db.tweets(conn, Tweet) db.tweets(conn, tweet)
if config.Elasticsearch: if config.Elasticsearch:
elasticsearch.Tweet(Tweet, config.Elasticsearch, config.Essid) elasticsearch.Tweet(tweet, config.Elasticsearch, config.Essid)
_output(tweet, output, config)
if config.Output != None: async def Users(u, config, conn):
if config.Store_csv: user = User(u)
writeCSV(Tweet, config) output = format.User(config.Format, user)
elif config.Store_json:
writeJSON(Tweet, config.Output)
else:
write(output, config.Output)
if config.Elasticsearch: if config.Database:
print(output, end=".", flush=True) db.user(conn, config.Username, config.Followers, user)
else:
print(output) #if config.Elasticsearch:
# elasticsearch.Follow(config.Elasticsearch, user,
# config.Username, config.Essid)
_output(user, output, config)
async def Username(username, config, conn):
    """Record and output one scraped username.

    The name is passed to ``db.follow`` when ``config.Database`` is set and
    to ``elasticsearch.Follow`` when ``config.Elasticsearch`` is set. It is
    then handed to ``_output`` as both the object and its text form.

    Args:
        username: The scraped username.
        config: Run configuration (``Database``, ``Elasticsearch``,
            ``Username``, ``Followers``, ``Essid`` are read here).
        conn: Connection object forwarded to ``db.follow``.
    """
    if config.Database:
        db.follow(conn, config.Username, config.Followers, username)

    es_target = config.Elasticsearch
    if es_target:
        elasticsearch.Follow(es_target, username, config.Username, config.Essid)

    _output(username, username, config)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment