Commit 04006c7a authored by Raphael Beer's avatar Raphael Beer

Refactor: extract testing logics to subpackage 'test' +

  Add your test as /tests/fooban.py and
  amend /tests/__init__.py

    ```python
    from tests.fooban import test as test_fooban
    __all__ = ['test_typeahead', ..., 'test_fooban']
    ```

  so you can access the test when importing the module

    ```python
    from tests import *
    [...]
    result = await test_fooban()
    ```
parent f927ed09
...@@ -5,7 +5,6 @@ import daemon ...@@ -5,7 +5,6 @@ import daemon
import json import json
import os import os
import re import re
import traceback
import urllib.parse import urllib.parse
import sys import sys
import time import time
......
from tests.typeahead import test as test_typeahead
from tests.ghostban import test as test_ghost_ban
from tests.reply_deboosting import test as test_reply_deboosting
__all__ = ['test_typeahead', 'test_ghost_ban', 'test_reply_deboosting']
from log import log
async def test(session, user_id):
try:
tweets_replies = await session.get_profile_tweets_raw(user_id)
tweet_ids = session.get_ordered_tweet_ids(tweets_replies)
replied_ids = []
for tid in tweet_ids:
if tweets_replies["globalObjects"]["tweets"][tid]["reply_count"] > 0 and tweets_replies["globalObjects"]["tweets"][tid]["user_id_str"] == user_id:
replied_ids.append(tid)
for tid in replied_ids:
tweet = await session.tweet_raw(tid)
for reply_id, reply_obj in tweet["globalObjects"]["tweets"].items():
if reply_id == tid or reply_obj.get("in_reply_to_status_id_str", None) != tid:
continue
reply_tweet = await session.tweet_raw(reply_id)
if reply_id not in reply_tweet["globalObjects"]["tweets"]:
continue
obj = {"tweet": tid, "reply": reply_id}
if tid in reply_tweet["globalObjects"]["tweets"]:
obj["ban"] = False
else:
obj["ban"] = True
return obj
except:
log.error('Unexpected Exception:')
log.error(traceback.format_exc())
return { "error": "EUNKNOWN" }
import traceback
from log import log
from util import get_nested
async def test(session, user_id, screen_name):
try:
tweets_replies = await session.get_profile_tweets_raw(user_id)
tweet_ids = session.get_ordered_tweet_ids(tweets_replies)
reply_tweet_ids = []
for tid in tweet_ids:
if "in_reply_to_status_id_str" not in tweets_replies["globalObjects"]["tweets"][tid] or tweets_replies["globalObjects"]["tweets"][tid]["user_id_str"] != user_id:
continue
tweet = tweets_replies["globalObjects"]["tweets"][tid]
conversation_tweet = get_nested(tweets_replies, ["globalObjects", "tweets", tweet["conversation_id_str"]])
if conversation_tweet is not None and conversation_tweet.get("user_id_str") == user_id:
continue
reply_tweet_ids.append(tid)
# return error message, when user has not made any reply tweets
if not reply_tweet_ids:
return {"error": "ENOREPLIES"}
for tid in reply_tweet_ids:
replied_to_id = tweets_replies["globalObjects"]["tweets"][tid].get("in_reply_to_status_id_str", None)
if replied_to_id is None:
continue
replied_tweet_obj = await session.tweet_raw(replied_to_id, 50)
if "globalObjects" not in replied_tweet_obj:
continue
if replied_to_id not in replied_tweet_obj["globalObjects"]["tweets"]:
continue
replied_tweet = replied_tweet_obj["globalObjects"]["tweets"][replied_to_id]
if not replied_tweet["conversation_id_str"] in replied_tweet_obj["globalObjects"]["tweets"]:
continue
conversation_tweet = replied_tweet_obj["globalObjects"]["tweets"][replied_tweet["conversation_id_str"]]
if conversation_tweet["user_id_str"] == user_id:
continue
if replied_tweet["reply_count"] > 500:
continue
log.debug('[' + screen_name + '] Barrier Test: ')
log.debug('[' + screen_name + '] Found:' + tid)
log.debug('[' + screen_name + '] In reply to:' + replied_to_id)
if session is None:
log.critical('No reference session')
return
# Importing TwitterSession directly creates circular import
session.__class__.account_index += 1
before_barrier = await session.tweet_raw(replied_to_id, 1000)
if get_nested(before_barrier, ["globalObjects", "tweets"]) is None:
log.error('notweets')
return
if tid in session.get_ordered_tweet_ids(before_barrier):
return {"ban": False, "tweet": tid, "in_reply_to": replied_to_id}
cursors = ["ShowMoreThreads", "ShowMoreThreadsPrompt"]
last_result = before_barrier
for stage in range(0, 2):
entries = [x for x in last_result["timeline"]["instructions"] if "addEntries" in x][0]["addEntries"]["entries"]
try:
cursor = [x["content"]["operation"]["cursor"]["value"] for x in entries if get_nested(x, ["content", "operation", "cursor", "cursorType"]) == cursors[stage]][0]
except (KeyError, IndexError):
continue
after_barrier = await session.tweet_raw(replied_to_id, 1000, cursor=cursor)
if get_nested(after_barrier, ["globalObjects", "tweets"]) is None:
log.error('retinloop')
return
ids_after_barrier = session.get_ordered_tweet_ids(after_barrier)
if tid in session.get_ordered_tweet_ids(after_barrier):
return {"ban": True, "tweet": tid, "stage": stage, "in_reply_to": replied_to_id}
last_result = after_barrier
# happens when replied_to_id tweet has been deleted
log.error('[' + screen_name + '] outer loop return')
return { "error": "EUNKNOWN" }
except:
log.error('Unexpected Exception in test_barrier:')
log.error(traceback.format_exc())
return { "error": "EUNKNOWN" }
...@@ -4,7 +4,13 @@ import urllib ...@@ -4,7 +4,13 @@ import urllib
from log import log from log import log
from statistics import count_sensitives from statistics import count_sensitives
from typeahead import test as test_typeahead
from util import get_nested
from tests import *
# from ghostban import test as test_ghost_ban
# from reply_deboosting import test as test_reply_deboosting
# from typeahead import test as test_typeahead
class UnexpectedApiError(Exception): class UnexpectedApiError(Exception):
pass pass
...@@ -144,7 +150,7 @@ class TwitterSession: ...@@ -144,7 +150,7 @@ class TwitterSession:
async with self._session.get(url, headers=self._headers) as r: async with self._session.get(url, headers=self._headers) as r:
result = await r.json() result = await r.json()
except Exception as e: except Exception as e:
log.debug("EXCEPTION: " + str(type(e))) log.error("EXCEPTION: %s", str(type(e)))
if self.username is None: if self.username is None:
await self.login_guest() await self.login_guest()
raise e raise e
...@@ -197,121 +203,6 @@ class TwitterSession: ...@@ -197,121 +203,6 @@ class TwitterSession:
flat = cls.flatten_timeline(entries) flat = cls.flatten_timeline(entries)
return [x for x in flat if not filtered or x in obj["globalObjects"]["tweets"]] return [x for x in flat if not filtered or x in obj["globalObjects"]["tweets"]]
async def test_ghost_ban(self, user_id):
try:
tweets_replies = await self.get_profile_tweets_raw(user_id)
tweet_ids = self.get_ordered_tweet_ids(tweets_replies)
replied_ids = []
for tid in tweet_ids:
if tweets_replies["globalObjects"]["tweets"][tid]["reply_count"] > 0 and tweets_replies["globalObjects"]["tweets"][tid]["user_id_str"] == user_id:
replied_ids.append(tid)
for tid in replied_ids:
tweet = await self.tweet_raw(tid)
for reply_id, reply_obj in tweet["globalObjects"]["tweets"].items():
if reply_id == tid or reply_obj.get("in_reply_to_status_id_str", None) != tid:
continue
reply_tweet = await self.tweet_raw(reply_id)
if reply_id not in reply_tweet["globalObjects"]["tweets"]:
continue
obj = {"tweet": tid, "reply": reply_id}
if tid in reply_tweet["globalObjects"]["tweets"]:
obj["ban"] = False
else:
obj["ban"] = True
return obj
except:
log.debug('Unexpected Exception:')
log.debug(traceback.format_exc())
return { "error": "EUNKNOWN" }
async def test_barrier(self, user_id, screen_name):
try:
tweets_replies = await self.get_profile_tweets_raw(user_id)
tweet_ids = self.get_ordered_tweet_ids(tweets_replies)
reply_tweet_ids = []
for tid in tweet_ids:
if "in_reply_to_status_id_str" not in tweets_replies["globalObjects"]["tweets"][tid] or tweets_replies["globalObjects"]["tweets"][tid]["user_id_str"] != user_id:
continue
tweet = tweets_replies["globalObjects"]["tweets"][tid]
conversation_tweet = get_nested(tweets_replies, ["globalObjects", "tweets", tweet["conversation_id_str"]])
if conversation_tweet is not None and conversation_tweet.get("user_id_str") == user_id:
continue
reply_tweet_ids.append(tid)
# return error message, when user has not made any reply tweets
if not reply_tweet_ids:
return {"error": "ENOREPLIES"}
for tid in reply_tweet_ids:
replied_to_id = tweets_replies["globalObjects"]["tweets"][tid].get("in_reply_to_status_id_str", None)
if replied_to_id is None:
continue
replied_tweet_obj = await self.tweet_raw(replied_to_id, 50)
if "globalObjects" not in replied_tweet_obj:
continue
if replied_to_id not in replied_tweet_obj["globalObjects"]["tweets"]:
continue
replied_tweet = replied_tweet_obj["globalObjects"]["tweets"][replied_to_id]
if not replied_tweet["conversation_id_str"] in replied_tweet_obj["globalObjects"]["tweets"]:
continue
conversation_tweet = replied_tweet_obj["globalObjects"]["tweets"][replied_tweet["conversation_id_str"]]
if conversation_tweet["user_id_str"] == user_id:
continue
if replied_tweet["reply_count"] > 500:
continue
log.debug('[' + screen_name + '] Barrier Test: ')
log.debug('[' + screen_name + '] Found:' + tid)
log.debug('[' + screen_name + '] In reply to:' + replied_to_id)
reference_session = next_session()
reference_session = self
if reference_session is None:
log.debug('No reference session')
return
TwitterSession.account_index += 1
before_barrier = await reference_session.tweet_raw(replied_to_id, 1000)
if get_nested(before_barrier, ["globalObjects", "tweets"]) is None:
log.debug('notweets\n')
return
if tid in self.get_ordered_tweet_ids(before_barrier):
return {"ban": False, "tweet": tid, "in_reply_to": replied_to_id}
cursors = ["ShowMoreThreads", "ShowMoreThreadsPrompt"]
last_result = before_barrier
for stage in range(0, 2):
entries = [x for x in last_result["timeline"]["instructions"] if "addEntries" in x][0]["addEntries"]["entries"]
try:
cursor = [x["content"]["operation"]["cursor"]["value"] for x in entries if get_nested(x, ["content", "operation", "cursor", "cursorType"]) == cursors[stage]][0]
except (KeyError, IndexError):
continue
after_barrier = await reference_session.tweet_raw(replied_to_id, 1000, cursor=cursor)
if get_nested(after_barrier, ["globalObjects", "tweets"]) is None:
log.debug('retinloop\n')
return
ids_after_barrier = self.get_ordered_tweet_ids(after_barrier)
if tid in self.get_ordered_tweet_ids(after_barrier):
return {"ban": True, "tweet": tid, "stage": stage, "in_reply_to": replied_to_id}
last_result = after_barrier
# happens when replied_to_id tweet has been deleted
log.debug('[' + screen_name + '] outer loop return')
return { "error": "EUNKNOWN" }
except:
log.debug('Unexpected Exception in test_barrier:\n')
log.debug(traceback.format_exc())
return { "error": "EUNKNOWN" }
async def test(self, username): async def test(self, username):
result = {"timestamp": time.time()} result = {"timestamp": time.time()}
profile = {} profile = {}
...@@ -373,12 +264,12 @@ class TwitterSession: ...@@ -373,12 +264,12 @@ class TwitterSession:
result["tests"]["typeahead"] = await test_typeahead(self, username) result["tests"]["typeahead"] = await test_typeahead(self, username)
if "search" in result["tests"] and result["tests"]["search"] == False: if "search" in result["tests"] and result["tests"]["search"] == False:
result["tests"]["ghost"] = await self.test_ghost_ban(user_id) result["tests"]["ghost"] = await test_ghost_ban(self, user_id)
else: else:
result["tests"]["ghost"] = {"ban": False} result["tests"]["ghost"] = {"ban": False}
if not get_nested(result, ["tests", "ghost", "ban"], False): if not get_nested(result, ["tests", "ghost", "ban"], False):
result["tests"]["more_replies"] = await self.test_barrier(user_id, profile['screen_name']) result["tests"]["more_replies"] = await test_reply_deboosting(self, user_id, profile['screen_name'])
else: else:
result["tests"]["more_replies"] = { "error": "EISGHOSTED"} result["tests"]["more_replies"] = { "error": "EISGHOSTED"}
...@@ -389,22 +280,16 @@ class TwitterSession: ...@@ -389,22 +280,16 @@ class TwitterSession:
async def close(self): async def close(self):
await self._session.close() await self._session.close()
def next_session(): @classmethod
def key(s): def next_session():
remaining_time = s.reset - time.time() def key(s):
if s.remaining <= 3 and remaining_time > 0: remaining_time = s.reset - time.time()
return 900 if s.remaining <= 3 and remaining_time > 0:
return remaining_time return 900
sessions = sorted([s for s in TwitterSession.account_sessions if not s.locked], key=key) return remaining_time
if len(sessions) > 0: sessions = sorted([s for s in TwitterSession.account_sessions if not s.locked], key=key)
return sessions[0] if len(sessions) > 0:
return sessions[0]
def get_nested(obj, path, default=None):
for p in path:
if obj is None or not p in obj:
return default
obj = obj[p]
return obj
def is_error(result, code=None): def is_error(result, code=None):
return isinstance(result.get("errors", None), list) and (len([x for x in result["errors"] if x.get("code", None) == code]) > 0 or code is None and len(result["errors"] > 0)) return isinstance(result.get("errors", None), list) and (len([x for x in result["errors"] if x.get("code", None) == code]) > 0 or code is None and len(result["errors"] > 0))
......
def get_nested(obj, path, default=None):
for p in path:
if obj is None or not p in obj:
return default
obj = obj[p]
return obj
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment