Python crawler practice - Douyin

Table of contents

1. Analyze the tag structure of the home page’s work list

2. Before entering the work page, determine whether the work is a video work or an image work

3. Enter the video works page and obtain the video

4. Enter the image works page to obtain the pictures

5. Complete reference code

6. A way to obtain all works


This article mainly uses selenium.webdriver (Firefox), BeautifulSoup, and related libraries to practice web crawling on a CentOS system without logging in. For learning and communication purposes only. A sketch of the headless driver setup used throughout follows the references below.

References for installing and configuring the driver:

[1]: [Best practices for using Python+Selenium in Linux without graphical interface environment - Zhihu](https://zhuanlan.zhihu.com/p/653237868 "Best practices for using Python+Selenium in Linux without graphical interface environment - Zhihu")

[2]: How to solve the error 'chromedriver' executable needs to be in PATH - Zhihu
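The snippets in sections 1 to 4 use a firefox_options object without defining it; they assume the headless setup from the complete code in section 5, roughly as sketched below (the Firefox binary path is the author's and should be adjusted to your environment).

from pyvirtualdisplay import Display
from selenium.webdriver.firefox.options import Options

# Virtual X display so Firefox can run on a server without a GUI
display = Display(visible=0, size=(1980, 1440))
display.start()

# Headless Firefox options; point binary_location at your own Firefox install
firefox_options = Options()
firefox_options.headless = True
firefox_options.binary_location = '/home/lighthouse/firefox/firefox'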

1. Analyze the tag structure of the home page’s work list


     # webdriver initialization

driver = webdriver.Firefox(options=firefox_options)



# Set the page load timeout to 6 seconds

driver.set_page_load_timeout(6)



# Access the target blogger page

# e.g. https://www.douyin.com/user/MS4wLjABAAAAnq8nmb35fUqerHx54jlTx76AEkfq-sMD3cj7QdgsOiM

driver.get(target)



# Wait until the elements with class='e6wsjNLL' and class='niBfRBgX' have loaded before continuing
# (waiting only for ul.e6wsjNLL would also be sufficient)

# WebDriverWait(driver, 6) sets the maximum wait time to 6 seconds.

WebDriverWait(driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'e6wsjNLL')))

WebDriverWait(driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'niBfRBgX')))



# Execute the script in the browser, scroll to the bottom of the page, potentially showing more entries

driver.execute_script('document.querySelector(".wcHSRAj6").scrollIntoView()')

sleep(1)



# Use BeautifulSoup to parse the page source code

html = BeautifulSoup(driver.page_source, 'lxml')



# Close the driver

driver.quit()



# Get the list of works

ul = html.find(class_='e6wsjNLL')



# Get each work

lis = ul.findAll(class_='niBfRBgX')



2. Before entering the work page, determine whether the work is a video work or an image work


     element_a = li.find('a')

# If an element with class='TQTCdYql' can be found under the <a> tag, the work is an image work;
# if it cannot (find returns None), the work is a video

is_pictures = element_a.find(class_='TQTCdYql')





if (not is_pictures) or (not is_pictures.svg):
    # Video work
    pass
else:
    # Image work
    pass



3. Enter the video works page and obtain the video

     # Build the work's page address

href = f'https://www.douyin.com{element_a["href"]}'



# Use webdriver to access the work's page

temp_driver = webdriver.Firefox(options=firefox_options)

temp_driver.set_page_load_timeout(6)

temp_driver.get(href)



# Wait for the element class='D8UdT9V8' to be displayed before executing (the content of this element is the release date of the work)

WebDriverWait(temp_driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'D8UdT9V8')))



html_v = BeautifulSoup(temp_driver.page_source, 'lxml')

temp_driver.quit()



# Get the publish time of the work

publish_time = html_v.find(class_='D8UdT9V8').string[5:]



video = html_v.find(class_='xg-video-container').video

source = video.find('source')



# Create a folder for the work (one folder for each work)

# Name the folder after the release date of the work plus the type of the work

path = create_dir(f'{publish_time}_video')



# Download the work

download_works(path, f'{get_current_time()}.mp4', f'https:{source["src"]}')



4. Enter the image works page to obtain the pictures


     # Build the work's page address

href = f'https:{element_a["href"]}'



# Use webdriver to access the work's page

temp_driver = webdriver.Firefox(options=firefox_options)

temp_driver.set_page_load_timeout(6)

temp_driver.get(href)



# Wait for the tab containing the time the work was published to finish loading

WebDriverWait(temp_driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'YWeXsAGK')))



# Get the source code of the current page, then close the webdriver and hand the rest over to BeautifulSoup
# (the remaining steps could also be done by continuing to use webdriver)

html_p = BeautifulSoup(temp_driver.page_source, 'lxml')

temp_driver.quit()



# Get the publish time of the entry

publish_time = f'{html_p.find(class_="YWeXsAGK")}'[-23:-7]



# List of images

img_ul = html_p.find(class_='KiGtXxLr')

imgs = img_ul.findAll('img')



# Create a folder for the work, named by publish time + work type + number of images (for image works)

path = create_dir(f'{publish_time}_pictures_{len(imgs)}')



# Iterate over the images, get the url and then download them

for img in imgs:
    download_works(path, f'{get_current_time()}.webp', f'{img["src"]}')



5. Complete reference code

     # -*- coding: utf-8 -*-



import threading,requests,os,zipfile

from selenium.webdriver.common.by import By

from selenium.common.exceptions import NoSuchElementException

from selenium.webdriver.support import expected_conditions as EC

from selenium.webdriver.support.wait import WebDriverWait

from datetime import datetime

from selenium import webdriver

from selenium.webdriver.firefox.options import Options

from pyvirtualdisplay import Display

from time import sleep

from bs4 import BeautifulSoup

from selenium.common.exceptions import WebDriverException



display = Display(visible=0, size=(1980, 1440))

display.start()



firefox_options = Options()

firefox_options.headless = True

firefox_options.binary_location = '/home/lighthouse/firefox/firefox'



# Get the current time

def get_current_time():
    now = datetime.now()
    format_time = now.strftime("_%Y-%m-%d__%H-%M-%S-%f__")
    return format_time



# Set a root path where the work files and log files are kept

ABS_PATH = f'/home/resources/{get_current_time()}'



# Create a directory, dir_name is the release time of the work, in the format: 2024-02-26 16:59, which needs to be processed

def create_dir(dir_name):
    dir_name = dir_name.replace(' ', '-').replace(':', '-')
    path = f'{ABS_PATH}/{dir_name}'
    try:
        os.makedirs(path)
    except FileExistsError:
        print(f'Attempt to create an existing directory failed ({path})')
    else:
        print(f'Directory created successfully: {path}')
    finally:
        return path



# Download a file: directory name, file name, download URL

def download_works(dir_name, work_name, src):
    response = requests.get(src, stream=True)
    if response.status_code == 200:
        with open(f'{dir_name}/{work_name}', mode='wb') as f:
            for chunk in response.iter_content(1024):
                f.write(chunk)



# Determine if a work has been downloaded

def test_work_exist(dir_name):
    dir_name = dir_name.replace(' ', '-').replace(':', '-')
    path = f'{ABS_PATH}/{dir_name}'
    if os.path.exists(path) and os.path.isdir(path):
        if os.listdir(path):
            return True
    return False



def get_all_works(target):
    try:
        driver = webdriver.Firefox(options=firefox_options)
        driver.set_page_load_timeout(6)
        # Target blogger page
        driver.get(target)
        WebDriverWait(driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'e6wsjNLL')))
        WebDriverWait(driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'niBfRBgX')))

        # Scroll to the bottom of the page, potentially loading more entries
        driver.execute_script('document.querySelector(".wcHSRAj6").scrollIntoView()')
        sleep(1)

        html = BeautifulSoup(driver.page_source, 'lxml')
        driver.quit()
        # List of works
        ul = html.find(class_='e6wsjNLL')
        # Each work
        lis = ul.findAll(class_='niBfRBgX')

        for li in lis:
            element_a = li.find('a')
            is_pictures = element_a.find(class_='TQTCdYql')

            if (not is_pictures) or (not is_pictures.svg):
                # Video work
                href = f'https://www.douyin.com{element_a["href"]}'

                temp_driver = webdriver.Firefox(options=firefox_options)
                temp_driver.set_page_load_timeout(6)
                temp_driver.get(href)

                WebDriverWait(temp_driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'D8UdT9V8')))

                # Using BeautifulSoup is not required; webdriver could handle the rest as well
                html_v = BeautifulSoup(temp_driver.page_source, 'lxml')
                temp_driver.quit()

                # Get the publish time of the work
                publish_time = html_v.find(class_='D8UdT9V8').string[5:]

                # if test_work_exist(f'{publish_time}_video'):
                #     continue

                video = html_v.find(class_='xg-video-container').video
                source = video.find('source')

                # Create a folder for this work
                path = create_dir(f'{publish_time}_video')

                # Download the work
                download_works(path, f'{get_current_time()}.mp4', f'https:{source["src"]}')
            else:
                # Image work
                href = f'https:{element_a["href"]}'

                temp_driver = webdriver.Firefox(options=firefox_options)
                temp_driver.set_page_load_timeout(6)
                temp_driver.get(href)
                WebDriverWait(temp_driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'YWeXsAGK')))

                # Using BeautifulSoup is not required here either
                html_p = BeautifulSoup(temp_driver.page_source, 'lxml')
                temp_driver.quit()

                # Get the publish time of the work
                publish_time = f'{html_p.find(class_="YWeXsAGK")}'[-23:-7]

                # List of images
                img_ul = html_p.find(class_='KiGtXxLr')
                imgs = img_ul.findAll('img')

                # if test_work_exist(f'{publish_time}_pictures_{len(imgs)}'):
                #     continue

                path = create_dir(f'{publish_time}_pictures_{len(imgs)}')
                for img in imgs:
                    download_works(path, f'{get_current_time()}.webp', f'{img["src"]}')

        display.stop()
        print('##### finish #####')
    except WebDriverException as e:
        print(f'WebDriverException caught: {e}')
    except Exception as err:
        print('Caught another error, get_all_works ends')
        print(err)
    finally:
        driver.quit()
        display.stop()



# Zip the directory

def zipdir(path, ziph):
    # ziph is a zipfile.ZipFile object
    for root, dirs, files in os.walk(path):
        for file in files:
            ziph.write(os.path.join(root, file),
                       os.path.relpath(os.path.join(root, file), os.path.join(path, '..')))



def dy_download_all(target_url):
    get_all_works(target_url)

    directory_to_zip = ABS_PATH  # directory path
    output_filename = f'{ABS_PATH}.zip'  # output ZIP file name

    with zipfile.ZipFile(output_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
        zipdir(directory_to_zip, zipf)

    return f'{ABS_PATH}.zip'  # return download address



if __name__ == '__main__':
    # Simple test
    url = input("Please enter the URL of the blogger's homepage: ")
    path = dy_download_all(url)
    print('Download complete')
    print(f'address: {path}')

Test Results:


6. A way to obtain all works

The above operations are performed without logging in. Even when the page is scrolled inside webdriver, only a limited number of works can be obtained, roughly 20 items. Here is one way around this limitation.

Visit the target blogger's page while logged in (or with cookies, local storage, etc.), scroll to the bottom of the works list, and then execute a JavaScript snippet in the browser console to collect the information of all works (here, the work link and the work type) and write it out to a text file.
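The console approach below assumes you are already logged in in that browser. If you would rather stay inside Selenium, one possible route (not used in this article, sketched only as an idea) is to inject cookies exported from a logged-in browser before navigating; cookies.json below is a hypothetical export, and target is the blogger page URL from section 1.

import json
from selenium import webdriver

driver = webdriver.Firefox(options=firefox_options)
# Selenium only accepts cookies for the domain that is currently loaded,
# so open the site once before adding them
driver.get('https://www.douyin.com')
with open('cookies.json', encoding='utf-8') as f:  # hypothetical export from a logged-in browser
    for cookie in json.load(f):
        driver.add_cookie({'name': cookie['name'], 'value': cookie['value']})
# Reload the blogger page with the session cookies attached
driver.get(target)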


JavaScript code:

     let ul = document.querySelector('.e6wsjNLL');



// Store the results

works_list = [];



// Iterate through, adding one object at a time, including the address of the works page and whether the works are images or not

ul.childNodes.forEach((e) => {

let href = e.querySelector('a').href;

let is_pictures = e.querySelector('a').querySelector('.TQTCdYql') ? true : false;

works_list.push({href, is_pictures})

})



// Create a Blob object containing the content to be written to the file

var content = JSON.stringify(works_list);

var blob = new Blob([content], {type: "text/plain;charset=utf-8"});



// Create a link element

var link = document.createElement("a");



// Set the href attribute of the link to the URL of the Blob object

link.href = URL.createObjectURL(blob);



// Set the link's download property to specify the name of the file to download

link.download = "example.txt";



// Trigger a click event on the link to start downloading the file

link.click();



The written result:

Each element in the list is an object: href is the address of the work, and is_pictures is a boolean indicating whether the work is an image work.
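For illustration only (the URLs are placeholders, not real works), the exported file has this shape:

[
  {"href": "<absolute URL of a video work>", "is_pictures": false},
  {"href": "<absolute URL of an image work>", "is_pictures": true}
]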


Then read the file in Python, parse it with json, convert it into a list of dictionaries, iterate over the list, and process each dictionary (that is, each work).

Sample code:

(under a Windows environment)

     import json

import threading,requests,os

from bs4 import BeautifulSoup

from seleniumwire import webdriver

from selenium.webdriver.common.by import By

from selenium.common.exceptions import NoSuchElementException

from datetime import datetime

from selenium.webdriver.support import expected_conditions as EC

from selenium.webdriver.support.wait import WebDriverWait



# Get the current time

def get_current_time():
    now = datetime.now()
    format_time = now.strftime("_%Y-%m-%d__%H-%M-%S-%f__")
    return format_time



# Set a root path where the work files and log files are kept

ABS_PATH = f'F:\\{get_current_time()}'



# Create a directory, dir_name is the release time of the work, format: 2024-02-26 16:59, need to be processed

def create_dir(dir_name):
    dir_name = dir_name.replace(' ', '-').replace(':', '-')
    path = f'{ABS_PATH}/{dir_name}'
    try:
        os.makedirs(path)
    except FileExistsError:
        print(f'Attempt to create an existing directory failed ({path})')
    else:
        print(f'Directory created successfully: {path}')
    finally:
        return path



# Download a file: directory name, file name, download URL

def download_works(dir_name, work_name, src):
    response = requests.get(src, stream=True)
    if response.status_code == 200:
        with open(f'{dir_name}/{work_name}', mode='wb') as f:
            for chunk in response.iter_content(1024):
                f.write(chunk)



# Determine if the work has been downloaded

def test_work_exist(dir_name):
    dir_name = dir_name.replace(' ', '-').replace(':', '-')
    path = f'{ABS_PATH}/{dir_name}'
    if os.path.exists(path) and os.path.isdir(path):
        if os.listdir(path):
            return True
    return False



# Download the works in a sub-list (each thread processes one sub-list)

def thread_task(ul):
    for item in ul:
        href = item['href']
        is_pictures = item['is_pictures']

        if is_pictures:
            temp_driver = webdriver.Chrome()
            temp_driver.set_page_load_timeout(6)
            temp_driver.get(href)
            WebDriverWait(temp_driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'YWeXsAGK')))

            # Using BeautifulSoup is not required; webdriver could handle the rest as well
            html_p = BeautifulSoup(temp_driver.page_source, 'lxml')
            temp_driver.quit()

            # Get the publish time of the work
            publish_time = f'{html_p.find(class_="YWeXsAGK")}'[-23:-7]

            # List of images
            img_ul = html_p.find(class_='KiGtXxLr')
            imgs = img_ul.findAll('img')

            # if test_work_exist(f'{publish_time}_pictures_{len(imgs)}'):
            #     continue

            path = create_dir(f'{publish_time}_pictures_{len(imgs)}')
            for img in imgs:
                download_works(path, f'{get_current_time()}.webp', f'{img["src"]}')
        else:
            temp_driver = webdriver.Chrome()
            temp_driver.set_page_load_timeout(6)
            temp_driver.get(href)

            WebDriverWait(temp_driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'D8UdT9V8')))

            # Using BeautifulSoup is not required here either
            html_v = BeautifulSoup(temp_driver.page_source, 'lxml')
            temp_driver.quit()

            # Get the publish time of the work
            publish_time = html_v.find(class_='D8UdT9V8').string[5:]

            # if test_work_exist(f'{publish_time}_video'):
            #     continue

            video = html_v.find(class_='xg-video-container').video
            source = video.find('source')

            # Create a folder for this work
            path = create_dir(f'{publish_time}_video')

            # Download the work
            download_works(path, f'{get_current_time()}.mp4', f'https:{source["src"]}')



if __name__ == '__main__':
    content = ''
    # Read in the exported work-link file
    with open('.../abc.txt', mode='r', encoding='utf-8') as f:
        content = json.load(f)

    length = len(content)
    if length <= 3:
        thread_task(content)
    else:
        # Split the list across three threads
        ul = [content[0: int(length / 3) + 1],
              content[int(length / 3) + 1: int(length / 3) * 2 + 1],
              content[int(length / 3) * 2 + 1: length]]
        for child_ul in ul:
            thread = threading.Thread(target=thread_task, args=(child_ul,))
            thread.start()
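One optional addition, not in the original code: if anything needs to run only after all downloads are complete (for example zipping the directory as in section 5), keep the Thread objects and join them.

threads = []
for child_ul in ul:
    thread = threading.Thread(target=thread_task, args=(child_ul,))
    thread.start()
    threads.append(thread)
for thread in threads:
    thread.join()  # block here until every worker thread has finished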