Preface
No particular reason; I just noticed there are a lot of girls on Instagram…
Main text
First, the analysis
1. Analyze the target website
First, analyze how the site loads pictures. Taeri__taeri: you have probably heard of this influencer. Instagram only loads a certain number of photos at a time and loads more as you scroll down, so the data is clearly coming from XHR requests.
In the Preview tab you can see the JSON data; display_url is the link to the photo, and that's all we need to grab.
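To make the shape of that response concrete, here is a minimal sketch of pulling every display_url out of the JSON. The sample below is a trimmed, hypothetical version of the response; the real one nests many more fields, but the path down to the edges is the same one the crawler walks.

```python
import json

# Trimmed, hypothetical example of the XHR response; only the fields the
# crawler actually reads are kept.
sample = json.loads('''
{
  "data": {
    "user": {
      "edge_owner_to_timeline_media": {
        "count": 2,
        "page_info": {"has_next_page": false, "end_cursor": ""},
        "edges": [
          {"node": {"display_url": "https://example.com/photo1.jpg"}},
          {"node": {"display_url": "https://example.com/photo2.jpg"}}
        ]
      }
    }
  }
}
''')

# Walk down to the edges and collect the photo links.
edges = sample["data"]["user"]["edge_owner_to_timeline_media"]["edges"]
urls = [edge["node"]["display_url"] for edge in edges]
print(urls)
```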
2. Analyze the request parameters
Go back to the Headers tab and see what parameters the request carries. There are just two: query_hash and variables.
variables is a JSON string containing id, first, and after. To save you some digging, here is what the three mean:
id: the ID of the user
first: the number of photos this request loads
after: the end_cursor value. This parameter marks where the previous page ended; without it, the first page is always loaded. Each page's response contains the end_cursor to use in the next page's request.
One more thing: the request also needs cookies, but that should go without saying.
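Putting the two parameters together, building one request's query string looks roughly like this. The user_id, first, and end_cursor values here are made up for illustration; only the query_hash is the fixed value from the article.

```python
import json

# Hypothetical values for illustration; query_hash stays fixed,
# while id/first/after change per user and per page.
user_id = "12345678"
first = 50
end_cursor = ""  # empty on the first page

params = {
    "query_hash": "f2405b236d85e8296cf30347c9f08c2a",
    # variables is itself a JSON string embedded in the query string
    "variables": json.dumps({"id": user_id, "first": first, "after": end_cursor}),
}
print(params["variables"])
```

On the next page you would replace end_cursor with the end_cursor returned in the previous response's page_info.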
3. Procedure flow
Now that all the requests are analyzed, let's figure out how to write the program. What is the requirement?
Given a username, grab all of that user's photos and download them to instagram// as fast as possible.
Since the requirement says "as fast as possible", a single thread won't do; it's too inefficient, and a typical influencer has at least several hundred photos. So asyncio is used here, and the flow looks like this:
One task produces the photo links, and other tasks download them. While the producer is waiting on the network for the next page of links, the download tasks run; while a download is waiting on the network, more links get fetched.
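That flow can be sketched in a few lines, a minimal self-contained producer/consumer demo with sleeps standing in for network waits. The names and the five fake links are illustrative, not the article's actual downloader; the consumers reuse the same stop condition the real download() uses (producer finished and queue empty).

```python
import asyncio

async def produce(queue):
    """Producer: hand photo links to the consumers through a queue."""
    for i in range(5):
        await asyncio.sleep(0.01)          # stands in for fetching a page of links
        await queue.put(f"photo_{i}.jpg")

async def consume(queue, producer_done, results):
    """Consumer: run until the producer is done AND the queue is empty."""
    while not (producer_done() and queue.empty()):
        try:
            url = queue.get_nowait()
        except asyncio.QueueEmpty:
            await asyncio.sleep(0.005)     # nothing queued yet: yield and retry
            continue
        await asyncio.sleep(0.01)          # stands in for downloading the photo
        results.append(url)

async def main():
    queue = asyncio.Queue()
    results = []
    producer = asyncio.create_task(produce(queue))
    consumers = [asyncio.create_task(consume(queue, producer.done, results))
                 for _ in range(3)]
    await asyncio.gather(producer, *consumers)
    return results

downloaded = asyncio.run(main())
print(downloaded)
```

While any one coroutine is asleep (i.e. waiting on the network), the event loop runs the others, which is exactly why this beats a single sequential loop.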
Second, the code
Lib:
import json
import multiprocessing
import sys
from urllib.parse import urljoin
import aiohttp
import asyncio
import os
import re
from pathlib import Path
import requests
__init__:
def __init__(self, username, maxtasks=200):
    self.username = username
    self.maxtasks = maxtasks  # maximum number of concurrent tasks
    self.queue = asyncio.Queue(maxsize=maxtasks * 2)
    # Configure a proxy; Instagram is unreachable here without one
    os.environ['http_proxy'] = PROXY
    os.environ['https_proxy'] = PROXY
    self.session = aiohttp.ClientSession(trust_env=True, headers=HEADERS)
Get the user ID first:
async def get_shared_data(self):
    """ Get the shared data :return: """
    try:
        async with self.session.get(ROOT_URL + self.username) as resp:
            html = await resp.text()
            if html is not None and '_sharedData' in html:
                shared_data = html.split(
                    "window._sharedData = ")[1].split(";")[0]
                if not shared_data:  # no shared data: terminate the program
                    print('!!!!!!!!!!!!!!!!')
                    exit(1)
                return json.loads(shared_data)
    except Exception:
        pass
async def init(self):
    """ Initialize the necessary parameters :return: """
    user = (await self.get_shared_data())['entry_data']['ProfilePage'][0]['graphql']['user']
    if not user:
        print('user is none.')
        exit(1)
    self.user_id = user['id']  # user id
    self.count = user['edge_owner_to_timeline_media']['count']  # number of photos
Producer:
async def produce_download_urls(self, max=50):
    """ Get the links to all photos on each page :param max: number of photos fetched per request :return: """
    end_cursor = ''  # empty string loads the first page
    while True:
        pic_params = {
            'query_hash':
                'f2405b236d85e8296cf30347c9f08c2a',  # query_hash can be a fixed value
            'variables':
                '{{"id":"{0}","first":{1},"after":"{2}"}}'.format(
                    self.user_id, max, end_cursor),
        }
        pic_url = ROOT_URL + 'graphql/query/'
        async with self.session.get(pic_url, params=pic_params) as resp:
            data = await resp.json()  # named `data` so it doesn't shadow the json module
            edge_media = data['data']['user']['edge_owner_to_timeline_media']
            edges = edge_media['edges']
            if edges:
                for edge in edges:
                    await self.queue.put(edge['node']['display_url'])  # communicate via the queue
            # page_info carries has_next_page (true/false): is there a next page?
            has_next_page = edge_media['page_info']['has_next_page']
            if has_next_page:
                end_cursor = edge_media['page_info']['end_cursor']  # end_cursor for the next page
            else:
                break
Consumer:
async def download(self):
    """ Download a photo :return: """
    while not (self.producer.done() and self.queue.empty()):  # run until production is finished and the queue is empty
        url = await self.queue.get()  # get a photo link
        filename = PATH / url.split('?')[0].split('/')[-1]
        async with self.session.get(url) as resp:
            with filename.open('wb') as f:
                async for chunk in resp.content.iter_any():
                    f.write(chunk)
        self.queue.task_done()  # mark this queued url as downloaded
        print('.', end='', flush=True)
Run:
async def run(self):
    """ :return: """
    print('Preparing...')
    print('Initializing...')
    await self.init()
    print('User id: %r.' % self.user_id)
    print('Total %r photos.' % self.count)
    print('-' * 50)
    self.producer = asyncio.create_task(self.produce_download_urls())
    print('Downloading...', end='', flush=True)
    await asyncio.gather(*(self.download() for _ in range(self.maxtasks)))  # asyncio.gather is similar to asyncio.wait
Check:
def check(_):
    """ Watch the photo count... I'm too much of a noob to know a cleaner way to stop, so this will have to do (runs away) """
    print('Start check...')
    with requests.get(urljoin(ROOT_URL, USERNAME), headers=HEADERS,
                      proxies={'http': 'http://localhost:8001',
                               'https': 'https://localhost:8001'}) as resp:
        pattern = '"edge_owner_to_timeline_media":.*?{"count":(.*?),"page_info"'
        count = int(re.findall(pattern, resp.text)[0])
    while True:
        files = len(os.listdir(PATH))
        print('Check files:%r' % files)
        if files == count:
            # print('Total %r photos download done.' % count)
            print('\nProduce done, Total %r photos, plz wait save done :)' % count)
            sys.exit(0)
Main:
async def main():
    ins = Instagram(USERNAME)
    try:
        await ins.run()
    finally:
        await ins.close()
if __name__ == '__main__':
    try:
        p = multiprocessing.Process(target=check, args=(0,))
        p.start()
        future = asyncio.ensure_future(main())
        loop = asyncio.get_event_loop()
        loop.run_until_complete(future)
        loop.close()
    except KeyboardInterrupt:
        pass
Run:
Finally
Project address 🙂