November 13, 2022

Scraping All of Zcool's User Data with Python

Crawling every Zcool user's profile, works, images, videos, and more...

1. First, I used Python 3 with Scrapy. Whenever I crawl a site, my habit is to study it for a good while and think about the most convenient, most general way to write the spider. While studying this site I noticed that each user's URL is quite regular: the user ID simply auto-increments. There may be some invalid IDs in between, but that doesn't stop us from collecting the complete dataset.

2. Second, this site is, in theory, fairly simple: there is no IP rate limiting and no JavaScript obfuscation to break, so the main thing to think about while scraping is speed (more on that in the sketch just below). One thing to remember, though: the site's video links have an expiration time, so watch out for that!
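Since there is nothing to evade, "fast" mostly comes down to Scrapy's own knobs. A minimal settings.py sketch, assuming defaults everywhere else; the concrete numbers are my starting points, not values from the original project:

# settings.py -- hypothetical throughput-oriented settings, tune to taste
ROBOTSTXT_OBEY = False               # the site has no anti-crawl measures anyway
CONCURRENT_REQUESTS = 64             # no IP limiting, so concurrency can be high
CONCURRENT_REQUESTS_PER_DOMAIN = 64  # everything lives on zcool.com.cn
DOWNLOAD_TIMEOUT = 15                # fail fast on dead user IDs
RETRY_TIMES = 2                      # a couple of retries is enough on a stable site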

3. The first step is constructing the requests. In theory you need to construct IDs from 1 up to around 17,000,000. Why that number? Because the two accounts I newly registered at the time got consecutive, increasing IDs, and I had also crawled part of the data earlier and observed the same pattern. I simply ran the spider in several terminal windows in parallel instead of setting up a distributed crawl.
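The seeding step could look like the sketch below. I am assuming the profile URL pattern https://www.zcool.com.cn/u/<id> here; check it against a real profile page before running:

# seed_redis.py -- push candidate user URLs into the set the spider pops from
# (the u/<id> URL pattern is an assumption, not confirmed by the original post)
import redis

r = redis.StrictRedis(host="127.0.0.1", port=6379, decode_responses=True)
pipe = r.pipeline()
for uid in range(1, 17000001):
    pipe.sadd("zhanku_website", "https://www.zcool.com.cn/u/%d" % uid)
    if uid % 10000 == 0:  # flush in batches so the pipeline buffer stays small
        pipe.execute()
pipe.execute()  # flush the final partial batch

Because Redis SPOP is atomic, several spider processes started in separate windows can share this one set without ever handing the same user to two of them.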

As for code, I'll only paste the spider code below, since the site is simple and has next to no anti-scraping; all I need to explain is how to capture every user, plus the video-link expiration issue! Here is the data I scraped; we pulled down every piece of information about each user!

Table 1: basic user information table:

This part fetches the user's own information. I crawled user info and works in separate runs, because there are too many works, and combining them makes the request chain too deep!
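The spider imports ZhankuItem from zhanku.items. The original items.py is not shown, but judging from the fields the spider assigns, it would have to look roughly like this reconstruction:

# zhanku/items.py -- reconstructed from the fields the spider below sets
import scrapy


class ZhankuItem(scrapy.Item):
    title = scrapy.Field()
    view_num = scrapy.Field()
    news = scrapy.Field()          # counter scraped from the a.news element
    create_time = scrapy.Field()
    recommend = scrapy.Field()
    content_tags = scrapy.Field()
    jianjie = scrapy.Field()       # the work's short description
    img_list = scrapy.Field()
    video = scrapy.Field()
    website = scrapy.Field()       # the user page the work was found on

And here is the spider itself: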
# -*- coding: utf-8 -*-
import scrapy
import re
import redis
from zhanku.items import ZhankuItem


class ZhankuSpider(scrapy.Spider):
    name = 'ZhanKu'
    allowed_domains = ['zcool.com.cn']


    def start_requests(self):
        # User URLs were pre-loaded into a Redis set; pop them one by one.
        r = redis.StrictRedis(host="127.0.0.1", port=6379, decode_responses=True)
        while True:
            url = r.spop("zhanku_website")
            if url is None:  # set drained, stop issuing requests
                break
            dict_data = {}
            yield scrapy.Request(url, meta={"dict_data": dict_data}, callback=self.parse,
                                 dont_filter=True)

    def parse(self, response):
        dict_data = response.meta["dict_data"]
        div_list = response.xpath("//div[@class='work-list-box']/div[@class='card-box']")
        for div in div_list:
            views = div.xpath(".//span[@class='statistics-view']/@title").extract_first()
            try:
                view_num = int(re.search(r"\d+", views).group())
            except (TypeError, AttributeError):  # no title attribute or no digits in it
                view_num = 0
            link = div.xpath(".//a[@class='title-content']/@href").extract_first()
            dict_data[link] = view_num
        next_page = response.xpath("//div[@class='pageturning ']//a[@class='laypage_next']/@href").extract_first()
        if next_page:
            yield scrapy.Request("https://www.zcool.com.cn" + next_page, meta={"dict_data": dict_data},
                                 callback=self.parse)
        else:
            yield scrapy.Request(response.url, meta={"dict_data": dict_data},
                                 callback=self.get_top10, dont_filter=True)

    def get_top10(self, response):
        # Keep only a user's 10 most-viewed works to bound the crawl.
        data_dict = response.meta["dict_data"]
        if len(data_dict) > 10:
            data_dict = dict(sorted(data_dict.items(), key=lambda x: x[1], reverse=True)[:10])
        for key, value in data_dict.items():
            img = []
            video = []
            yield scrapy.Request(key, meta={"img": img, "video": video, "res": response.url},
                                 callback=self.get_zuopin)

    def get_zuopin(self, response):
        item = ZhankuItem()
        img_list = response.meta["img"]
        video_list = response.meta["video"]
        website = response.meta["res"]
        try:
            title = response.xpath("//h2/text()").extract_first()
            title = title.replace("\n", "").strip()
        except AttributeError:  # page has no <h2> title
            title = None
        create_time = re.findall(r"创建时间:(\d+-\d+-\d+) ", response.text)
        create_time = create_time[0] if create_time else None
        content_tag = ";".join(response.xpath("//span[@class='head-index']/span/a/text()").extract())
        try:
            view_num = response.xpath(
                "//span[@class='head-data-show']//a[@class='see vertical-line']/@title").extract_first()
            view_num = re.search(r"\d+", view_num).group()
        except (TypeError, AttributeError):
            view_num = None
        try:
            news = response.xpath(
                "//span[@class='head-data-show']//a[@class='news vertical-line']/@title").extract_first()
            news = re.search(r"\d+", news).group()
        except (TypeError, AttributeError):
            news = None
        try:
            recommend = response.xpath(
                "//span[@class='head-data-show']//a[@class='recommend-show']/@title").extract_first()
            recommend = re.search(r"\d+", recommend).group()
        except (TypeError, AttributeError):
            recommend = None
        jianjie = response.xpath("//div[@class='work-decoration-title']/p/text()").extract_first()
        img_list.extend(response.xpath("//div[@class='work-details-content']//img/@src").extract())
        # Video URLs are embedded in inline JS; remember these links are signed and expire.
        video_list.extend(re.findall(r"'(https://video\.zcool\.cn.*?)'", response.text))
        item["title"] = title
        item["view_num"] = view_num
        item["news"] = news
        item["create_time"] = create_time
        item["recommend"] = recommend
        item["content_tags"] = content_tag
        item["jianjie"] = jianjie
        item["img_list"] = img_list
        item["video"] = video_list
        item["website"] = website
        next_page = response.xpath("//a[@class='laypage_next']/@href").extract_first()
        if next_page:
            yield scrapy.Request("https://www.zcool.com.cn" + next_page,meta={"img": img_list, "video": video_list,"res":website},callback=self.get_zuopin)
        else:
            yield item
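
Because those video links expire, I would fetch the files as soon as an item leaves the spider instead of storing bare URLs. A minimal pipeline sketch; the VideoDownloadPipeline name and the videos/ save path are mine, not from the original project:

# zhanku/pipelines.py -- hypothetical pipeline: grab videos before the links expire
import os

import requests


class VideoDownloadPipeline(object):
    def process_item(self, item, spider):
        os.makedirs("videos", exist_ok=True)
        for i, url in enumerate(item.get("video", [])):
            try:
                resp = requests.get(url, timeout=30)
                resp.raise_for_status()
                # note: the raw title goes into the filename unsanitized;
                # a real run should strip characters like '/' first
                name = "%s_%d.mp4" % (item.get("title") or "untitled", i)
                with open(os.path.join("videos", name), "wb") as f:
                    f.write(resp.content)
            except requests.RequestException:
                spider.logger.warning("video fetch failed: %s", url)
        return item

Enable it with ITEM_PIPELINES = {"zhanku.pipelines.VideoDownloadPipeline": 300} in settings.py.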

Table 2: user works table:

The work titles, image links, and videos are collected by the get_zuopin method of the spider above.
