跳至主要內容

Python Web Scraping 實戰:從基礎到反爬蟲對策

Python Web Scraping 實戰:從基礎到反爬蟲對策

網頁爬蟲是資料工程師的必備技能。Python 有完整的爬蟲生態系,從簡單的靜態頁面到複雜的 SPA 應用都能應付。這篇文章從基礎開始,涵蓋實際開發中遇到的各種問題和解決方法。

免責聲明:爬蟲使用前請確認目標網站的 robots.txt 和服務條款,不要造成對方伺服器的負擔,並遵守相關法規。

環境安裝

pip install requests beautifulsoup4 lxml httpx playwright

基礎:requests + BeautifulSoup

import time
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup

def scrape_news(url: str) -> list[dict]:
    """Scrape one news listing page.

    Args:
        url: Page URL to fetch; also used as the base for resolving
            relative article links.

    Returns:
        A list of dicts with keys ``title``, ``url`` (absolute), and
        ``date`` (``None`` when the element is missing).

    Raises:
        requests.HTTPError: On a non-2xx response.
        requests.RequestException: On connection/timeout failures.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }

    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()
    # Force UTF-8 decoding regardless of the server-declared charset.
    response.encoding = 'utf-8'

    soup = BeautifulSoup(response.text, 'lxml')
    articles = []

    for item in soup.select('.news-item'):
        title_el = item.select_one('h2.title')
        link_el = item.select_one('a')
        date_el = item.select_one('.date')

        # Skip malformed entries that lack a headline or a link.
        if not title_el or not link_el:
            continue

        # Resolve relative hrefs against the page URL so stored links
        # are always absolute; keep '' for a missing href attribute.
        href = link_el.get('href', '')
        if href:
            href = urljoin(url, href)

        articles.append({
            'title': title_el.get_text(strip=True),
            'url': href,
            'date': date_el.get_text(strip=True) if date_el else None,
        })

    return articles

分頁爬取

import time
from typing import Generator

def scrape_all_pages(base_url: str, max_pages: int = 10) -> Generator:
    """Yield items from paginated listings, stopping at the first empty page.

    Pages that fail with a request error are logged and skipped; an empty
    page is treated as the end of the data.
    """
    for page_num in range(1, max_pages + 1):
        page_url = f"{base_url}?page={page_num}"

        try:
            page_items = scrape_page(page_url)
        except requests.RequestException as err:
            print(f"第 {page_num} 頁爬取失敗:{err}")
            continue

        # An empty page means we ran past the last page of data.
        if not page_items:
            print(f"第 {page_num} 頁無資料,停止爬取")
            return

        yield from page_items

        # Throttle between pages to avoid hammering the server.
        time.sleep(1.5)

處理動態內容:Playwright

現代網站大量使用 JavaScript 渲染,requests 拿到的只是空殼。需要用無頭瀏覽器:

from playwright.sync_api import sync_playwright

def scrape_spa(url: str) -> list[dict]:
    """Render a JavaScript-driven page with headless Chromium and extract products.

    Args:
        url: Page URL to render.

    Returns:
        A list of dicts with ``title`` and ``price`` (``None`` when a
        sub-element is missing).

    Raises:
        playwright TimeoutError: If '.product-list' never appears.
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        # try/finally guarantees the browser process is closed even when
        # goto() or wait_for_selector() raises (e.g. a timeout) — the
        # original leaked the browser on any exception.
        try:
            context = browser.new_context(
                user_agent="Mozilla/5.0 ...",
                viewport={"width": 1280, "height": 720}
            )
            page = context.new_page()

            # Block images/fonts to speed up page loads.
            page.route("**/*.{png,jpg,gif,svg,woff,woff2}", lambda route: route.abort())

            page.goto(url, wait_until="networkidle")

            # Wait for the product list to be rendered by the client-side JS.
            page.wait_for_selector('.product-list', timeout=5000)

            # Scroll to the bottom to trigger lazy loading, then give it time.
            page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            page.wait_for_timeout(2000)

            items = page.query_selector_all('.product-item')
            result = []

            for item in items:
                title = item.query_selector('.title')
                price = item.query_selector('.price')
                result.append({
                    'title': title.inner_text() if title else None,
                    'price': price.inner_text() if price else None,
                })

            return result
        finally:
            browser.close()

反爬蟲應對策略

1. 輪換 User-Agent

import random

# Pool of desktop browser User-Agent strings to rotate through.
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) Chrome/120.0.0.0 Safari/537.36',
    'Mozilla/5.0 (X11; Linux x86_64) Chrome/120.0.0.0 Safari/537.36',
]


def get_random_headers():
    """Build browser-like request headers with a randomly rotated User-Agent."""
    chosen_ua = random.choice(USER_AGENTS)
    return {
        'User-Agent': chosen_ua,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-TW,zh;q=0.9,en-US;q=0.8,en;q=0.7',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
    }

2. 隨機延遲

import random
import time

def random_sleep(min_sec=1.0, max_sec=3.0):
    """Sleep a uniformly random duration between min_sec and max_sec seconds."""
    time.sleep(random.uniform(min_sec, max_sec))

3. Session 保持

session = requests.Session()

# Visit the homepage first so the session picks up its cookies.
session.get('https://example.com', headers=get_random_headers())

# Reuse the same session (and the cookies it now holds) for the target page.
response = session.get(target_url, headers=get_random_headers())

非同步爬蟲(大幅提升效率)

import asyncio
import httpx
from asyncio import Semaphore

async def fetch(client: httpx.AsyncClient, url: str, sem: Semaphore) -> dict:
    """Fetch one URL while holding the semaphore.

    Never raises: any failure is folded into the returned dict so a batch
    run collects both successes and errors.
    """
    async with sem:  # cap the number of in-flight requests
        try:
            resp = await client.get(url, timeout=10)
            resp.raise_for_status()
            return {'url': url, 'content': resp.text, 'status': 'ok'}
        except Exception as exc:
            return {'url': url, 'error': str(exc), 'status': 'failed'}

async def scrape_batch(urls: list[str], concurrency: int = 5):
    """Fetch all *urls* concurrently, with at most *concurrency* in flight.

    Returns one result dict per URL, in input order.
    """
    limiter = Semaphore(concurrency)

    async with httpx.AsyncClient(
        headers=get_random_headers(),
        follow_redirects=True
    ) as client:
        pending = [fetch(client, target, limiter) for target in urls]
        return await asyncio.gather(*pending)

# Run the batch scrape over pages 1..50.
urls = [f"https://example.com/page/{i}" for i in range(1, 51)]
results = asyncio.run(scrape_batch(urls, concurrency=5))

儲存爬取結果

import json
import csv
from pathlib import Path

def save_to_json(data: list, filepath: str):
    """Serialize *data* as pretty-printed UTF-8 JSON, creating parent dirs."""
    target = Path(filepath)
    target.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(data, ensure_ascii=False, indent=2)
    target.write_text(payload, encoding='utf-8')

def save_to_csv(data: list[dict], filepath: str):
    """Write a list of dicts to a UTF-8-BOM CSV file (Excel-friendly).

    Args:
        data: Rows to write; an empty list is a no-op. Rows may have
            differing keys — missing values are written as ''.
        filepath: Destination path; parent directories are created.
    """
    if not data:
        return
    # Create parent directories, consistent with save_to_json — the
    # original raised FileNotFoundError for a not-yet-existing directory.
    Path(filepath).parent.mkdir(parents=True, exist_ok=True)
    # Take the union of keys across all rows (first-seen order) — using
    # only data[0].keys() made DictWriter raise ValueError whenever a
    # later row carried an extra key.
    fieldnames = list(dict.fromkeys(key for row in data for key in row))
    with open(filepath, 'w', encoding='utf-8-sig', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)

爬蟲開發是一個持續和目標網站「鬥智」的過程,網站可能隨時更新結構或加強防護。建議保持程式碼的模組化,讓選擇器容易修改,並加入適當的錯誤處理和重試機制,讓爬蟲更加健壯。

分享這篇文章