This article will show how to use Python to perform web scraping.
Web scraping is the automated collection of information from the World Wide Web, over the HTTP protocol or through a web browser, and it is a very useful tool. If you have ever copied and pasted content from a website into an Excel spreadsheet, you have essentially done web scraping, just on a very small scale.
Python is a popular choice for web scraping: its syntax is easy to read, and it has a mature ecosystem of libraries such as requests and Beautiful Soup for fetching and parsing HTML, as the sketch below shows.
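As a minimal sketch of the idea (the URL and the h1 selector here are placeholders for illustration, not part of the project that follows), fetching a page and pulling text out of its HTML looks like this:

# Minimal scraping sketch: fetch a page and extract text from its HTML.
# The URL and the <h1> selector are placeholders for illustration only.
import requests
from bs4 import BeautifulSoup

response = requests.get('https://example.com', timeout=10)
soup = BeautifulSoup(response.content, features='html.parser')

# Print the text of every <h1> element on the page.
for heading in soup.find_all('h1'):
    print(heading.get_text(strip=True))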
Web scraping example: scrape the Www website (referenced as https://www in the code below) to extract the category, product name, price, and image link for each item. First, create a project directory:
mkdir data-extraction
cd data-extraction
requirements.txt:

setuptools==68.2.2
pylint==3.0.1
argparse==1.4.0
beautifulsoup4==4.12.2
requests==2.31.0
python-dotenv==1.0.0
boto3==1.28.62
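With requirements.txt in place, the dependencies can be installed, for example into a virtual environment (these are standard commands, not project-specific ones):

python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt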
run-pylint.sh:

#!/bin/sh
command -v pylint >/dev/null 2>&1 || { echo >&2 "Running 'pylint' requires it to be installed."; exit 1; }

echo "Running pylint..."
find . -iname "*.py" -path "./src/*" | xargs pylint --rcfile .pylintrc
.pylintrc:

[MASTER]
init-hook='import sys; sys.path.append("/data-extraction/src")'
disable=
    C0114, # missing-module-docstring
    C0115, # missing-class-docstring
    C0116, # missing-function-docstring

[FORMAT]
max-line-length=80
VERSION:

0.1.0
setup.py:

import os

from setuptools.command.install import install
from setuptools import setup, find_packages


class PylintCommand(install):
    description = 'Check code convention'

    def run(self) -> None:
        install.run(self)
        path = get_current_path()
        os.system(f'sh {path}/run-pylint.sh')


def get_current_path() -> str:
    # Escape spaces and parentheses so the path can be passed safely to the shell
    return os.getcwd().replace(" ", "\\ ").replace("(", "\\(").replace(")", "\\)")


def read_file(file):
    with open(file) as f:
        return f.read()


def read_requirements(file):
    with open(file) as f:
        return f.read().splitlines()


version = read_file("VERSION")
requirements = read_requirements("requirements.txt")

setup(
    name='agapifa-data-extraction',
    version=version,
    description='Extract data to a file from html source',
    install_requires=requirements,
    classifiers=[
        "Programming Language :: Python :: 3",
    ],
    packages=find_packages(include=['src']),
    python_requires=">=3.10",
    cmdclass={
        'lint': PylintCommand,
    },
)
The entry point (since the project is run with python3 src, it lives at src/__main__.py):

from argparse import ArgumentParser
from time import strftime

from file.csv_file import CsvFile
from utils.url import is_valid_url
from utils.aws_s3 import upload_to_s3
from core.extraction_data import ExtractionData
from config.config import config
from utils.file import remove


def validate_args(args) -> None:
    if not args.urls:
        raise SystemExit('Please specify a URL as the data source')
    if not args.out:
        raise SystemExit('Please specify a path to export a file')
    for url in args.urls:
        if is_valid_url(url) is False:
            raise SystemExit('The data source should be in URL format')


def get_args():
    parser = ArgumentParser(description='Data extraction args')
    parser.add_argument(
        '-i',
        '--urls',
        help='URLs as data source',
        type=str,
        nargs='+',
        required=True
    )
    parser.add_argument('-o', '--out', help='Output path', required=True)
    parser.add_argument('-e', '--ext', default='csv', help='File extension')
    return parser.parse_args()


def generate_file_name_include_ext(ext: str = 'csv'):
    return f'{strftime("%Y%m%d")}.{ext}'


def main():
    # Step 1: Parse and validate the arguments
    args = get_args()
    validate_args(args)

    # Step 2: Extract the data from the source URL
    out_file_path: str = f'{args.out}/{generate_file_name_include_ext()}'
    process = ExtractionData(args.urls[0], out_file_path)
    data = process.execute()

    # Step 3: Write the CSV file
    file = CsvFile(
        file_name=out_file_path,
        headers=config.CSV_HEADER.split(','),
        data=data
    )
    file.create_file()

    # Step 4: Upload the file to S3
    upload_to_s3(out_file_path, args.out, generate_file_name_include_ext())

    # Step 5: Clean up the local file
    remove(out_file_path)


if __name__ == "__main__":
    main()
src/config/config.py:

import os
from typing import get_type_hints, Union

from dotenv import load_dotenv

load_dotenv()


class AppConfigError(Exception):
    pass


def _parse_bool(val: Union[str, bool]) -> bool:
    return val if isinstance(val, bool) else val.lower() in ['true', 'yes', '1']


class AppConfig:
    DEBUG: bool = False
    ENV: str = 'production'
    AWS_REGION: str
    AWS_SECRET_ACCESS_KEY: str
    AWS_ACCESS_KEY_ID: str
    CSV_HEADER: str
    VALID_DOMAINS: str

    def __init__(self, env):
        for field in self.__annotations__:
            if not field.isupper():
                continue

            default_value = getattr(self, field, None)
            if default_value is None and env.get(field) is None:
                raise AppConfigError(f'The {field} field is required')

            try:
                var_type = get_type_hints(AppConfig)[field]
                if var_type == bool:
                    value = _parse_bool(env.get(field, default_value))
                else:
                    value = var_type(env.get(field, default_value))
                self.__setattr__(field, value)
            except ValueError as e:
                raise AppConfigError(
                    f'Unable to cast value of "{env[field]}" '
                    f'to type "{var_type}" for "{field}" field'
                ) from e

    def __repr__(self):
        return str(self.__dict__)


config = AppConfig(os.environ)
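A quick sketch of how this config object behaves, assuming the .env file shown later in this post has been loaded:

# Usage of the config singleton defined above, with the .env values from this post.
from config.config import config

print(config.DEBUG)          # True (parsed from the DEBUG entry in .env)
print(config.CSV_HEADER)     # name,price,description,image
print(config.VALID_DOMAINS)  # https://www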
src/utils/url.py:

from urllib.parse import urlparse


def is_valid_url(url: str) -> bool:
    parsed_url = urlparse(url)
    return bool(parsed_url.scheme and parsed_url.netloc)


def get_domain(url: str) -> str:
    parsed_url = urlparse(url)
    return f'{parsed_url.scheme}://{parsed_url.netloc}'
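For example, the helpers behave like this:

# Example behaviour of the URL helpers above.
from utils.url import is_valid_url, get_domain

print(is_valid_url('https://www/index.php?com=tim-kiem'))  # True
print(is_valid_url('not-a-url'))                           # False
print(get_domain('https://www/index.php?com=tim-kiem'))    # https://www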
src/utils/file.py:

import os


def remove(file_path: str) -> None:
    os.unlink(file_path)
src/utils/aws_s3.py:

from boto3 import client as boto3Client
from botocore import exceptions as botocoreExceptions

from config.config import config


def upload_to_s3(file_name: str, bucket: str, object_name=None) -> None:
    s3_client = boto3Client(
        's3',
        aws_access_key_id=config.AWS_ACCESS_KEY_ID,
        aws_secret_access_key=config.AWS_SECRET_ACCESS_KEY
    )

    try:
        if object_name is None:
            object_name = file_name
        s3_client.upload_file(file_name, bucket, object_name)
    except botocoreExceptions.ClientError:
        print(f'Uploading file ({file_name}) to S3 was not successful')
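For example (the local path and bucket name here are hypothetical), a generated CSV could be pushed to S3 like this:

# Hypothetical usage: the path and 'my-bucket' are placeholders.
from utils.aws_s3 import upload_to_s3

upload_to_s3('/tmp/20240101.csv', 'my-bucket', '20240101.csv')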
src/file/file_base.py:

class FileBase:
    def __init__(self, file_name: str = '') -> None:
        self.file_name = file_name

    def validate_data(self) -> None:
        pass

    def create_file(self) -> None:
        pass
src/file/csv_file.py:

import typing
import csv
import os

from file.file_base import FileBase


class CsvFile(FileBase):
    def __init__(
        self,
        file_name: str = '',
        headers: typing.List[str] = None,
        data: typing.List[typing.List[str]] = None
    ) -> None:
        FileBase.__init__(self, file_name)
        self.headers = headers
        self.data = data if data is not None else []

    def validate_data(self) -> None:
        # Every row must have the same number of columns as the header
        is_valid = all(len(row) == len(self.headers) for row in self.data)
        if is_valid is False:
            raise SystemExit('Header does not match with data')

    def create_file(self) -> None:
        path: str = os.path.dirname(self.file_name)
        if not os.path.exists(path):
            os.makedirs(path)

        with open(self.file_name, 'w', encoding='UTF8', newline='') as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow(self.headers)
            csv_writer.writerows(self.data)

    def read_file(self) -> dict:
        with open(self.file_name, 'r', encoding='UTF8') as csv_file:
            csv_reader = csv.reader(csv_file)
            self.headers = next(csv_reader)
            for row in csv_reader:
                self.data.append(row)

        return {'header': self.headers, 'rows': self.data}
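A small sketch of how CsvFile is used (the path and the row are made up for illustration):

# Hypothetical usage of CsvFile with made-up data.
from file.csv_file import CsvFile

file = CsvFile(
    file_name='/tmp/products/20240101.csv',
    headers=['name', 'price', 'description', 'image'],
    data=[['Sample product', '120000', 'A short description', 'https://www/image.jpg']],
)
file.validate_data()
file.create_file()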
src/core/extraction_data.py:

from typing import List

from bs4 import BeautifulSoup
from requests import get

from utils.url import get_domain
from config.config import config
from core.www import Www


class ExtractionData:
    def __init__(self, url: str = '', file_name: str = '') -> None:
        self.url = url
        self.file_name = file_name
        self.data: List[List[str]] = []

    def validate_domain(self, url: str) -> bool:
        return config.VALID_DOMAINS.split(',').count(get_domain(url)) > 0

    def _get_html_content(self, url: str) -> BeautifulSoup:
        content = get(url, timeout=500).content
        return BeautifulSoup(content, features="html.parser")

    def execute(self) -> List[List[str]]:
        if self.validate_domain(self.url) is False:
            print(f'{self.url} is not a supported data source')
        else:
            html_content = self._get_html_content(self.url)
            www = Www(html_content)
            self.data = www.execute()

        return self.data
src/core/www.py:

from typing import List

from bs4 import BeautifulSoup
from requests import get


class Www:
    def __init__(self, html_content: BeautifulSoup = None) -> None:
        self.content = html_content
        self.result = []

    def execute(self) -> List[List[str]]:
        self._get_urls()
        return self.result

    def _get_product_attribute(self, html: BeautifulSoup) -> dict:
        product_detail_html = html.find_all('div', {'class': 'chitietsanpham'})
        image = f"https://www/{html.find('div', {'class': 'hinhchitiet'}).find('a')['href']}"

        return {
            'name': html.find('h1', {'class': 'vcard fn'}).text,
            'price': self._convert_price_to_number(product_detail_html[1].find('span').text),
            'description': html.find('div', {'class': 'noidung'}),
            'image': image,
        }

    def _convert_price_to_number(self, price: str) -> float:
        # Prices are assumed to look like '1.200.000 đ'; strip the currency
        # suffix and the '.' thousands separators, then cast to a number
        return float(price.replace(' đ', '').replace('.', ''))

    def _get_urls(self) -> None:
        # Walk every category and sub-category, then collect all of their products
        for cat in self._get_cats():
            cat_id = cat['value']
            if cat_id == '':
                continue

            for sub_cat in self._get_sub_cats(cat_id):
                sub_cat_id = sub_cat['value']
                if sub_cat_id == '':
                    continue

                products = self._get_pagination(
                    f'https://www/index.php?com=tim-kiem&id_list={cat_id}&id_cat={sub_cat_id}'
                )
                for product in products:
                    self.result.append([
                        product['name'],
                        product['price'],
                        product['description'],
                        product['image'],
                    ])

    def _get_cats(self) -> List:
        return self.content.find(id="id_list").find_all('option')

    def _get_sub_cats(self, parent_cat_id: str) -> List:
        url: str = f'https://www/index.php?com=tim-kiem&id_list={parent_cat_id}'
        html_content = get(url, timeout=500).content
        html = BeautifulSoup(html_content, features="html.parser")
        return html.find(id="id_cat").find_all('option')

    def _get_pagination(self, url: str) -> List[dict]:
        html_content = get(url, timeout=500).content
        html = BeautifulSoup(html_content, features="html.parser")
        list_pagination = html.find("ul", {"class": "pagination"})
        result = []

        if list_pagination is not None:
            for page_item in list_pagination.find_all('li'):
                a_tag = page_item.find('a')
                current = page_item.find('a', {"class": "current"})
                if a_tag is None:
                    continue

                # The current page keeps the search URL; other pages link directly
                current_page_url = f'{url}&page={a_tag.string}'
                if current is None:
                    current_page_url = a_tag['href']

                html_content = get(current_page_url, timeout=500).content
                html = BeautifulSoup(html_content, features="html.parser")

                # Visit every product on the page and scrape its detail page
                for product in html.find_all('div', {'class': 'ten-product'}):
                    product_detail_url = product.find('h3').find('a')['href']
                    product_detail_html_content = get(
                        f'https://www/{product_detail_url}', timeout=500
                    ).content
                    product_detail_html = BeautifulSoup(
                        product_detail_html_content, features="html.parser"
                    )
                    result.append(self._get_product_attribute(product_detail_html))

        return result
.env:

DEBUG=True

# AWS
AWS_REGION=
AWS_SECRET_ACCESS_KEY=
AWS_ACCESS_KEY_ID=

# Core
VALID_DOMAINS=https://www
CSV_HEADER=name,price,description,image
python3 setup.py install
python3 setup.py lint
python3 src -i <URLs source> -o <s3 path>
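For example, with a hypothetical source URL and S3 bucket name:

python3 src -i https://www -o my-bucket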
In this post, we've looked at what web scraping is, how it's used, and what the process involves.
Good luck, and I hope this post has been of value to you!