Skip to content

Commit

Permalink
Gathering spam (hidden requests) inbox. Docs
Browse files Browse the repository at this point in the history
  • Loading branch information
KolyaDobrydnev committed Aug 21, 2023
1 parent a9d98ca commit 8bad151
Showing 1 changed file with 63 additions and 4 deletions.
67 changes: 63 additions & 4 deletions instagrapi/mixins/direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,12 @@ def direct_threads_chunk(

def direct_pending_inbox(self, amount: int = 20) -> List[DirectThread]:
"""
Get direct message pending threads
Get direct threads of Pending inbox
Parameters
----------
amount: int, optional
Maximum number of media to return, default is 20
Maximum number of threads to return, default is 20
Returns
-------
Expand All @@ -166,7 +166,6 @@ def direct_pending_inbox(self, amount: int = 20) -> List[DirectThread]:

cursor = None
threads = []
# self.private_request("direct_v2/get_presence/")
while True:
new_threads, cursor = self.direct_pending_chunk(cursor)
for thread in new_threads:
Expand All @@ -182,7 +181,7 @@ def direct_pending_chunk(
self, cursor: str = None
) -> Tuple[List[DirectThread], str]:
"""
Get direct message pending threads
Get direct threads of Pending inbox. Chunk
Parameters
----------
Expand Down Expand Up @@ -235,6 +234,66 @@ def direct_pending_approve(self, thread_id: int) -> bool:
)
return result.get("status", "") == "ok"

def direct_spam_inbox(self, amount: int = 20) -> List[DirectThread]:
"""
Get direct threads of Spam inbox (hidden requests)
Parameters
----------
amount: int, optional
Maximum number of threads to return, default is 20
Returns
-------
List[DirectThread]
A list of objects of DirectThread
"""
cursor = None
threads = []
while True:
new_threads, cursor = self.direct_spam_chunk(cursor)
for thread in new_threads:
threads.append(thread)

if not cursor or (amount and len(threads) >= amount):
break
if amount:
threads = threads[:amount]
return threads

def direct_spam_chunk(
self, cursor: str = None
) -> Tuple[List[DirectThread], str]:
"""
Get direct threads of Spam inbox (hidden requests). Chunk
Parameters
----------
cursor: str, optional
Cursor from the previous chunk request
Returns
-------
Tuple[List[DirectThread], str]
A tuple of list of objects of DirectThread and str (cursor)
"""
assert self.user_id, "Login required"
params = {
"visual_message_return_type": "unseen",
"persistentBadging": "true",
"is_prefetching": "false",
}
if cursor:
params.update({"cursor": cursor})

threads = []
result = self.private_request("direct_v2/spam_inbox/", params=params)
inbox = result.get("inbox", {})
for thread in inbox.get("threads", []):
threads.append(extract_direct_thread(thread))
cursor = inbox.get("oldest_cursor")
return threads, cursor

def direct_thread(self, thread_id: int, amount: int = 20) -> DirectThread:
"""
Get all the information about a Direct Message thread
Expand Down

0 comments on commit 8bad151

Please sign in to comment.