Commit 2d638de0 authored by Himanshu Dabas, committed by GitHub

fix for deprecation of v1.1 endpoints (#944)

parent 421a155a
@@ -309,3 +309,6 @@ def run_as_command():
         sys.exit(0)

     main()

+if __name__ == '__main__':
+    main()
@@ -81,3 +81,5 @@ class Config:
     TranslateDest: str = "en"
     Backoff_exponent: float = 3.0
     Min_wait_time: int = 0
+    Bearer_token: str = None
+    Guest_token: str = None
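
The two new Config fields carry the credentials that the retired v1.1 front-end endpoints never needed: an application Bearer token and a scraped guest token. A minimal sketch of how such tokens are typically attached to a request against the new endpoint (the values are placeholders, and the header names follow Twitter's web-client convention rather than anything defined in this diff):

import requests

# placeholder values for illustration only
bearer_token = 'AAAAAAAAAAAAAAAAAAAA...'  # truncated, not a real token
guest_token = '1289098765432109876'       # the kind of value token.py below produces

headers = {
    'authorization': f'Bearer {bearer_token}',
    'x-guest-token': guest_token,
}
# the adaptive-search endpoint this commit switches to
r = requests.get('https://api.twitter.com/2/search/adaptive.json',
                 params={'q': 'twint', 'count': '20'},
                 headers=headers)
print(r.status_code)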
@@ -4,32 +4,39 @@ from json import loads
 import logging as logme


+class NoMoreTweetsException(Exception):
+    def __init__(self, msg):
+        super().__init__(msg)


 def Follow(response):
-    logme.debug(__name__+':Follow')
+    logme.debug(__name__ + ':Follow')
     soup = BeautifulSoup(response, "html.parser")
     follow = soup.find_all("td", "info fifty screenname")
     cursor = soup.find_all("div", "w-button-more")
     try:
         cursor = findall(r'cursor=(.*?)">', str(cursor))[0]
     except IndexError:
-        logme.critical(__name__+':Follow:IndexError')
+        logme.critical(__name__ + ':Follow:IndexError')
     return follow, cursor
 def Mobile(response):
-    logme.debug(__name__+':Mobile')
+    logme.debug(__name__ + ':Mobile')
     soup = BeautifulSoup(response, "html.parser")
     tweets = soup.find_all("span", "metadata")
     max_id = soup.find_all("div", "w-button-more")
     try:
         max_id = findall(r'max_id=(.*?)">', str(max_id))[0]
     except Exception as e:
-        logme.critical(__name__+':Mobile:' + str(e))
+        logme.critical(__name__ + ':Mobile:' + str(e))
     return tweets, max_id
 def MobileFav(response):
     soup = BeautifulSoup(response, "html.parser")
     tweets = soup.find_all("table", "tweet")
     max_id = soup.find_all("div", "w-button-more")
@@ -40,8 +47,9 @@ def MobileFav(response):
     return tweets, max_id


 def profile(response):
-    logme.debug(__name__+':profile')
+    logme.debug(__name__ + ':profile')
     json_response = loads(response)
     html = json_response["items_html"]
     soup = BeautifulSoup(html, "html.parser")
@@ -49,10 +57,54 @@ def profile(response):
     return feed, feed[-1]["data-item-id"]


 def Json(response):
-    logme.debug(__name__+':Json')
+    logme.debug(__name__ + ':Json')
     json_response = loads(response)
     html = json_response["items_html"]
     soup = BeautifulSoup(html, "html.parser")
     feed = soup.find_all("div", "tweet")
     return feed, json_response["min_position"]

+def search_v2(response):
+    # TODO need to implement this
+    response = loads(response)
+    if len(response['globalObjects']['tweets']) == 0:
+        msg = 'No more data. finished scraping!!'
+        raise NoMoreTweetsException(msg)
+    # need to modify things at the function call end
+    # timeline = response['timeline']['instructions'][0]['addEntries']['entries']
+    feed = []
+    feed_set = set()
+    # here we need to remove the quoted and `to-reply` tweets from the list as they may or may not contain the
+    # for _id in response['globalObjects']['tweets']:
+    #     if 'quoted_status_id_str' in response['globalObjects']['tweets'][_id] or \
+    #             response['globalObjects']['tweets'][_id]['in_reply_to_status_id_str']:
+    #         try:
+    #             feed_set.add(response['globalObjects']['tweets'][_id]['quoted_status_id_str'])
+    #         except KeyError:
+    #             feed_set.add(response['globalObjects']['tweets'][_id]['in_reply_to_status_id_str'])
+    # i = 1
+    # for _id in response['globalObjects']['tweets']:
+    #     if _id not in feed_set:
+    #         temp_obj = response['globalObjects']['tweets'][_id]
+    #         temp_obj['user_data'] = response['globalObjects']['users'][temp_obj['user_id_str']]
+    #         feed.append(temp_obj)
+    for timeline_entry in response['timeline']['instructions'][0]['addEntries']['entries']:
+        # this will handle the cases when the timeline entry is a tweet
+        if timeline_entry['entryId'].find('sq-I-t-') == 0:
+            _id = timeline_entry['content']['item']['content']['tweet']['id']
+            temp_obj = response['globalObjects']['tweets'][_id]
+            temp_obj['user_data'] = response['globalObjects']['users'][temp_obj['user_id_str']]
+            feed.append(temp_obj)
+    try:
+        next_cursor = response['timeline']['instructions'][0]['addEntries']['entries'][-1]['content'][
+            'operation']['cursor']['value']
+    except KeyError:
+        # this is needed because after the first request the location of the cursor changes
+        next_cursor = response['timeline']['instructions'][-1]['replaceEntry']['entry']['content']['operation'][
+            'cursor']['value']
+    return feed, next_cursor
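
The try/except above exists because the cursor entry moves between the first response and subsequent ones. Roughly (a hand-trimmed sketch of the two JSON shapes, derived from the key accesses above; not verbatim API output):

# first response: the cursor is the last entry under addEntries
first_page = {'timeline': {'instructions': [
    {'addEntries': {'entries': [
        # ...tweet entries...
        {'content': {'operation': {'cursor': {'value': 'scroll:thGAVUV0VFVBa...'}}}},
    ]}},
]}}

# later responses: the cursor arrives as a replaceEntry instruction instead,
# so the last addEntries entry has no 'operation' key and the lookup above raises KeyError
later_page = {'timeline': {'instructions': [
    {'addEntries': {'entries': [
        {'content': {'item': {}}},
    ]}},
    {'replaceEntry': {'entry': {'content': {'operation': {'cursor': {'value': 'scroll:thGAVUV0VFVBb...'}}}}}},
]}}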
@@ -37,8 +37,9 @@ def Tweet(config, t):
         logme.debug(__name__+':Tweet:notFormat')
         output = f"{t.id_str} {t.datestamp} {t.timestamp} {t.timezone} "
-        if t.retweet:
-            output += "RT "
+        # TODO: someone who is familiar with this code needs to take a look at what this is <also see tweet.py>
+        # if t.retweet:
+        #     output += "RT "
         output += f"<{t.username}> {t.tweet}"
import re
import time

import requests

import logging as logme


class TokenExpiryException(Exception):
    def __init__(self, msg):
        super().__init__(msg)


class RefreshTokenException(Exception):
    # raised by Token below when the guest token cannot be (re)acquired
    def __init__(self, msg):
        super().__init__(msg)


class Token:
    def __init__(self, config):
        self._session = requests.Session()
        self.config = config
        self._retries = 5
        self._timeout = 10
        self.url = 'https://twitter.com'

    def _request(self):
        for attempt in range(self._retries + 1):
            # The request is newly prepared on each retry because of potential cookie updates.
            req = self._session.prepare_request(requests.Request('GET', self.url))
            logme.debug(f'Retrieving {req.url}')
            try:
                r = self._session.send(req, allow_redirects=True, timeout=self._timeout)
            except requests.exceptions.RequestException as exc:
                if attempt < self._retries:
                    retrying = ', retrying'
                    level = logme.WARNING
                else:
                    retrying = ''
                    level = logme.ERROR
                logme.log(level, f'Error retrieving {req.url}: {exc!r}{retrying}')
            else:
                success, msg = (True, None)
                msg = f': {msg}' if msg else ''
                if success:
                    logme.debug(f'{req.url} retrieved successfully{msg}')
                    return r
            if attempt < self._retries:
                # TODO : might wanna tweak this back-off timer
                sleep_time = 2.0 * 2 ** attempt
                logme.info(f'Waiting {sleep_time:.0f} seconds')
                time.sleep(sleep_time)
        else:
            # for-else: reached only when every attempt fell through without returning a response
            msg = f'{self._retries + 1} requests to {self.url} failed, giving up.'
            logme.fatal(msg)
            self.config.Guest_token = None
            raise RefreshTokenException(msg)

    def refresh(self):
        logme.debug('Retrieving guest token')
        res = self._request()
        match = re.search(r'\("gt=(\d+);', res.text)
        if match:
            logme.debug('Found guest token in HTML')
            self.config.Guest_token = str(match.group(1))
        else:
            self.config.Guest_token = None
            raise RefreshTokenException('Could not find the Guest token in HTML')
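
A short driver sketch for the class above (the stub config stands in for twint's Config and is not part of the commit):

class _StubConfig:
    # stand-in for twint.config.Config, illustration only
    Guest_token = None

config = _StubConfig()
Token(config).refresh()    # GETs twitter.com and scrapes the "gt=..." cookie assignment
print(config.Guest_token)  # a long numeric string on success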
 import datetime
 from sys import platform
 import logging as logme
+from urllib.parse import urlencode
+from urllib.parse import quote

 mobile = "https://mobile.twitter.com"
-base = "https://twitter.com/i"
+# base = "https://twitter.com/i"
+base = "https://api.twitter.com/2/search/adaptive.json"


-def _sanitizeQuery(base,params):
-    _serialQuery = ""
-    for p in params:
-        _serialQuery += p[0]+"="+p[1]+"&"
-    _serialQuery = base + "?" + _serialQuery[:-1].replace(":", "%3A").replace(" ", "%20")
+def _sanitizeQuery(_url, params):
+    _serialQuery = urlencode(params, quote_via=quote)
+    _serialQuery = _url + "?" + _serialQuery
     return _serialQuery
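
Delegating to urlencode with quote_via=quote keeps the old behavior: quote (unlike the default quote_plus) encodes spaces as %20 and colons as %3A, which is exactly what the removed replace() calls did by hand. A quick check:

from urllib.parse import urlencode, quote

params = [('q', 'from:example since:2020-01-01'), ('count', '100')]
print(urlencode(params, quote_via=quote))
# q=from%3Aexample%20since%3A2020-01-01&count=100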
 def _formatDate(date):
     if "win" in platform:
         return f'\"{date.split()[0]}\"'
@@ -20,8 +24,9 @@ def _formatDate(date):
     except ValueError:
         return int(datetime.datetime.strptime(date, "%Y-%m-%d").timestamp())
 async def Favorites(username, init):
-    logme.debug(__name__+':Favorites')
+    logme.debug(__name__ + ':Favorites')
     url = f"{mobile}/{username}/favorites?lang=en"
     if init != '-1':
@@ -29,8 +34,9 @@ async def Favorites(username, init):
     return url
 async def Followers(username, init):
-    logme.debug(__name__+':Followers')
+    logme.debug(__name__ + ':Followers')
     url = f"{mobile}/{username}/followers?lang=en"
     if init != '-1':
@@ -38,8 +44,9 @@ async def Followers(username, init):
     return url
 async def Following(username, init):
-    logme.debug(__name__+':Following')
+    logme.debug(__name__ + ':Following')
     url = f"{mobile}/{username}/following?lang=en"
     if init != '-1':
@@ -47,8 +54,9 @@ async def Following(username, init):
     return url
 async def MobileProfile(username, init):
-    logme.debug(__name__+':MobileProfile')
+    logme.debug(__name__ + ':MobileProfile')
     url = f"{mobile}/{username}?lang=en"
     if init != '-1':
@@ -56,8 +64,9 @@ async def MobileProfile(username, init):
     return url
 async def Profile(username, init):
-    logme.debug(__name__+':Profile')
+    logme.debug(__name__ + ':Profile')
     url = f"{base}/profiles/show/{username}/timeline/tweets?include_"
     url += "available_features=1&lang=en&include_entities=1"
     url += "&include_new_items_bar=true"
@@ -67,17 +76,38 @@ async def Profile(username, init):
     return url
 async def Search(config, init):
-    logme.debug(__name__+':Search')
-    url = f"{base}/search/timeline"
+    logme.debug(__name__ + ':Search')
+    url = base
+    tweet_count = 100
     q = ""
     params = [
-        ('vertical', 'default'),
-        ('src', 'unkn'),
-        ('include_available_features', '1'),
-        ('include_entities', '1'),
-        ('max_position', str(init)),
-        ('reset_error_state', 'false'),
+        # ('include_blocking', '1'),
+        # ('include_blocked_by', '1'),
+        # ('include_followed_by', '1'),
+        # ('include_want_retweets', '1'),
+        # ('include_mute_edge', '1'),
+        # ('include_can_dm', '1'),
+        ('include_can_media_tag', '1'),
+        # ('skip_status', '1'),
+        # ('include_cards', '1'),
+        ('include_ext_alt_text', 'true'),
+        ('include_quote_count', 'true'),
+        ('include_reply_count', '1'),
+        ('tweet_mode', 'extended'),
+        ('include_entities', 'true'),
+        ('include_user_entities', 'true'),
+        ('include_ext_media_availability', 'true'),
+        ('send_error_codes', 'true'),
+        ('simple_quoted_tweet', 'true'),
+        ('count', tweet_count),
+        # ('query_source', 'typed_query'),
+        # ('pc', '1'),
+        ('cursor', str(init)),
+        ('spelling_corrections', '1'),
+        ('ext', 'mediaStats%2ChighlightedLabel'),
+        ('tweet_search_mode', 'live'),  # this can be handled better, maybe take an argument and set it then
     ]
     if not config.Popular_tweets:
         params.append(('f', 'tweets'))
@@ -92,7 +122,8 @@ async def Search(config, init):
         config.Geo = config.Geo.replace(" ", "")
         q += f" geocode:{config.Geo}"
     if config.Search:
-        q += f" {config.Search}"
+        q += f"{config.Search}"
     if config.Year:
         q += f" until:{config.Year}-1-1"
     if config.Since:
@@ -120,6 +151,7 @@ async def Search(config, init):
         q += " filter:media"
     if config.Replies:
         q += " filter:replies"
+    # although this filter can still be used, I found it broken in my preliminary testing; needs more testing
     if config.Native_retweets:
         q += " filter:nativeretweets"
     if config.Min_likes:
@@ -144,3 +176,43 @@ async def Search(config, init):
     params.append(("q", q))
     _serialQuery = _sanitizeQuery(url, params)
     return url, params, _serialQuery
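
For context, the tuple returned here feeds a request roughly like the following (a hedged sketch: the actual call site is not in this hunk, and the header wiring follows the Bearer/Guest-token convention noted at config.py above):

import requests

async def demo(config):
    # Search builds the query string pieces; the caller attaches the tokens
    url, params, _serial = await Search(config, init='-1')
    headers = {
        'authorization': f'Bearer {config.Bearer_token}',
        'x-guest-token': config.Guest_token,
    }
    # _serial is the same URL pre-serialized by _sanitizeQuery, handy for logging
    return requests.get(url, params=params, headers=headers)

# e.g. asyncio.run(demo(config)) with a populated twint Config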
+# maybe we don't need this
+async def SearchProfile(config, init=None):
+    logme.debug(__name__ + ':SearchProfile')
+    _url = 'https://api.twitter.com/2/timeline/profile/{}.json?'
+    q = ""
+    params = [
+        ('include_profile_interstitial_type', '1'),
+        ('include_blocking', '1'),
+        ('include_blocked_by', '1'),
+        ('include_followed_by', '1'),
+        ('include_want_retweets', '1'),
+        ('include_mute_edge', '1'),
+        ('include_can_dm', '1'),
+        ('include_can_media_tag', '1'),
+        ('skip_status', '1'),
+        ('cards_platform', 'Web-12'),
+        ('include_cards', '1'),
+        ('include_ext_alt_text', 'true'),
+        ('include_quote_count', 'true'),
+        ('include_reply_count', '1'),
+        ('tweet_mode', 'extended'),
+        ('include_entities', 'true'),
+        ('include_user_entities', 'true'),
+        ('include_ext_media_color', 'true'),
+        ('include_ext_media_availability', 'true'),
+        ('send_error_codes', 'true'),
+        ('simple_quoted_tweet', 'true'),
+        ('include_tweet_replies', 'false'),
+        ('count', '50'),
+        ('userId', '1934388686'),
+        ('ext', 'mediaStats%2ChighlightedLabel'),
+    ]
+    if init:
+        params.append(('cursor', init))
+    _serialQuery = _sanitizeQuery(_url, params)
+    return _url, params, _serialQuery
 import datetime

 import logging as logme


-class user:
+class User:
     type = "user"

     def __init__(self):
         pass
-def inf(ur, _type):
-    logme.debug(__name__+':inf')
-    try:
-        group = ur.find("div", "profile")
-        if group == None:
-            group = ur.find("div", "user-actions btn-group not-following")
-        if group == None:
-            group = ur.find("div", "user-actions btn-group not-following protected")
-    except Exception as e:
-        print("Error: " + str(e))
-    if _type == "id":
-        screen_name = group.find("span", "screen-name").text
-        ret = ur.find("a", {"data-screenname": screen_name})
-        ret = ret.get('data-mentioned-user-id') if ret is not None else None
-        ret = "" if ret is None else ret
-    elif _type == "name":
-        ret = group.find("div", "fullname").text.split('\n')[0]
-    elif _type == "username":
-        ret = group.find("span", "screen-name").text
-    elif _type == "private":
-        ret = group.find("div", "protected")
-        if ret:
-            ret = 1
-        else:
-            ret = 0
-    return ret


-def card(ur, _type):
-    logme.debug(__name__+':card')
-    if _type == "bio":
-        try:
-            ret = ur.find("div", "bio").text.replace("\n", " ").strip()
-        except:
-            ret = ""
-    elif _type == "location":
-        try:
-            ret = ur.find("div", "location").text
-        except:
-            ret = ""
-    elif _type == "url":
-        try:
-            ret = ur.find("link")["href"]
-        except:
-            ret = ""
-    return ret


-def join(ur):
-    try:
-        logme.debug(__name__+':join')
-        jd = ur.find("span", "ProfileHeaderCard-joinDateText js-tooltip u-dir")["title"]
-        return jd.split(" - ")
-    except:
-        return ["", ""]


-def convertToInt(x):
-    logme.debug(__name__+':contertToInt')
-    multDict = {
-        "k": 1000,
-        "m": 1000000,
-        "b": 1000000000,
-    }
-    try:
-        if ',' in x:
-            x = x.replace(',', '')
-        y = int(x)
-        return y
-    except:
-        pass
-    try:
-        y = float(str(x)[:-1])
-        y = y * multDict[str(x)[-1:].lower()]
-        return int(y)
-    except:
-        pass
-    return 0


-def stat(ur, _type):
-    logme.debug(__name__+':stat')
-    stats = ur.find('table', 'profile-stats')
-    stat_dict = {}
-    for stat in stats.find_all('td', 'stat'):
-        statnum, statlabel = stat.text.replace('\n', '').replace(',', '').split(' ')[:2]
-        stat_dict[statlabel.lower()] = int(statnum.replace(',', ''))
-    try:
-        return stat_dict[_type]
-    except AttributeError:
-        return 0


-def media(ur):
-    logme.debug(__name__+':media')
-    try:
-        media_count = ur.find("a", "PhotoRail-headingWithCount js-nav").text.strip().split(" ")[0]
-        return convertToInt(media_count)
-    except:
-        return 0


-def verified(ur):
-    logme.debug(__name__+':verified')
-    try:
-        is_verified = ur.find("img", {"alt": "Verified Account"})['alt']
-        if "Verified Account" in is_verified:
-            is_verified = 1
-        else:
-            is_verified = 0
-    except:
-        is_verified = 0
-    return is_verified


+# ur object must be a json from the endpoint https://api.twitter.com/graphql
 def User(ur):
-    logme.debug(__name__+':User')
-    u = user()
-    for img in ur.findAll("img", "Emoji Emoji--forText"):
-        img.replaceWith(img["alt"])
-    u.id = inf(ur, "id")
-    u.name = inf(ur, "name")
-    u.username = inf(ur, "username")
-    u.bio = card(ur, "bio")
-    u.location = card(ur, "location")
-    u.url = card(ur, "url")
-    u.join_date = join(ur)[1]
-    u.join_time = join(ur)[0]
-    u.tweets = stat(ur, "tweets")
-    u.following = stat(ur, "following")
-    u.followers = stat(ur, "followers")
-    u.likes = ""  # stat(ur, "favorites")
-    u.media_count = ""  # media(ur)
-    u.is_private = inf(ur, "private")
-    u.is_verified = verified(ur)
-    u.avatar = ur.find("img", {"alt": u.name})["src"]
-    # u.background_image = ur.find('div',{'class':'ProfileCanopy-headerBg'}).find('img').get('src')
-    return u
+    logme.debug(__name__ + ':User')
+    if 'data' not in ur or 'user' not in ur['data']:
+        msg = 'malformed json! cannot be parsed to get user data'
+        logme.fatal(msg)
+        raise KeyError(msg)
+    _usr = User()
+    _usr.id = ur['data']['user']['rest_id']
+    _usr.name = ur['data']['user']['legacy']['name']
+    _usr.username = ur['data']['user']['legacy']['screen_name']
+    _usr.bio = ur['data']['user']['legacy']['description']
+    _usr.location = ur['data']['user']['legacy']['location']
+    _usr.url = ur['data']['user']['legacy']['url']
+    # parsing date to user-friendly format
+    _dt = ur['data']['user']['legacy']['created_at']
+    _dt = datetime.datetime.strptime(_dt, '%a %b %d %H:%M:%S %z %Y')
+    # date is of the format year,
+    _usr.join_date = _dt.strftime('%d-%m-%Y')
+    _usr.join_time = _dt.strftime('%H:%M:%S %Z')
+    # :type `int`
+    _usr.tweets = int(ur['data']['user']['legacy']['statuses_count'])
+    _usr.following = int(ur['data']['user']['legacy']['friends_count'])
+    _usr.followers = int(ur['data']['user']['legacy']['followers_count'])
+    _usr.likes = int(ur['data']['user']['legacy']['favourites_count'])
+    _usr.media_count = int(ur['data']['user']['legacy']['media_count'])
+    _usr.is_private = ur['data']['user']['legacy']['protected']
+    _usr.is_verified = ur['data']['user']['legacy']['verified']
+    _usr.avatar = ur['data']['user']['legacy']['profile_image_url_https']
+    _usr.background_image = ur['data']['user']['legacy']['profile_banner_url']
+    # TODO : future implementation
+    # legacy_extended_profile is also available in some cases which can be used to get DOB of user
+    return _usr
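
For reference, the JSON shape this parser expects looks roughly like the following (keys taken from the accesses above; every value is invented):

ur = {'data': {'user': {
    'rest_id': '1934388686',
    'legacy': {
        'name': 'Example Name',
        'screen_name': 'example',
        'description': 'example bio',
        'location': 'Earth',
        'url': 'https://example.com',
        'created_at': 'Mon Apr 06 22:32:01 +0000 2009',
        'statuses_count': 100,
        'friends_count': 10,
        'followers_count': 1000,
        'favourites_count': 50,
        'media_count': 5,
        'protected': False,
        'verified': False,
        'profile_image_url_https': 'https://pbs.twimg.com/profile_images/...',
        'profile_banner_url': 'https://pbs.twimg.com/profile_banners/...',
    },
}}}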