Commit 4112ddd8 authored by tsukumi's avatar tsukumi

Merge branch 'dev' of github.com:shadowban-eu/testing

parents 9a5aa369 21f88a63
DEBUG=1
ACCOUNT_FILE=./.htaccounts
COOKIE_DIR=./.htcookies
LOG_FILE=./logs/results.log
DEBUG_FILE=./logs/debug.log
PORT=4040
HOST=127.0.0.1
MONGO_HOST=127.0.0.1
HOST=localhost
TWITTER_AUTH_KEY=AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA
CORS_ALLOW=http://localhost:3000
GUEST_SESSIONS=3
MONGO_HOST=localhost
MONGO_PORT=27017
MONGO_DB=tester
TWITTER_AUTH_KEY=GRAPHQL_KEY
MONGO_DB=shadowban-testing
MONGO_USERNAME=root
MONGO_PASSWORD=5X-rl[(EMdJKll1|qMDU}5xY<t?F.UEo
......@@ -56,7 +56,6 @@ node_modules
!.env.example
.ht*
# starter scripts with possibly sensitive data
dev.sh
start.sh
restart.sh
# python misc
.venv
mprofile_*.dat
FROM python:3.5.7-slim-buster
FROM python:3.7-slim-buster
RUN apt update
RUN apt install gcc python3-dev -y
RUN mkdir /app
WORKDIR /app
ADD requirements.txt /app/
ADD . /app
RUN pip3 install --no-cache-dir -r ./requirements.txt
ENV NO_VENV=1
WORKDIR /app
RUN ./bin/install.sh
Pull
```
git clone https://github.com/shadowban-eu/shadowban-eu-backend && cd shadowban-eu-backend
```
### Docker
Add config
```
cp ~/configs/.env.development|production ./
cp ~/configs/.htaccounts
Follow the quickstart instructions in our docker repo's [README](https://github.com/shadowban-eu/docker#shadowbandev).
### Install/Run
```bash
$ git clone https://github.com/shadowban-eu/testing ./testing; cd $_
$ ./bin/install.sh
$ ./bin/docker-entry.sh .env.example # takes any .env file
```
Run
`./run.sh development|production`
### Memory Profiling
Using [memory-profiler](https://pypi.org/project/memory-profiler/) to debug memory usage:
See [.env.example](https://github.com/shadowban-eu/shadowban-eu-backend/blob/dev/.env.example) for .env variables
```bash
# When env vars are already set
$ ./bin/docker-entry.sh mprof
# Otherwise, read from file
$ ./bin/docker-entry.sh .env.example mprof
# Passes remaining arguments to mprof (set interval to 5s)
$ ./bin/docker-entry.sh .env.example mprof -T 5
```
This diff is collapsed.
echo -n "Looking for python3: "
if ! hash python3 &> /dev/null; then
echo -e "\nFatal: Please install Python3 to use this program!"
exit 1
fi
echo "OK"
if [ "$NO_VENV" != "1" ]; then
if [ ! -f .venv/bin/activate ]; then
echo "Creating new venv in: ./.venv"
python3 -m venv ./.venv
fi
echo "Loading virtualenv: ./.venv"
source ./.venv/bin/activate
else
echo "Skipping venv setup"
fi
#!/usr/bin/env bash
source ./bin/_venv
if [ "$1" != "" ] && [ -f $1 ]; then
echo "Using provided .env file: $1"
export $(cat $1 | xargs)
shift
fi
CMD="python3 -u ./backend.py"
if [ "$1" == "mprof" ]; then
shift
CMD="mprof run $@ ./backend.py"
echo -e "\nRecording memory profile\n"
fi
SERVICE_ARGS="\
--account-file $ACCOUNT_FILE \
--cookie-dir $COOKIE_DIR \
--log $LOG_FILE \
--port "$PORT" \
--host "$HOST" \
--mongo-host $MONGO_HOST \
--mongo-port $MONGO_PORT \
--mongo-db $MONGO_DB \
--mongo-username $MONGO_USERNAME \
--mongo-password $MONGO_PASSWORD \
--twitter-auth-key $TWITTER_AUTH_KEY \
--cors-allow $CORS_ALLOW \
--guest-sessions $GUEST_SESSIONS \
"
if [ -n "$DEBUG" ]; then
SERVICE_ARGS="$SERVICE_ARGS --debug"
fi
CMD="$CMD $SERVICE_ARGS $@"
echo -n "Starting server: "
if [ -n "$DEBUG" ]; then
echo $CMD
else
echo ""
fi
$CMD
#!/usr/bin/env bash
source ./bin/_venv
echo "Installing dependencies..."
pip3 install -r requirements.txt --no-cache-dir
if [ $? -eq 0 ]; then
echo -e "\n----------------------------"
echo -e "Almost done! \\o/\n"
echo "Run 'PYTHON_ENV=[development|prodcution] ./bin/docker-entry.sh .env.example' to start the server!"
echo -e "\nIf you want to make changes to the python packages, e.g. 'pip3 install ...', activate the venv, first: '. .venv/bin/activate'"
fi
if [ "$1" != "" ] && [ -f $1 ]; then
ENV_FILE=$1
else
ENV_FILE=.env
fi
echo 'Stopping service'
pkill -f twitter-auth-key
if [ $? -ne 0 ]; then
echo "Service not running"
fi
if [ "$1" == "-k" ]; then
echo "Deleting logs"
rm ./logs/*
fi
echo 'Starting service'
./docker-entry.sh $ENV_FILE
import copy
import traceback
import sys
from pymongo import MongoClient, errors as MongoErrors, DESCENDING
from time import sleep
from pymongo import MongoClient
from log import log
class Database:
def __init__(self, host=None, port=27017, username='', password='', db='tester'):
def __init__(self, host=None, port=27017, db='tester', username=None, password=None):
# collection name definitions
RESULTS_COLLECTION = 'results'
RATELIMIT_COLLECTION = 'rate-limits'
try:
print('[mongoDB] Connecting to ' + host + ':' + str(port))
print('[mongoDB] Using Database `' + db + '`')
# client and DB
self.client = MongoClient(
host=host,
port=port,
username=username,
password=password,
serverSelectionTimeoutMS=3)
self.db = self.client[db]
# collections
self.results = self.db[RESULTS_COLLECTION]
self.rate_limits = self.db[RATELIMIT_COLLECTION]
# Test connection immediately, instead of
# when trying to write in a request, later.
self.client.admin.command('ismaster')
except MongoErrors.ServerSelectionTimeoutError:
print(traceback.format_exc())
sys.exit('MongoDB connection timed out.')
except:
print(traceback.format_exc())
sys.exit('MongoDB connection failed.')
log.info('Connecting to %s:%d', host, port)
log.info('Using Database `%s`', db)
# client and DB
self.client = MongoClient(host, port, serverSelectionTimeoutMS=3, username=username, password=password)
self.db = self.client[db]
# collections
self.results = self.db[RESULTS_COLLECTION]
self.rate_limits = self.db[RATELIMIT_COLLECTION]
# Test connection immediately, instead of
# when trying to write in a request, later.
self.client.admin.command('ismaster')
def write_result(self, result):
# copy.deepcopy; otherwise mongo ObjectId (_id) would be added,
......@@ -43,6 +32,26 @@ class Database:
def write_rate_limit(self, data):
self.rate_limits.insert_one(data)
def get_result_by_screen_name(self, screen_name):
return self.results.find_one({ "profile.screen_name": screen_name }, sort=[("_id", DESCENDING)], projection={"_id": False})
def close(self):
self.client.close()
def connect(host=None, port=27017, db='tester', username=None, password=None):
if host is None:
raise ValueError('Database constructor needs a `host`name or ip!')
attempt = 0
max_attempts = 7
mongo_client = None
while (mongo_client is None):
try:
mongo_client = Database(host=host, port=port, db=db, username=username, password=password)
except Exception as e:
if attempt is max_attempts:
raise e
sleep(attempt)
attempt += 1
log.warn('Retrying connection, %s/%s', attempt, max_attempts)
return mongo_client
#/usr/bin/env bash
echo "Starting server..."
echo "--account-file $ACCOUNT_FILE"
echo "--cookie-dir $COOKIE_DIR"
echo "--log $LOG_FILE"
echo "--debug $DEBUG_FILE"
echo "--port "$PORT""
echo "--host "$HOST""
echo "--mongo-host $MONGO_HOST"
echo "--mongo-port $MONGO_PORT"
echo "--mongo-db $MONGO_DB"
echo "--twitter-auth-key $TWITTER_AUTH_KEY"
python3 -u ./backend.py \
--account-file $ACCOUNT_FILE \
--cookie-dir $COOKIE_DIR \
--log $LOG_FILE \
--debug $DEBUG_FILE \
--port "$PORT" \
--host "$HOST" \
--mongo-host $MONGO_HOST \
--mongo-port $MONGO_PORT \
--mongo-db $MONGO_DB \
--twitter-auth-key $TWITTER_AUTH_KEY
import sys
# Count amount of "possibly_sensitive_editable" and "possibly_sensitive"
# flagged tweets in user's timeline
async def count_sensitives(session, user_id):
......
#!/usr/bin/env bash
echo -n "Looking for Python3: "
if ! hash python3; then
echo -n "\nPlease install Python3 to use this program!"
fi
echo "OK"
echo "Installing dependencies..."
pip3 install -r requirements.txt --no-cache-dir
echo -e "\n----------------------------"
echo "All done! \o/"
echo "Run 'PYTON_ENV=[development|prodcution] ./run.sh' to start the server!"
import gzip
import logging
from logging.handlers import TimedRotatingFileHandler
import os
import shutil
log_format = '%(asctime)s | %(module)s:%(lineno)d | %(levelname)s: %(message)s'
logging.basicConfig(format=log_format)
log = logging.getLogger(__name__)
def file_namer(name):
return name + ".gz"
def file_rotator(source, dest):
with open(source, "rb") as sf:
with gzip.open(dest, "wb") as df:
shutil.copyfileobj(sf, df)
os.remove(source)
def add_file_handler(filename):
handler = TimedRotatingFileHandler(filename=filename, when='midnight')
handler.setFormatter(logging.Formatter(fmt=log_format))
handler.namer = file_namer
handler.rotator = file_rotator
log.addHandler(handler)
log.info('Writing log to %s', filename)
def set_log_level(level):
level_upper = level.upper()
log.setLevel(getattr(logging, level_upper))
log.info('Log level set to %s', level_upper)
def shutdown_logging():
logging.shutdown()
from tests.typeahead import test as test_typeahead
from tests.ghostban import test as test_ghost_ban
from tests.reply_deboosting import test as test_reply_deboosting
from tests.profile import test as test_profile
__all__ = ['test_typeahead', 'test_ghost_ban', 'test_reply_deboosting', 'test_profile']
import traceback
from log import log
async def test(session, user_id):
try:
tweets_replies = await session.get_profile_tweets_raw(user_id)
tweet_ids = session.get_ordered_tweet_ids(tweets_replies)
replied_ids = []
for tid in tweet_ids:
if tweets_replies["globalObjects"]["tweets"][tid]["reply_count"] > 0 and tweets_replies["globalObjects"]["tweets"][tid]["user_id_str"] == user_id:
replied_ids.append(tid)
for tid in replied_ids:
tweet = await session.tweet_raw(tid)
for reply_id, reply_obj in tweet["globalObjects"]["tweets"].items():
if reply_id == tid or reply_obj.get("in_reply_to_status_id_str", None) != tid:
continue
reply_tweet = await session.tweet_raw(reply_id)
if reply_id not in reply_tweet["globalObjects"]["tweets"]:
continue
obj = {"tweet": tid, "reply": reply_id}
if tid in reply_tweet["globalObjects"]["tweets"]:
obj["ban"] = False
else:
obj["ban"] = True
return obj
except:
log.error('Unexpected Exception:')
log.error(traceback.format_exc())
return { "error": "EUNKNOWN" }
import sys
import pathlib
from typing import Any, Tuple, Dict
sys.path.insert(0, str(pathlib.Path(__file__).parent))
from features import count_sensitives
from log import log
from util import is_error, is_generic_error, UnexpectedApiError
async def test(session, username: str) -> Tuple[str, Dict[str, Any]]:
profile: dict[str, Any] = {}
profile_raw = await session.profile_raw(username)
log.info('Testing ' + str(username))
if is_generic_error(profile_raw, [50, 63]):
log.debug("Other error:" + str(username))
raise UnexpectedApiError
try:
user_id = str(profile_raw["id"])
except KeyError:
user_id = ''
try:
profile["screen_name"] = profile_raw["screen_name"]
except KeyError:
profile["screen_name"] = username
try:
if profile_raw["profile_interstitial_type"] != "":
profile["restriction"] = profile_raw["profile_interstitial_type"]
except KeyError:
pass
try:
profile["protected"] = profile_raw["protected"]
except KeyError:
pass
profile["exists"] = not is_error(profile_raw, 50)
suspended = is_error(profile_raw, 63)
if suspended:
profile["suspended"] = suspended
try:
profile["has_tweets"] = int(profile_raw["statuses_count"]) > 0
except KeyError:
profile["has_tweets"] = False
log.debug(profile)
if profile["exists"] and not profile.get("protected", False) and not profile.get("suspended", False):
profile["sensitives"] = await count_sensitives(session, user_id)
return user_id, profile
import traceback
from log import log
from util import get_nested, get_ordered_tweet_ids
async def test(session, user_id, screen_name):
try:
tweets_replies = await session.get_profile_tweets_raw(user_id)
tweet_ids = get_ordered_tweet_ids(tweets_replies)
reply_tweet_ids = []
for tid in tweet_ids:
tweet = tweets_replies["globalObjects"]["tweets"][tid]
if "in_reply_to_status_id_str" not in tweet or tweet["in_reply_to_user_id_str"] == user_id or tweet["user_id_str"] != user_id:
continue
conversation_tweet = get_nested(tweets_replies, ["globalObjects", "tweets", tweet["conversation_id_str"]])
if conversation_tweet is not None and conversation_tweet.get("user_id_str") == user_id:
continue
reply_tweet_ids.append(tid)
# return error message, when user has not made any reply tweets
if not reply_tweet_ids:
return {"error": "ENOREPLIES"}
for tid in reply_tweet_ids:
replied_to_id = tweets_replies["globalObjects"]["tweets"][tid].get("in_reply_to_status_id_str", None)
if replied_to_id is None:
continue
replied_tweet_obj = await session.tweet_raw(replied_to_id, 50)
if "globalObjects" not in replied_tweet_obj:
continue
if replied_to_id not in replied_tweet_obj["globalObjects"]["tweets"]:
continue
replied_tweet = replied_tweet_obj["globalObjects"]["tweets"][replied_to_id]
if not replied_tweet["conversation_id_str"] in replied_tweet_obj["globalObjects"]["tweets"]:
continue
conversation_tweet = replied_tweet_obj["globalObjects"]["tweets"][replied_tweet["conversation_id_str"]]
if conversation_tweet["user_id_str"] == user_id:
continue
if replied_tweet["reply_count"] > 500:
continue
log.debug('[' + screen_name + '] Barrier Test: ')
log.debug('[' + screen_name + '] Found:' + tid)
log.debug('[' + screen_name + '] In reply to:' + replied_to_id)
if session is None:
log.critical('No reference session')
return
# Importing TwitterSession directly creates circular import
session.__class__.account_index += 1
before_barrier = await session.tweet_raw(replied_to_id, 1000)
if get_nested(before_barrier, ["globalObjects", "tweets"]) is None:
log.error('notweets')
return
if tid in get_ordered_tweet_ids(before_barrier):
return {"ban": False, "tweet": tid, "in_reply_to": replied_to_id}
cursors = ["ShowMoreThreads", "ShowMoreThreadsPrompt"]
last_result = before_barrier
for stage in range(0, 2):
entries = [x for x in last_result["timeline"]["instructions"] if "addEntries" in x][0]["addEntries"]["entries"]
try:
cursor = [x["content"]["operation"]["cursor"]["value"] for x in entries if get_nested(x, ["content", "operation", "cursor", "cursorType"]) == cursors[stage]][0]
except (KeyError, IndexError):
continue
after_barrier = await session.tweet_raw(replied_to_id, 1000, cursor=cursor)
if get_nested(after_barrier, ["globalObjects", "tweets"]) is None:
log.error('retinloop')
return
ids_after_barrier = get_ordered_tweet_ids(after_barrier)
if tid in get_ordered_tweet_ids(after_barrier):
return {"ban": True, "tweet": tid, "stage": stage, "in_reply_to": replied_to_id}
last_result = after_barrier
# happens when replied_to_id tweet has been deleted
log.error('[' + screen_name + '] outer loop return')
return { "error": "EUNKNOWN" }
except:
log.error('Unexpected Exception in test_barrier:')
log.error(traceback.format_exc())
return { "error": "EUNKNOWN" }
from typing import Any
import time
import urllib, urllib.parse
import os
import aiohttp
from bs4 import BeautifulSoup
from yarl import URL
from log import log
from tests import *
from util import get_nested, is_error, UnexpectedApiError
class TwitterSession:
twitter_auth_key = ''
account_sessions = []
account_index = 0
guest_sessions = []
test_index = 0
accounts = []
def __init__(self):
self._guest_token = ''
self._csrf_token = ''
# aiohttp ClientSession
self._session = None
# rate limit monitoring
self.remaining = 180
self.reset = -1
self.locked = False
self.next_refresh = None
# session user's @username
# this stays `None` for guest sessions
self.username = None
self._headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36"
}
# sets self._headers
self.reset_headers()
def set_csrf_header(self):
cookies = self._session.cookie_jar.filter_cookies(URL('https://twitter.com/'))
for _, cookie in cookies.items():
if cookie.key == 'ct0':
self._headers['X-Csrf-Token'] = cookie.value
async def get_guest_token(self):
self._headers['Authorization'] = 'Bearer ' + self.twitter_auth_key
async with self._session.post("https://api.twitter.com/1.1/guest/activate.json", headers=self._headers) as r:
response = await r.json()
guest_token = response.get("guest_token", None)
if guest_token is None:
log.error("Failed to fetch guest token")
log.error(str(response))
log.error(str(self._headers))
return guest_token
def reset_headers(self):
self._headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36"
}
async def renew_session(self):
await self.try_close()
self._session = aiohttp.ClientSession()
self.reset_headers()
async def refresh_old_token(self):
if self.username is not None or self.next_refresh is None or time.time() < self.next_refresh:
return
log.debug("Refreshing token: " + str(self._guest_token))
await self.login_guest()
log.debug("New token: " + str(self._guest_token))
async def try_close(self):
if self._session is not None:
try:
await self._session.close()
except:
pass
async def login_guest(self):
await self.renew_session()
self.set_csrf_header()
old_token = self._guest_token
new_token = await self.get_guest_token()
self._guest_token = new_token if new_token is not None else old_token
if new_token is not None:
self.next_refresh = time.time() + 3600
self._headers['X-Guest-Token'] = self._guest_token
async def login(self, username = None, password = None, email = None, cookie_dir=None):
self._session = aiohttp.ClientSession()
if password is not None:
login_required = True
cookie_file = None
if cookie_dir is not None:
cookie_file = os.path.join(cookie_dir, username)
if os.path.isfile(cookie_file):
log.info("Use cookie file for %s" % username)
# satisfy linter; https://github.com/aio-libs/aiohttp/issues/4043#issuecomment-529085744
assert isinstance(self._session.cookie_jar, aiohttp.CookieJar)
self._session.cookie_jar.load(cookie_file)
login_required = False
store_cookies = True
if login_required:
async with self._session.get("https://twitter.com/login", headers=self._headers) as r:
login_page = await r.text()
form_data = {}
soup = BeautifulSoup(login_page, 'html.parser')
form_data["authenticity_token"] = soup.find('input', {'name': 'authenticity_token'}).get('value')
form_data["session[username_or_email]"] = email
form_data["session[password]"] = password
form_data["remember_me"] = "1"
async with self._session.post('https://twitter.com/sessions', data=form_data, headers=self._headers) as r:
response = await r.text()
if str(r.url) == "https://twitter.com/":
log.info("Login of %s successful" % username)
else:
store_cookies = False
log.info("Error logging in %s (%s)" % (username, r.url))
log.debug("ERROR PAGE\n" + response)
else:
async with self._session.get('https://twitter.com', headers=self._headers) as r:
await r.text()
self.set_csrf_header()
self.username = username
if cookie_file is not None and store_cookies:
# satisfy linter; https://github.com/aio-libs/aiohttp/issues/4043#issuecomment-529085744
assert isinstance(self._session.cookie_jar, aiohttp.CookieJar)
self._session.cookie_jar.save(cookie_file)
else:
await self.login_guest()
self._headers['Authorization'] = 'Bearer ' + self.twitter_auth_key
async def get(self, url, retries=0):
self.set_csrf_header()
await self.refresh_old_token()
try:
async with self._session.get(url, headers=self._headers) as r:
result = await r.json()
except Exception as e:
log.error("EXCEPTION: %s", str(type(e)))
if self.username is None:
await self.login_guest()
raise e
if self.username is None and self.remaining < 10 or is_error(result, 88) or is_error(result, 239):
await self.login_guest()
if retries > 0 and is_error(result, 353):
return await self.get(url, retries - 1)
if is_error(result, 326):
self.locked = True
return result
async def search_raw(self, query, live=True):
additional_query = ""
if live:
additional_query = "&tweet_search_mode=live"
return await self.get("https://api.twitter.com/2/search/adaptive.json?q="+urllib.parse.quote(query)+"&count=20&spelling_corrections=0" + additional_query)
async def profile_raw(self, username):
return await self.get("https://api.twitter.com/1.1/users/show.json?screen_name=" + urllib.parse.quote(username))
async def get_profile_tweets_raw(self, user_id):
return await self.get("https://api.twitter.com/2/timeline/profile/" + str(user_id) +".json?include_tweet_replies=1&include_want_retweets=0&include_reply_count=1&count=1000")
async def tweet_raw(self, tweet_id, count=20, cursor=None):
if cursor is None:
cursor = ""
else:
cursor = "&cursor=" + urllib.parse.quote(cursor)
return await self.get("https://api.twitter.com/2/timeline/conversation/" + tweet_id + ".json?include_reply_count=1&send_error_codes=true&count="+str(count)+ cursor)
async def test(self, username):
result: dict[str, Any] = {"timestamp": time.time()}
user_id, profile = await test_profile(self, username)
result["profile"] = profile
if not profile["exists"] or profile.get("suspended", False) or profile.get("protected", False) or not profile.get('has_tweets'):
return result
result["tests"] = {}
search_raw = await self.search_raw("from:@" + username)
result["tests"]["search"] = False
try:
tweets = search_raw["globalObjects"]["tweets"]
for tweet_id, _ in sorted(tweets.items(), key=lambda t: t[1]["id"], reverse=True):
result["tests"]["search"] = str(tweet_id)
break
except (KeyError, IndexError):
pass
result["tests"]["typeahead"] = await test_typeahead(self, username)
if "search" in result["tests"] and result["tests"]["search"] == False:
result["tests"]["ghost"] = await test_ghost_ban(self, user_id)
else:
result["tests"]["ghost"] = {"ban": False}
if not get_nested(result, ["tests", "ghost", "ban"], False):
result["tests"]["more_replies"] = await test_reply_deboosting(self, user_id, profile['screen_name'])
else:
result["tests"]["more_replies"] = { "error": "EISGHOSTED"}
log.debug('[' + profile['screen_name'] + '] Writing result to DB')
return result
async def close(self):
await self._session.close()
# unused
def next_session():
def key(s):
remaining_time = s.reset - time.time()
if s.remaining <= 3 and remaining_time > 0:
return 900
return remaining_time
sessions = sorted([s for s in TwitterSession.account_sessions if not s.locked], key=key)
if len(sessions) > 0:
return sessions[0]
def get_nested(obj, path, default=None):
for p in path:
if obj is None or not p in obj:
return default
obj = obj[p]
return obj
def is_error(result, code=None):
return isinstance(result.get("errors", None), list) and (len([x for x in result["errors"] if x.get("code", None) == code]) > 0 or code is None and len(result["errors"] > 0))
def is_generic_error(result, codes):
return isinstance(result.get("errors", None), list) and len([x for x in result["errors"] if x.get("code", None) not in codes]) > 0
def flatten_timeline(timeline_items):
result = []
for item in timeline_items:
if get_nested(item, ["content", "item", "content", "tweet", "id"]) is not None:
result.append(item["content"]["item"]["content"]["tweet"]["id"])
elif get_nested(item, ["content", "timelineModule", "items"]) is not None:
timeline_items = item["content"]["timelineModule"]["items"]
titems = [get_nested(x, ["item", "content", "tweet", "id"]) for x in timeline_items]
result += [x for x in titems if x is not None]
return result
def get_ordered_tweet_ids(obj, filtered=True):
try:
entries = [x for x in obj["timeline"]["instructions"] if "addEntries" in x][0]["addEntries"]["entries"]
except (IndexError, KeyError):
return []
entries.sort(key=lambda x: -int(x["sortIndex"]))
flat = flatten_timeline(entries)
return [x for x in flat if not filtered or x in obj["globalObjects"]["tweets"]]
class UnexpectedApiError(Exception):
pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment