利用Playwright库进行电影网站数据的获取

lxf2023-05-15 00:56:27

简单概述

本系列可能是一个比较长的系列,主要是对《Python3网络爬虫开发实战》前七章的一个内容总结并且熟悉使用一下相关的框架与技术。

任务目标

爬取电影数据网站ssr1.scrape.center/, 此网站无反爬,数据通过服务端渲染,需要爬取的部分为列表页里面的电影数据详情。

任务目标解析

  1. 爬取ssr1.scrape.center/, 网站的列表页面,通过列表页面的内容获取到需要的URL
  2. 爬取ssr1.scrape.center/detail/{id}, 网站内的数据详情,需要获取的部分有:
    1. 电影标题
    2. 电影图片的url
    3. 电影上映时间
    4. 电影分类
    5. 电影评分
    6. 剧情简介
  3. 将内容存放到需要的数据库中

技术选型与爬取

如何爬取

playwright库是微软开源的一个库,这个库的功能更加的强大,除了可以实现同步操作,同样也可以实现异步的操作,这个库可以说是现在功能最强大的库也不为过,因为其还支持xpath,css选择器等一些元素的选择操作,甚至可以通过点击鼠标进行操作,然后实现自动化构建代码,整体的功能真的十分强大。

构建基础的爬取函数

# 抓取网页内容
def scrape_page(page, url):
    logging.info('scraping %s ...', url)
    try:
        page.goto(url)
        page.wait_for_load_state('networkidle')
    except:
        logging.error('error occured while scraping %s', url, exc_info=True)

对于网页内容的操作,我们只需要对页面选项卡进行操作,传入页面选项卡对象和url链接实现我们想要完成的页面请求,在这种请求下,我们通过等待网络请求的响应情况来判断页面是否完全响应。

构建列表页的爬取函数

这个部分只需要分析出最基础的URL的页码规则就可以完成对页面内容的爬取,经过分析我们可以发现https://ssr1.scrape.center/page/{page}可以发现变动的内容在{page}部分,因此构建的抓取方式如下:

def scrape_index(page, page_index):
    index_url = f'{BASE_URL}/page/{page_index}'
    return scrape_page(page, index_url)

构建详情页的爬取函数

详情页的爬取是建立在解析列表的基础上获得的,因此详情页爬取函数只需要知道url就可以直接调用基础爬取函数,而这里我们只需要对列表页解析后就可以获取到我们所需要的url,因此整体的构建方式如下:

def scrape_detail(page, url):
    return scrape_page(page, url)

如何解析

解析列表页后获取详情页的URL

快速便捷的选择器让我们通过一行代码就获取到我们所需要的标签与属性,十分方便的完成了我们需要获取详情页的URL.

# 获取解析内容
def parse_index(page):
    # 获取网页内容请求
    elements = page.query_selector_all('a.name')
    # 获取元素信息
    for element in elements:
        part_of_url = element.get_attribute('href')
        detail_url = urljoin(BASE_URL, part_of_url)
        logging.info('get url: %s', detail_url)
        yield detail_url

解析详情页获取需要的数据

当详情页数据获取到之后,对网页内的信息进行解析,实现对电影名称,电影类别,图片地址,剧情简介以及评分的内容获取:

def parse_detail(page):
    # 获取标题
    name = None
    name_tag = page.query_selector('h2.m-b-sm')
    if name_tag:
        name = name_tag.text_content()

    # 获取图片
    cover = None
    cover_tag = page.query_selector('img.cover')
    if cover_tag:
        cover = cover_tag.get_attribute('src')

    # 获取分类
    categories = []
    category_tags = page.query_selector_all('div.categories > button > span')
    if category_tags:
        categories = [category.text_content() for category in category_tags]

    # 获取评分
    score = None
    score_tag = page.query_selector('p.score')
    if score_tag:
        score = score_tag.text_content().strip()

    # 剧情简介
    drama = None
    drama_tag = page.query_selector('div.drama > p')
    if drama_tag:
        drama = drama_tag.text_content().strip()

    return {
        # 标题
        'name': name,
        # 图片
        'cover': cover,
        # 分类
        'categories': categories,
        # 简介
        'drama': drama,
        # 评分
        'score': score
    }

如何存储

本次存储使用txt文本进行文件内容的存储,直接将文件内容写入一个txt文件当中。

# 数据内容存储
def save_data(data):
    # 文件存放地址
    data_path = '{0}/movies.txt'.format(RESULT_DIR)
    # 进行文件写入
    with open(data_path, 'a+', encoding='utf-8') as file:
        name = data.get('name', None)
        cover = data.get('cover', None)
        categories = data.get('categories', None)
        drama = data.get('drama', None)
        score = data.get('score', None)

        file.write('name:'+name+'\n')
        file.write('cover:'+cover+'\n')
        file.write('categories:'+str(categories)+'\n')
        file.write('drama:'+drama+'\n')
        file.write('score:'+score+'\n')
        file.write('='*50 + '\n')

源代码

import logging

from os import makedirs
from os.path import exists
from urllib.parse import urljoin
from playwright.sync_api import sync_playwright

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

#
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
RESULT_DIR = 'results'

exists(RESULT_DIR) or makedirs(RESULT_DIR)

# 抓取网页内容
def scrape_page(page, url):
    logging.info('scraping %s ...', url)
    try:
        page.goto(url)
        page.wait_for_load_state('networkidle')
    except:
        logging.error('error occured while scraping %s', url, exc_info=True)


def scrape_index(page, page_index):
    index_url = f'{BASE_URL}/page/{page_index}'
    return scrape_page(page, index_url)


def scrape_detail(page, url):
    return scrape_page(page, url)

# 获取解析内容
def parse_index(page):
    # 获取网页内容请求
    elements = page.query_selector_all('a.name')
    # 获取元素信息
    for element in elements:
        part_of_url = element.get_attribute('href')
        detail_url = urljoin(BASE_URL, part_of_url)
        logging.info('get url: %s', detail_url)
        yield detail_url


def parse_detail(page):
    # 获取标题
    name = None
    name_tag = page.query_selector('h2.m-b-sm')
    if name_tag:
        name = name_tag.text_content()

    # 获取图片
    cover = None
    cover_tag = page.query_selector('img.cover')
    if cover_tag:
        cover = cover_tag.get_attribute('src')

    # 获取分类
    categories = []
    category_tags = page.query_selector_all('div.categories > button > span')
    if category_tags:
        categories = [category.text_content() for category in category_tags]

    # 获取评分
    score = None
    score_tag = page.query_selector('p.score')
    if score_tag:
        score = score_tag.text_content().strip()

    # 剧情简介
    drama = None
    drama_tag = page.query_selector('div.drama > p')
    if drama_tag:
        drama = drama_tag.text_content().strip()

    return {
        # 标题
        'name': name,
        # 图片
        'cover': cover,
        # 分类
        'categories': categories,
        # 简介
        'drama': drama,
        # 评分
        'score': score
    }

# 数据内容存储
def save_data(data):
    # 文件存放地址
    data_path = '{0}/movies.txt'.format(RESULT_DIR)
    # 进行文件写入
    with open(data_path, 'a+', encoding='utf-8') as file:
        name = data.get('name', None)
        cover = data.get('cover', None)
        categories = data.get('categories', None)
        drama = data.get('drama', None)
        score = data.get('score', None)

        file.write('name:'+name+'\n')
        file.write('cover:'+cover+'\n')
        file.write('categories:'+str(categories)+'\n')
        file.write('drama:'+drama+'\n')
        file.write('score:'+score+'\n')
        file.write('='*50 + '\n')


def main():
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        page = browser.new_page()
        for page_index in range(1, TOTAL_PAGE + 1):
            scrape_index(page, page_index)
            detail_urls = list(parse_index(page))
            for detail_url in detail_urls:
                scrape_detail(page, detail_url)
                data = parse_detail(page)
                logging.info('get data: %s', data)
                save_data(data)
            
    browser.close()

if __name__ == '__main__':
    main()

版权信息

本文由PorterZhang整理或写作完成
本人的Github: PorterZhang2021
本人的博客地址:PorterZhang

本网站是一个以CSS、JavaScript、Vue、HTML为核心的前端开发技术网站。我们致力于为广大前端开发者提供专业、全面、实用的前端开发知识和技术支持。 在本网站中,您可以学习到最新的前端开发技术,了解前端开发的最新趋势和最佳实践。我们提供丰富的教程和案例,让您可以快速掌握前端开发的核心技术和流程。 本网站还提供一系列实用的工具和插件,帮助您更加高效地进行前端开发工作。我们提供的工具和插件都经过精心设计和优化,可以帮助您节省时间和精力,提升开发效率。 除此之外,本网站还拥有一个活跃的社区,您可以在社区中与其他前端开发者交流技术、分享经验、解决问题。我们相信,社区的力量可以帮助您更好地成长和进步。 在本网站中,您可以找到您需要的一切前端开发资源,让您成为一名更加优秀的前端开发者。欢迎您加入我们的大家庭,一起探索前端开发的无限可能!