Introduction

It’s been a month since my last post. During this month, I finished both AWS’s SAA and SOA certifications. I also added another 60 or so hours to my Factorio Py mod run. Overall, it has been a very chill summer through August and September.

Back to the main topic: I was thinking of putting something onto my new site. I want the idea to be simple and focused more on using AWS architectures. So I decided to implement a web scraper that collects movies’ domestic revenue from boxofficemojo.com on a daily basis.

Scraper Code

The code for the scraper is very straightforward. To obtain the movie ranking for a given day, I construct the URL from the date’s ISO format, send the request with the requests library, and parse the result with the BeautifulSoup library.
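
For a concrete sense of the URL format (BASE_URL is not shown in the snippets below; I assume it points at https://www.boxofficemojo.com):

from datetime import date

BASE_URL = "https://www.boxofficemojo.com"  # assumption: the module-level constant used in the crawler

d = date(2023, 9, 16)
target_url = f"{BASE_URL}/date/{d.isoformat()}"  # https://www.boxofficemojo.com/date/2023-09-16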

Below is the code for the two functions: one crawls the ranking for a given date, and the other crawls a movie’s details.

crawl_daily_ranking.py
def crawl_daily_ranking(d: date) -> list[Movie]:
    """Crawl the daily ranking.

    Args:
        d: a date object.

    Returns:
        A list of parsed movie records.
    """
    id_pattern = r"/release/(\w+)/\?ref_"

    target_url = f"{BASE_URL}/date/{d.isoformat()}"

    r = requests.get(target_url)
    soup = BeautifulSoup(r.text, features="html.parser")

    # navigate to the table containing the ranking
    table_html = soup.find(id="table")
    # extract the <tr> rows that contain the information, ignoring the header row
    records = table_html.find_all("tr")[1:]

    movies: list[Movie] = []
    for record in records:
        rank: int = string_to_number(
            record.find(class_="mojo-field-type-rank").text.strip()
        )
        title: str = record.find(class_="mojo-field-type-release").a.text.strip()
        href: str = record.find(class_="mojo-field-type-release").a["href"]
        href = re.findall(id_pattern, href)[0]

        # handle all money
        daily_gross: int = string_to_number(
            record.find(class_="mojo-field-type-money").text.strip()
        )

        # handle the numbers
        numbers = record.find_all(class_="mojo-field-type-positive_integer")
        num_of_theaters: int = (
            string_to_number(numbers[1].text) if numbers[1].text.isnumeric() else 0
        )
        num_of_days_in_theater: int = string_to_number(numbers[2].text)

        # because num_of_days_in_theater starts at 1, subtract 1
        release_date = d - timedelta(days=num_of_days_in_theater - 1)

        # special case: sometimes a row does not have a "studio" column, simply marked as "None"
        distributor = record.find(class_="mojo-field-type-release_studios").a
        distributor = distributor.text.strip() if distributor is not None else "None"

        m = Movie(
            id=href,
            title=title,
            release_date=release_date,
            revenues={num_of_days_in_theater: DailyRecord(rank, daily_gross)},
            num_of_theaters=num_of_theaters,
            distributor=distributor,
        )

        movies.append(m)
    return movies

crawl_movie_detail.py
def crawl_movie_detail(id: str) -> Movie:
    target_url = f"{BASE_URL}/release/{id}"

    r = requests.get(target_url)
    soup = BeautifulSoup(r.text, features="html.parser")

    # extract the title
    title = soup.find("h1", class_="a-size-extra-large").text.strip()

    # extract the summary values
    # interested in "Distributor", "Release Date", "Widest Release" (# of theaters)
    summaries = dict()

    for div in soup.find(class_="mojo-summary-values"):
        spans = div.find_all("span")
        key = spans[0].text.strip()
        value = spans[1].text.strip()
        summaries[key] = value

    distributor = summaries["Distributor"].replace("See full company information", "")
    num_of_theaters = string_to_number(
        re.findall(r"\d+", summaries["Widest Release"].replace(",", ""))[0]
    )

    # handle special conditions for "Release Date"
    release_date = None
    for key in summaries:
        if key.startswith("Release Date"):
            release_date = datetime.strptime(
                summaries[key][:12].strip(), "%b %d, %Y"
            ).date()

    # navigate to the table containing the ranking
    table_html = soup.find(id="table")
    # extract the <tr> rows that contain the information, ignoring the header row
    records = table_html.find_all("tr")[1:]

    revenues = dict()
    for record in records:
        rank: str = record.find(class_="mojo-field-type-rank").text.strip()

        daily_gross: int = string_to_number(
            record.find(class_="mojo-field-type-money").text.strip()
        )

        # handle the numbers
        numbers = record.find_all(class_="mojo-field-type-positive_integer")
        # numbers[0] would be the number of theaters; not needed here
        num_of_days_in_theater: int = string_to_number(numbers[1].text)

        revenues[num_of_days_in_theater] = daily_gross

    return Movie(
        id=id,
        title=title,
        release_date=release_date,
        revenues=revenues,
        num_of_theaters=num_of_theaters,
        distributor=distributor,
    )
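
Both functions rely on a small string_to_number helper that is not shown in this post. A minimal sketch of what it presumably does (stripping "$" and "," before converting, with 0 as a fallback for placeholder values like "-"):

import re

def string_to_number(s: str) -> int:
    """Convert strings like "$1,234,567" or "1,234" into an int; fall back to 0."""
    digits = re.sub(r"[^\d-]", "", s)
    return int(digits) if digits not in ("", "-") else 0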

S3

The ranking data is prefixed with ranking/ and the movie detail data is prefixed with movies/. So ranking/2023-09-16.json represents the ranking for September 16, 2023, and movies/rl1077904129.json represents all details about the movie Barbie (rl1077904129 is Barbie’s id on boxofficemojo).

As for why I use S3 instead of DynamoDB for data storage: most of the data follows a WORM (Write Once, Read Many) model. Ranking data and most movie data will not change after being written. Only newly released movies receive multiple updates as new daily ranking/revenue data comes in, and those updates typically happen only while the movie is playing in theaters. So S3 seems to be the more favorable and cheaper choice. The downside is the extra coding work needed to merge existing movie records with new records.
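
To make that merge concrete, here is a minimal sketch of the idea, assuming (as the Lambda code below does) that a movie’s revenues are stored in a dict keyed by the nth day in theaters. The function name merge_revenue_records is hypothetical; in the actual code this lives on the Movie class as merge_records.

def merge_revenue_records(existing: dict[int, int], new: dict[int, int]) -> dict[int, int]:
    """Merge two revenue dicts keyed by the nth day in theaters; newer days win."""
    merged = dict(existing)
    merged.update(new)
    return merged

# hypothetical usage: an existing movie record gains the latest day's revenue
merged = merge_revenue_records({1: 22_000_000, 2: 18_500_000}, {3: 12_300_000})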

AWS Lambda

Lambda Layer

I configure the runtime environment with two Lambda layers. One comprises the third-party libraries, such as Requests and BeautifulSoup. The other contains my own source code for the crawler under the /src folder.

To configure a layer for a Python environment, the official docs require that all files be placed under /python. I found this Q&A link more insightful for understanding how to build a layer. The Makefile below is simple and automates packaging each layer.

shared: clean
	mkdir python
	# move source to python/
	rsync -amr --include="*.py" --include="*/" --exclude="*" ./src python/

	# package into shared.zip
	zip -r shared.zip python

pyenv: clean
	mkdir python
	# install requirements
	python3 -m pip install --upgrade -r requirements.txt -t python/

	# package into pyenv.zip
	zip -r pyenv.zip python

clean:
	rm -rf python
	rm -f pyenv.zip
	rm -f shared.zip
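
For context on why the python/ prefix matters: a layer’s zip is extracted under /opt in the Lambda runtime, and /opt/python is on the Python import path, which is how the handlers below can do from src.crawler import ... directly. A quick, illustrative check inside a handler:

import sys

# Layer contents are extracted under /opt; /opt/python (and its site-packages) are on sys.path.
print([p for p in sys.path if p.startswith("/opt")])

# Because shared.zip contains python/src/..., this resolves to /opt/python/src/crawler.py
from src.crawler import crawl_daily_ranking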

Lambda code

Both Lambda functions below are triggered by SQS and configured with certain intervals for polling data from SQS.

  • movie-etl_crawl_ranking is more complex. It crawls ranking data from boxofficemojo.com and then:
    1. checks whether each movie already exists in S3. If it does, it saves the new record to S3 by merging it with the existing one; otherwise, it constructs a new SQS request for crawling the movie’s details.
    2. constructs new requests for crawling older dates with delays, because I do not want to queue too many requests for crawling movie details.
    3. batches requests into groups of 10 due to an SQS limitation and sends them through the SQS API (a sketch of the batching helper follows this list).
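
The send_sqs_requests helper used in the handler below is not shown in this post. A minimal sketch of what it would need to do, given that SendMessageBatch accepts at most 10 entries per call, might look like this:

import boto3

sqs_client = boto3.client("sqs")
SQS_BATCH_LIMIT = 10  # SendMessageBatch accepts at most 10 entries per call


def send_sqs_requests(queue_url: str, entries: list[dict]) -> None:
    """Send prepared SQS message entries in chunks of at most SQS_BATCH_LIMIT."""
    for i in range(0, len(entries), SQS_BATCH_LIMIT):
        batch = entries[i : i + SQS_BATCH_LIMIT]
        sqs_client.send_message_batch(QueueUrl=queue_url, Entries=batch)
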
etl_crawl_ranking.py
import boto3
import botocore
import os
import json
import uuid
from datetime import date, timedelta
from src.crawler import crawl_daily_ranking
from src.encoders.json_encoder import encode_ranking, encode_movie, decode_movie


# some global variables
BUCKET_NAME = os.environ["bucket_name"]
RANKING_FOLDER = os.environ["ranking_folder_prefix"]
MOVIES_FOLDER = os.environ["movies_folder_prefix"]
SQS_RANKING_QUEUE_URL = os.environ["sqs_ranking_queue_url"]
SQS_MOVIES_QUEUE_URL = os.environ["sqs_movies_queue_url"]
FILE_EXTENSION = os.environ["file_extension"]
UUID_NS_BASE = uuid.UUID(os.environ["uuid_ns_base"])

# an SQS batch request allows a maximum of 10 entries at a time
SQS_BATCH_LIMIT = 10

# Debugging
DEBUG = False

s3_client = boto3.client("s3")
sqs_client = boto3.client("sqs")

# this value stores the oldest/smallest date that has been seen/parsed,
# so the next date sent to SQS will be (next_date_to_crawl - timedelta(days=1))
next_date_to_crawl = None


def lambda_handler(event, context):
    global next_date_to_crawl, s3_client, sqs_client
    if next_date_to_crawl is None:
        # a larger value here will always converge to the current date after comparison
        next_date_to_crawl = date.fromisoformat("2024-01-01")

    print(f"next_date_to_crawl = {next_date_to_crawl.isoformat()}")

    # Messages other than the ones sent by the EventBridge cron job are recurrent messages.
    # This decides how many new messages will be sent to SQS.
    recurrent_messages = 0

    # merge all movie records by combining their revenue records
    movies_seen = dict()
    message_deletion_request_entries = []

    message_list = event["Records"]

    # iterate through all crawl-ranking requests
    for record in message_list:
        request_body = json.loads(record["body"])

        if "daily_ranking" not in request_body:
            # only messages sent by EventBridge contain this field
            recurrent_messages += 1

        # parse parameter "date" into a date object
        d = date.fromisoformat(request_body["date"])

        # compare d with next_date_to_crawl
        if d < next_date_to_crawl:
            next_date_to_crawl = d

        print(f"start crawling movie ranking on {d.isoformat()}")

        # use the date as the argument to crawl the ranking
        movies = crawl_daily_ranking(d)

        my_print(f"Save rankings in S3 as {d.isoformat()}.{FILE_EXTENSION}")
        # convert list[Movie] to writable bytes in {FILE_EXTENSION} format
        writable = encode_ranking(movies)

        # save to S3
        s3_client.put_object(
            Bucket=BUCKET_NAME,
            Key=f"{RANKING_FOLDER}/{d.isoformat()}.{FILE_EXTENSION}",
            Body=writable,
            ContentLanguage="en-US",
            ContentLength=len(writable),
        )
        print(f"Complete upload {d.isoformat()}.{FILE_EXTENSION}")

        # merge movies into movies_seen
        for movie in movies:
            if movie.id not in movies_seen:
                movies_seen[movie.id] = movie
            else:
                movies_seen[movie.id].merge_records(movie.revenues)

        # use a deterministic UUID derived from the message id as the Id for batch deletion
        message_deletion_request = {
            "Id": str(uuid.uuid5(UUID_NS_BASE, record["messageId"])),
            "ReceiptHandle": record["receiptHandle"],
        }
        message_deletion_request_entries.append(message_deletion_request)

    sqs_client.delete_message_batch(
        QueueUrl=SQS_RANKING_QUEUE_URL, Entries=message_deletion_request_entries
    )

    # Now, for each movie in movies_seen, check whether {movie.id}.json exists by
    # examining the object's header in S3 using head_object().
    #
    # If it does not exist (signalled by a ClientError), simply create a new SQS
    # message to fetch the full record of the movie.
    #
    # If it exists, compare the metadata "newest-nth-day" with the crawled movie's
    # largest nth day. If it is not smaller, no further operation is needed. Otherwise,
    # get the movie object, merge it with the new records, and put it back to S3.
    sqs_movies_requests = []

    for movie in movies_seen.values():
        try:
            file_name = f"{MOVIES_FOLDER}/{movie.id}.{FILE_EXTENSION}"

            # retrieve the object's metadata
            object_head_response = s3_client.head_object(
                Bucket=BUCKET_NAME, Key=file_name
            )

            nth_day = int(object_head_response["Metadata"]["newest-nth-day"])
            if nth_day >= movie.newest_nth_day():
                # no need to update the movie
                continue

            # fetch the file from S3
            object_response = s3_client.get_object(Bucket=BUCKET_NAME, Key=file_name)

            # deserialize JSON into a Movie object
            movie_obj = decode_movie(object_response["Body"].read())

            # combine with the crawled movie
            movie_obj.merge_records(movie.revenues)

            # put movie_obj back to S3
            writable = encode_movie(movie_obj)
            s3_client.put_object(
                Bucket=BUCKET_NAME,
                Key=file_name,
                Body=writable,
                ContentLanguage="en-US",
                ContentLength=len(writable),
            )

        except botocore.exceptions.ClientError:
            # the movie does not exist in S3 yet,
            # construct an SQS request to crawl its detail
            my_print(
                f"Movie {movie.title} does not exist in S3, prepare new crawler job"
            )

            # prepare the SQS message, with 0 seconds of delay
            request = {
                "Id": movie.id,
                "MessageBody": json.dumps({"id": movie.id}),
            }
            sqs_movies_requests.append(request)

        except Exception as err:
            print(f"Unexpected {err=}, {type(err)=}")

    send_sqs_requests(SQS_MOVIES_QUEUE_URL, sqs_movies_requests)

    # handle recurrent requests for older days' rankings
    sqs_ranking_requests = []
    for i in range(recurrent_messages):
        next_date = next_date_to_crawl - timedelta(days=i + 1)
        request = {
            "Id": next_date.isoformat(),
            "MessageBody": json.dumps({"date": next_date.isoformat()}),
            "MessageGroupId": "movie-etl",
            # default delay is 15 minutes
        }
        sqs_ranking_requests.append(request)

    print(
        f"put in sqs request for rankings: {', '.join([entry['Id'] for entry in sqs_ranking_requests])}"
    )
    send_sqs_requests(SQS_RANKING_QUEUE_URL, sqs_ranking_requests)

    # end of execution
    return {"statusCode": 200}

movie-etl_crawl_movie.py
import os
import json
import boto3

from src.crawler import crawl_movie_detail
from src.encoders.json_encoder import encode_movie

BUCKET_NAME = os.environ["bucket_name"]
MOVIES_FOLDER = os.environ["movies_folder_prefix"]
FILE_EXTENSION = os.environ["file_extension"]


def lambda_handler(event, context):
    s3_client = boto3.client("s3")

    records = event["Records"]
    for record in records:
        request_body = json.loads(record["body"])

        # used for constructing the target URL
        movie_id = request_body["id"]

        print(f"start crawling movie with id {movie_id}")

        # use the movie id to fetch the detail from boxofficemojo.com
        movie = crawl_movie_detail(movie_id)

        # convert the Movie object into writable bytes in {FILE_EXTENSION} format
        writable = encode_movie(movie)

        file_name = f"{MOVIES_FOLDER}/{movie_id}.{FILE_EXTENSION}"
        s3_client.put_object(
            Bucket=BUCKET_NAME,
            Key=file_name,
            Body=writable,
            ContentLanguage="en-US",
            ContentLength=len(writable),
            Metadata={"newest-nth-day": str(movie.newest_nth_day())},
        )

        print(f"Complete crawling {movie_id}")

    # end of execution
    return {"statusCode": 200}
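
The encode_movie / decode_movie helpers in src.encoders.json_encoder are also not shown. Here is a rough, self-contained sketch of the idea, using a simplified Movie with plain integer revenues (the real encoders also handle encode_ranking and the DailyRecord values):

import json
from dataclasses import dataclass, asdict, field
from datetime import date


@dataclass
class Movie:
    id: str
    title: str
    release_date: date
    revenues: dict[int, int] = field(default_factory=dict)  # nth day in theaters -> daily gross
    num_of_theaters: int = 0
    distributor: str = ""


def encode_movie(movie: Movie) -> bytes:
    data = asdict(movie)
    data["release_date"] = movie.release_date.isoformat()
    return json.dumps(data).encode("utf-8")


def decode_movie(raw: bytes) -> Movie:
    data = json.loads(raw)
    data["release_date"] = date.fromisoformat(data["release_date"])
    # JSON object keys are always strings; restore the integer nth-day keys
    data["revenues"] = {int(k): v for k, v in data["revenues"].items()}
    return Movie(**data)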

SQS and EventBridge

SQS

I use two queues, movie-etl_sqs_ranking and movie-etl_sqs_movies. For simplicity of the design, each queue has its own configuration.

At first, I used only a single queue and relied on Lambda’s filter criteria on the SQS trigger to route requests (the detailed documentation is here). But I later found it harder to debug, and I had to explicitly manage the delays and visibility of requests in code.
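
For reference, the routing I originally had in mind was a filter on the message body of the single queue’s trigger, configured roughly like this with boto3 (the queue ARN and function name are placeholders, and the pattern reflects my understanding of FilterCriteria rather than code from this project):

import json
import boto3

lambda_client = boto3.client("lambda")

# route only messages whose JSON body contains a "date" field to the ranking crawler
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:sqs:us-east-1:123456789012:movie-etl_sqs",  # placeholder ARN
    FunctionName="movie-etl_crawl_ranking",
    FilterCriteria={
        "Filters": [
            {"Pattern": json.dumps({"body": {"date": [{"exists": True}]}})}
        ]
    },
)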

EventBridge

An EventBridge scheduler is used to run a daily cron job that adds a new request to movie-etl_sqs_ranking for crawling new ranking data.
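
Based on the handler above, which treats messages carrying a "daily_ranking" field as the ones coming from EventBridge, the scheduled message body would look roughly like the following. The exact shape of the scheduler’s input is my assumption:

import json
from datetime import date

# assumed shape of the scheduled message: a "daily_ranking" marker plus the date to crawl
scheduled_body = json.dumps({
    "daily_ranking": True,
    "date": date.today().isoformat(),
})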

Pitfalls & TODOs

  1. Although SQS’s visibility timeout prevents different consumers from consuming the same message, the same Lambda function can be triggered multiple times in parallel to consume the queue.

    This sometimes causes one invocation to process dates such as “2023-09-17” and “2023-09-15”, and another to process “2023-09-16”. In this case, one Lambda will crawl the ranking data and send new requests for “2023-09-14” and “2023-09-13”, while the other will send “2023-09-15”, which is a duplicate request.

    A simple workaround is to increase the polling time of each Lambda invocation. But this does not fundamentally solve the issue, because two Lambda invocations can still poll data at the same time.

    The other way is to delegate the responsibility of sending new requests for crawling older ranking data to a separate Lambda function. This new function would be triggered by another EventBridge scheduler, figure out the next date to crawl, and send the new requests. This resolves the problem because it makes the next crawl date independent of the Lambda input, but it also complicates the design.

  2. TODO: A frontend for showing the result using D3.js