It’s been a month since my last post. During this month, I finished both AWS’s SAA and SOA certifications. I also put another 60 or so hours into my Factorio Py mod run. Overall, August and September have made for a very chill end of summer.
Back to the main topic: I was thinking of putting something onto my new site. I wanted the idea to be simple and to focus more on using AWS architectures, so I decided to implement a web scraper that collects a movie’s domestic revenue on a daily basis from boxofficemojo.com.
Scraper Code
The code for the scraper is straightforward. To obtain the ranking of movies for a given day, I construct the URL using the date’s ISO format, send the request with the requests library, and parse the result with the BeautifulSoup library.
Below are the two functions: one crawls the ranking for a given date, and the other crawls a movie’s details.
```python
import re
from datetime import date, timedelta

import requests
from bs4 import BeautifulSoup

# Movie, string_to_number, and BASE_URL are defined elsewhere in the project.


def crawl_daily_ranking(d: date) -> list[Movie]:
    """Crawl the daily ranking.

    Args:
        d: a date object.

    Returns:
        A list of parsed movie records.
    """
    id_pattern = r"/release/(\w+)/\?ref_"

    target_url = f"{BASE_URL}/date/{d.isoformat()}"

    r = requests.get(target_url)
    soup = BeautifulSoup(r.text, features="html.parser")

    # navigate to the table containing the ranking
    table_html = soup.find(id="table")
    # extract the <tr> rows that contain the information, ignoring the header row
    records = table_html.find_all("tr")[1:]

    movies: list[Movie] = []
    for record in records:
        rank: int = string_to_number(
            record.find(class_="mojo-field-type-rank").text.strip()
        )
        title: str = record.find(class_="mojo-field-type-release").a.text.strip()
        href: str = record.find(class_="mojo-field-type-release").a["href"]
        href = re.findall(id_pattern, href)[0]

        # handle the money field
        daily_gross: int = string_to_number(
            record.find(class_="mojo-field-type-money").text.strip()
        )

        # handle the numbers
        numbers = record.find_all(class_="mojo-field-type-positive_integer")
        num_of_theaters: int = (
            string_to_number(numbers[1].text) if numbers[1].text.isnumeric() else 0
        )
        num_of_days_in_theater: int = string_to_number(numbers[2].text)

        # because num_of_days_in_theater starts at 1, subtract 1
        release_date = d - timedelta(days=num_of_days_in_theater - 1)

        # special case: sometimes a row does not have a "studio" column, simply marked as "None"
        distributor = record.find(class_="mojo-field-type-release_studios").a
        distributor = distributor.text.strip() if distributor is not None else "None"
```
```python
# body of the movie-detail crawler; the function signature and target_url setup are not shown
r = requests.get(target_url)
soup = BeautifulSoup(r.text, features="html.parser")

# extract the title
title = soup.find("h1", class_="a-size-extra-large").text.strip()

# extract the summary;
# interested in "Distributor", "Release Date", and "Widest Release" (# of theaters)
summaries = dict()

for div in soup.find(class_="mojo-summary-values"):
    spans = div.find_all("span")
    key = spans[0].text.strip()
    value = spans[1].text.strip()
    summaries[key] = value

distributor = summaries["Distributor"].replace("See full company information", "")
num_of_theaters = string_to_number(
    re.findall(r"\d+", summaries["Widest Release"].replace(",", ""))[0]
)

# handle special conditions for "Release Date"
release_date = None
for key in summaries:
    if key.startswith("Release Date"):
        release_date = datetime.strptime(
            summaries[key][:12].strip(), "%b %d, %Y"
        ).date()

# navigate to the table containing the daily revenue history
table_html = soup.find(id="table")
# extract the <tr> rows that contain the information, ignoring the header row
records = table_html.find_all("tr")[1:]

revenues = dict()
for record in records:
    rank: str = record.find(class_="mojo-field-type-rank").text.strip()

    daily_gross: int = string_to_number(
        record.find(class_="mojo-field-type-money").text.strip()
    )

    # handle the numbers
    numbers = record.find_all(class_="mojo-field-type-positive_integer")
    # num_of_theaters: int = string_to_number(numbers[0].text)
    num_of_days_in_theater: int = string_to_number(numbers[1].text)
```
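Both scraper functions rely on a small string_to_number helper that is not shown in the snippets above. A minimal sketch of what it might look like, assuming it only needs to strip dollar signs and thousands separators:

```python
def string_to_number(s: str) -> int:
    """Convert strings like "$1,234,567", "2,105", or "7" into an int (sketch only)."""
    cleaned = s.strip().replace("$", "").replace(",", "")
    return int(cleaned) if cleaned else 0
```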
The ranking data is prefixed with ranking/ and the movie detail data is prefixed with movies/. So ranking/2023-09-16.json represents the ranking for September 16, 2023, and movies/rl1077904129.json represents all the details about the movie Barbie (rl1077904129 is Barbie’s id on boxofficemojo).
Why S3 instead of DynamoDB for data storage? I chose S3 because most of the data follows a WORM (Write Once, Read Many) model: ranking data and most movie data never change after being written. Only newly released movies get updated repeatedly as new daily ranking/revenue data comes in, and those updates typically happen only while the movie is playing in theaters. So S3 seems to be the more favorable and cheaper choice. The downside is the extra code needed to merge an existing movie record with new records.
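To give a concrete idea of that merge, here is a minimal sketch of how the Movie record and merge_records could work, assuming revenues is a dict keyed by the nth day in theaters (the real class in the project likely has more fields, such as the distributor and release date, and may be structured differently):

```python
from dataclasses import dataclass, field


@dataclass
class Movie:
    id: str
    title: str
    # nth day in theaters -> daily gross in dollars
    revenues: dict[int, int] = field(default_factory=dict)

    def merge_records(self, other_revenues: dict[int, int]) -> None:
        """Fold newly crawled daily revenues into the existing record."""
        self.revenues.update(other_revenues)

    def newest_nth_day(self) -> int:
        """The largest nth day seen so far; stored as S3 object metadata."""
        return max(self.revenues, default=0)
```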
AWS Lambda
Lambda Layer
I configure the runtime environment with two Lambda layers. One contains the third-party libraries, such as Requests and BeautifulSoup. The other contains my own source code for the crawler under the /src folder.
To configure a layer for a Python environment, the official docs require that all files be placed under /python. I found this Q&A link more insightful for understanding how to build a layer. The Makefile below is simple and automates packaging each layer.
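A minimal sketch of the packaging steps such a Makefile runs, assuming the third-party layer is built from a requirements.txt and the source layer from the src folder (all directory and zip names here are illustrative, not the actual script):

```sh
# 3rd-party libraries layer: everything must sit under a top-level python/ directory
rm -rf build/libs && mkdir -p build/libs/python
pip install -r requirements.txt -t build/libs/python
(cd build/libs && zip -r ../../libs-layer.zip python)

# crawler source layer: copy the src folder under python/ as well
rm -rf build/src && mkdir -p build/src/python
cp -r src build/src/python/
(cd build/src && zip -r ../../src-layer.zip python)
```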
Both Lambda functions below are triggered by SQS and are configured to poll data from SQS at certain intervals.
movie-etl_crawl_ranking is the more complex of the two. It crawls the ranking data from boxofficemojo.com and then:
- checks whether each movie already exists in S3. If it does, the function merges the new record with the existing one and saves it back to S3; otherwise it constructs a new SQS request for crawling the movie’s details.
- constructs new requests for crawling older dates, with delays, because I do not want to queue too many requests for crawling movie details.
- batches the requests into groups of 10 (an SQS limitation) and sends them through the SQS API.
```python
# this value stores the oldest/smallest date that has been seen/parsed,
# so the next date sent to SQS will be (next_date_to_crawl - timedelta(days=1))
next_date_to_crawl = None


def lambda_handler(event, context):
    global next_date_to_crawl, s3_client, sqs_client
    if next_date_to_crawl is None:
        # a date far in the future here will always converge to the current date after comparison
        next_date_to_crawl = date.fromisoformat("2024-01-01")

    # Messages other than the ones sent by the EventBridge cron job are recurrent messages;
    # this decides how many new messages will be sent to SQS.
    recurrent_messages = 0

    # merge all movie records by combining their revenue records
    movies_seen = dict()
    message_deletion_request_entries = []

    message_list = event["Records"]

    # iterate through all crawl-ranking requests
    for record in message_list:
        request_body = json.loads(record["body"])

        if "daily_ranking" not in request_body:
            # only the message sent by EventBridge contains this field
            recurrent_messages += 1

        # parse the parameter "date" into a date object
        d = date.fromisoformat(request_body["date"])

        # compare d with next_date_to_crawl
        if d < next_date_to_crawl:
            next_date_to_crawl = d

        print(f"start crawling movie ranking on {d.isoformat()}")

        # use the date as the argument to crawl the ranking
        movies = crawl_daily_ranking(d)

        my_print(f"Save rankings in S3 as {d.isoformat()}.{FILE_EXTENSION}")
        # convert list[Movie] to writable bytes in {FILE_EXTENSION} format
        writable = encode_ranking(movies)

        # save to S3
        s3_client.put_object(
            Bucket=BUCKET_NAME,
            Key=f"{RANKING_FOLDER}/{d.isoformat()}.{FILE_EXTENSION}",
            Body=writable,
            ContentLanguage="en-US",
            ContentLength=len(writable),
        )
        print(f"Complete upload {d.isoformat()}.{FILE_EXTENSION}")

        # merge movies into movies_seen
        for movie in movies:
            if movie.id not in movies_seen:
                movies_seen[movie.id] = movie
            else:
                movies_seen[movie.id].merge_records(movie.revenues)

        # build an entry for deleting this message in a batch later
        message_deletion_request = {
            "Id": str(uuid.uuid5(UUID_NS_BASE, record["messageId"])),
            "ReceiptHandle": record["receiptHandle"],
        }
        message_deletion_request_entries.append(message_deletion_request)

    # Now, for each movie in movies_seen, check whether {movie.id}.json exists by
    # examining the object's header in S3.
    #
    # If it does not exist (signalled by a ClientError from S3),
    # simply create a new SQS message to fetch the full record of the movie.
    #
    # If it exists, compare the metadata "newest_nth_day" with the crawled movie's
    # largest nth_day. If the stored value is not smaller, no further operation is
    # needed. Otherwise, get the movie object, merge it with the new records, and
    # put it back to S3.
    sqs_movies_requests = []

    for movie in movies_seen.values():
        try:
            file_name = f"{MOVIES_FOLDER}/{movie.id}.{FILE_EXTENSION}"

            # look up the object's metadata; a missing key raises ClientError
            object_head_response = s3_client.head_object(Bucket=BUCKET_NAME, Key=file_name)

            nth_day = int(object_head_response["Metadata"]["newest-nth-day"])
            if nth_day >= movie.newest_nth_day():
                # no need to update the movie
                continue

            # fetch the file from S3
            object_response = s3_client.get_object(Bucket=BUCKET_NAME, Key=file_name)

            # deserialize json into a Movie object
            movie_obj = decode_movie(object_response["Body"].read())

            # combine with the crawled movie
            movie_obj.merge_records(movie.revenues)

            # put movie_obj back to S3
            writable = encode_movie(movie_obj)
            s3_client.put_object(
                Bucket=BUCKET_NAME,
                Key=file_name,
                Body=writable,
                ContentLanguage="en-US",
                ContentLength=len(writable),
            )

        except botocore.exceptions.ClientError:
            # the movie does not exist in S3 yet;
            # construct an SQS request to crawl its details
            my_print(
                f"Movie {movie.title} does not exist in S3, prepare new crawler job"
            )

            # prepare the SQS message, with 0 second delay
            request = {
                "Id": movie.id,
                "MessageBody": json.dumps({"id": movie.id}),
            }
            sqs_movies_requests.append(request)

        except Exception as err:
            print(f"Unexpected {err=}, {type(err)=}")

    # handle recurrent requests for older days' rankings
    sqs_ranking_requests = []
    for i in range(recurrent_messages):
        next_date = next_date_to_crawl - timedelta(days=i + 1)
        request = {
            "Id": next_date.isoformat(),
            "MessageBody": json.dumps({"date": next_date.isoformat()}),
            "MessageGroupId": "movie-etl",
            # default delay is 15 minutes
        }
        sqs_ranking_requests.append(request)

    print(
        f"put in sqs request for rankings: {', '.join([entry['Id'] for entry in sqs_ranking_requests])}"
    )
    send_sqs_requests(SQS_RANKING_QUEUE_URL, sqs_ranking_requests)
```
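The send_sqs_requests helper used at the end is not shown above. A minimal sketch, assuming it only needs to chunk the entries into groups of 10 (the SQS batch limit) and report partial failures:

```python
import boto3

sqs_client = boto3.client("sqs")


def send_sqs_requests(queue_url: str, entries: list[dict]) -> None:
    """Send SQS batch entries in chunks of at most 10 (sketch of the helper used above)."""
    for i in range(0, len(entries), 10):
        response = sqs_client.send_message_batch(
            QueueUrl=queue_url, Entries=entries[i : i + 10]
        )
        # surface partial failures in the Lambda logs
        for failed in response.get("Failed", []):
            print(f"failed to queue {failed['Id']}: {failed.get('Message')}")
```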
I use two queues, movie-etl_sqs_ranking and movie-etl_sqs_movies. To keep the design simple, each queue has its own configuration.
At first, I used a single queue and relied on Lambda’s filter criteria on the SQS trigger to route requests. The detailed documentation is here. But I later found that this made debugging harder, and I had to explicitly manage the delays and visibility of requests in code.
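For reference, the routing in that single-queue design can be expressed as an event source mapping filter. A rough sketch with boto3, assuming a message counts as a movie-detail request when its body contains an id field (the queue ARN and function name are placeholders):

```python
import json

import boto3

lambda_client = boto3.client("lambda")

# only deliver messages whose JSON body has an "id" field to the movie-detail function
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:sqs:us-east-1:123456789012:movie-etl_sqs",  # placeholder
    FunctionName="movie-etl_crawl_movie",  # placeholder
    FilterCriteria={
        "Filters": [
            {"Pattern": json.dumps({"body": {"id": [{"exists": True}]}})}
        ]
    },
)
```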
EventBridge
An EventBridge scheduler runs a daily cron job that adds a new request to movie-etl_sqs_ranking for crawling the latest ranking data.
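A rough sketch of creating such a schedule with boto3, assuming the ranking queue is FIFO and using placeholder names, ARNs, and a placeholder message body (the real schedule has to supply the current date):

```python
import json

import boto3

scheduler = boto3.client("scheduler")

scheduler.create_schedule(
    Name="movie-etl-daily-ranking",           # placeholder name
    ScheduleExpression="cron(0 12 * * ? *)",  # placeholder: every day at 12:00 UTC
    FlexibleTimeWindow={"Mode": "OFF"},
    Target={
        "Arn": "arn:aws:sqs:us-east-1:123456789012:movie-etl_sqs_ranking.fifo",  # placeholder
        "RoleArn": "arn:aws:iam::123456789012:role/movie-etl-scheduler",          # placeholder
        # body the Lambda handler expects from the cron job
        "Input": json.dumps({"daily_ranking": True, "date": "2023-09-16"}),
        "SqsParameters": {"MessageGroupId": "movie-etl"},
    },
)
```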
Downfalls & TODOs
Although SQS’s visibility timeout prevents different consumers from consuming the same message, the same Lambda function can be triggered multiple times in parallel to consume the queue.
This sometimes causes one invocation to process dates such as “2023-09-17” and “2023-09-15” while another processes “2023-09-16”. In this case, one Lambda will crawl the ranking data and send new requests for “2023-09-14” and “2023-09-13”, while the other will send a request for “2023-09-15”, which is a duplicate.
A simple workaround is to increase the polling window of each Lambda invocation so that more messages are handled together. But this does not fundamentally solve the issue, because two Lambda invocations can still poll messages at the same time.
The other option is to delegate the responsibility of sending new requests for older ranking data to a separate Lambda function. This new function would be triggered by another EventBridge scheduler, figure out the next date to crawl, and send the new requests. That resolves the problem because it makes the next crawl date independent of the Lambda input, but it also complicates the design.
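A minimal sketch of what that separate function could look like, assuming it derives the next date from the ranking/ objects already stored in S3 (the bucket name and queue URL are placeholders):

```python
import json
from datetime import date, timedelta

import boto3

s3_client = boto3.client("s3")
sqs_client = boto3.client("sqs")

BUCKET_NAME = "movie-etl-bucket"  # placeholder
SQS_RANKING_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/movie-etl_sqs_ranking.fifo"  # placeholder


def lambda_handler(event, context):
    """Enqueue the next older date to crawl, independent of any SQS input."""
    # find the oldest ranking date already stored under ranking/
    oldest = None
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=BUCKET_NAME, Prefix="ranking/"):
        for obj in page.get("Contents", []):
            # keys look like "ranking/2023-09-16.json"
            d = date.fromisoformat(obj["Key"].split("/")[1].split(".")[0])
            if oldest is None or d < oldest:
                oldest = d

    if oldest is None:
        return  # nothing crawled yet; let the daily cron job seed the queue

    next_date = oldest - timedelta(days=1)
    sqs_client.send_message(
        QueueUrl=SQS_RANKING_QUEUE_URL,
        MessageBody=json.dumps({"date": next_date.isoformat()}),
        MessageGroupId="movie-etl",
    )
```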
TODO: A frontend for showing the result using D3.js