1.站酷數據獲取
其中需要注意,本文使用了IP代理,以及不同的作品類型,他們詳情頁url拼接方式不同
import random import time import datetime import requests import threading from lxml import etree import pymysql class ZhankunSpider(object): def __init__(self): self.url = 'https://www.zcool.com.cn/p1/discover/first?p={}&ps=20' self.mysql = pymysql.connect(host='localhost', database='tenders', port=3306, user='root', password='123456') self.cur = self.mysql.cursor() self.blog = 1 def proxy_get(self): procy = requests.get( r'放入IP代理鍊接').json()['data'][0] proxy = str(procy["ip"]) + ':' + str(procy["port"]) http = 'http://' + proxy https = 'https://' + proxy self.proxys = {'http': http, 'https': https} print(self.proxys) # result = requests.get('https://www.baidu.com/',verify=False) result = requests.get('https://www.baidu.com/') print(result.status_code) if result.status_code != 200: self.proxy_get() time.sleep(0.2) return # self.expire_datetime = datetime.datetime.now() + datetime.timedelta(seconds=60) def _check_expire(self): self.expire_datetime = datetime.datetime.now() + datetime.timedelta(seconds=60) if datetime.datetime.now() >= self.expire_datetime: self.proxy_get() # 發送請求 def get_html(self, url): if self.blog <= 3: try: datas = { 'p': 'i', 'column': 5 } headers = {'Cookie': '登陸後cookie', 'User-Agent':'',} json_ids = requests.get(url=url, headers=headers, data=datas).json() return json_ids except Exception as e: print(e) self.blog += 1 self.get_html(url) # 解析提取數據 def parse_html(self, url): json_ids = self.get_html(url) self._check_expire() if json_ids: time.sleep(1) for dic in json_ids['datas']: titles = dic['content']['title'] #題目 types = dic['content']['typeStr'] viewCountStrs = dic['content']['viewCountStr'] #浏覽量 subCateStrs = dic['content']['subCateStr'] cateStrs = dic['content']['cateStr'] url13 = 'https://www.zcool.com.cn/p1/product/'+dic['content']['idStr'] urll = dic['content']['pageUrl'] headers1 = { 'Cookie': '', 'User-Agent': '', } # self._check_expire() if 'work' in urll: url2 = 'https://www.zcool.com.cn/p1/product/' + dic['content']['idStr'] try: json_idss = requests.get(url=url2, headers=headers1, proxies=self.proxys, timeout=3).json() except: self.proxy_get() json_idss = requests.get(url=url2, headers=headers1, proxies=self.proxys, timeout=3).json() time.sleep(1) for dici in json_idss['data']['productImages']: datass = dici['url'] else: url2 = 'https://www.zcool.com.cn/p1/article/' + dic['content']['idStr'] try: json_idss = requests.get(url=url2, headers=headers1, proxies=self.proxys, timeout=3).json() except: self.proxy_get() json_idss = requests.get(url=url2, headers=headers1, proxies=self.proxys, timeout=3).json() time.sleep(1) # datass = json_idss['data']['id'] for dici in json_idss['data']['creatorObj']['contentCards']: datass = dici['cover1x'] timeStamp = dic['content']['timeTitleStr'] # timeArray = time.localtime(timeStamp) # 轉化成對應的時間 # otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray) # 字符串 # date = otherStyleTime photo = dic['content']['cover1x'] data = { 'title': titles, 'urls': url13, 'address': timeStamp, 'configuration': types, 'grade': viewCountStrs, 'collections': subCateStrs, 'price': cateStrs, 'unit': photo, 'photoadress': datass } print(data) self.save_mysql(data) def save_mysql(self, data): # str_sql = "insert into ftx values(0, '{}', '{}');".format(data['first_category'],data['second_category']) str_sql = "insert into meituan values(0, '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}');".format( data['title'], data['urls'],data['address'], data['configuration'], data['grade'], data['collections'], data['price'], data['unit'], data['photoadress']) self.cur.execute(str_sql) self.mysql.commit() def __del__(self): self.cur.close() self.mysql.close() # 入口函數 def run(self): try: for i in range(1,5): url = self.url.format(i) print(i) # self.get_html(url) self.parse_html(url) time.sleep(random.randint(2, 4)) # 每次抓取一頁要初始化一次self.blog self.blog = 1 except Exception as e: print('發生錯誤', e) if __name__ == '__main__': spider = ZhankunSpider() spider.run()
2.結果展示
有話要說...