Read Common Crawl Parquet Metadata with Python
Common Crawl releases columnar indexes of their web crawls in the Apache Parquet file format. These can be efficiently queried in a distributed manner using Amazon Athena or Spark, and the Common Crawl team have released a number of examples. I wanted to see if I could use Python to directly read these Parquet files to find crawls of particular websites. The usual way to do this is their capture index, but it is often under high load, so requests can fail and too many simultaneous requests get rejected. If we can quickly find the data we need from these static Parquet files we can run query processes in parallel.
The columnar indexes for Common Crawl are too large to quickly download. For example the 2022-05 crawl has a columnar index consisting of 300 Parquet files with a total compressed size of 205 GB. Keep in mind this is just an index; it doesn't contain any crawled web page data, which is over 72 TB spread across 72,000 files. It would take a lot of time to download all these index files just to look for crawls of a handful of domains, but it turns out we don't need to. The Parquet files are approximately sorted by url_surtkey, a canonical form of the URL with the host name reversed. For example the url_surtkey for this page would be com,skeptric)/reading-parquet-metadata. By first identifying the Parquet files that could contain the url_surtkey we are looking for, we only need to query one or two of the 300 files.
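To make the format concrete, here is a rough sketch of how a surtkey relates to a URL. This is only an illustration; the real SURT canonicalisation (for example the surt Python package) also normalises ports, www prefixes, query strings and more.

from urllib.parse import urlsplit

def approximate_surtkey(url):
    # Reverse the host labels and append the path; real SURT
    # canonicalisation does considerably more normalisation.
    parts = urlsplit(url)
    host = ','.join(reversed(parts.netloc.split('.')))
    return f'{host}){parts.path}'

approximate_surtkey('https://skeptric.com/reading-parquet-metadata')
# 'com,skeptric)/reading-parquet-metadata'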
In this article we download the Parquet metadata for all 25,000 WARC index files in under 7 minutes from Australia, using asyncio. It would be even faster run from an EC2 instance in us-east-1, where the data is stored. We can then use this metadata to query a single domain for a single crawl in about a minute. This is fairly slow, but we could make many such requests concurrently across crawls and domains. You can view the corresponding Jupyter notebook (or the raw ipynb).
Reading Metadata with PyArrow
PyArrow, part of the Apache Arrow project, provides great support for Parquet through the dataset interface. We can pass it the base path and it will discover all the partitions and Parquet files in seconds. Note that Common Crawl requires authentication for S3 access, so this requires having an AWS account set up and configured (see the boto3 documentation for how to do that).
import pyarrow.dataset as ds
cc_index_s3_path = 's3://commoncrawl/cc-index/table/cc-main/warc/'
cc_index = ds.dataset(cc_index_s3_path, format='parquet', partitioning='hive')
The individual files can be accessed from the dataset using the get_fragments method, and we can pass a filter on the partition keys to efficiently only access certain files. Here we get the index to the WARC files for the 2022-05 crawl:
fragments = list(
    cc_index.get_fragments(filter=(ds.field('crawl') == 'CC-MAIN-2022-05') &
                                  (ds.field('subset') == 'warc')))
len(fragments)
# 300
PyArrow exposes methods for accessing metadata for these fragments (files), such as the number of rows and their sizes. Each fragment contains "row groups", which are large blocks of records. In this case there are associated statistics stored in the row group metadata:
fragments[0].row_groups[0].statistics
Which gives a long list of data starting with:
{'url_surtkey': {'min': 'com,wordpress,freefall852)/2016/03/29/billy-guy',
'max': 'com,worldpackers)/search/skill_hospitality_entertainment/type_hotel?location_categories[]=nature&location_types[]=hotel&min_meals_count[]=3&months[]=11&skills[]=music'},
'url': {'min': 'http://03.worldchefsbible.com/',
'max': 'https://zh.worldallianceofdramatherapy.com/he-mission'},
'url_host_name': {'min': '03.worldchefsbible.com',
'max': 'zr1.worldblast.com'},
'url_host_tld': {'min': 'com', 'max': 'com'},
'url_host_2nd_last_part': {'min': 'wordpress', 'max': 'worldpackers'},
'url_host_3rd_last_part': {'min': '03', 'max': 'zr1'},
'url_host_4th_last_part': {'min': 'bbbfoundation', 'max': 'www'},
'url_host_5th_last_part': {'min': 'http', 'max': 'toolbox'},
'url_host_registry_suffix': {'min': 'com', 'max': 'com'},
'url_host_registered_domain': {'min': 'wordpress.com',
'max': 'worldpackers.com'},
'url_host_private_suffix': {'min': 'com', 'max': 'com'},
'url_host_private_domain': {'min': 'wordpress.com',
'max': 'worldpackers.com'},
'url_host_name_reversed': {'min': 'com.wordpress.freefall852',
'max': 'com.worldpackers.www'},
'url_protocol': {'min': 'http', 'max': 'https'},
'url_port': {'min': 443, 'max': 2000},
...
One thing to notice is that the url_surtkey covers a fairly narrow alphabetical range, from com,wordpress through to com,worldpackers (as does url_host_private_domain, but that isn't available for older crawls). If we wanted to find crawl data for a particular domain, these statistics make it easy to tell whether this file could contain it. The fragments have a method split_by_row_group which takes a filter argument that can use these statistics to skip over row groups. Unfortunately it's quite slow for PyArrow to read just the Parquet metadata; from my laptop it can process about 2-3 files per second.
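As a sketch of what that filtering could look like (this is my own example, not from the original notebook, and the lexicographic bounds for a domain prefix are an assumption), we can ask PyArrow to keep only the row groups whose url_surtkey statistics could cover a given domain:

# Skip row groups whose url_surtkey statistics can't contain any key
# starting with this prefix ('\uffff' acts as a crude upper bound).
prefix = 'org,commoncrawl)/'
domain_filter = ((ds.field('url_surtkey') >= prefix) &
                 (ds.field('url_surtkey') <= prefix + '\uffff'))
candidate_row_groups = fragments[0].split_by_row_group(filter=domain_filter)
len(candidate_row_groups)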
Using fastparquet
Fastparquet, part of the Dask project, also provides a good interface for reading Parquet files. However it is also quite slow; simply opening a file, which only reads the metadata, takes around 2.3 seconds.
from fastparquet import ParquetFile
import s3fs
fs = s3fs.S3FileSystem()
pf = ParquetFile(fn=fragments[0].path, fs=fs)
All the file metadata is accessible for each of the columns through the fmd attribute.
pf.fmd.row_groups[0].columns[0].meta_data._asdict()
{'type': 6,
'encodings': [0, 4],
'path_in_schema': ['url_surtkey'],
'codec': 2,
'num_values': 1730100,
'total_uncompressed_size': 117917394,
'total_compressed_size': 23113472,
'key_value_metadata': None,
'data_page_offset': 4,
'index_page_offset': None,
'dictionary_page_offset': None,
'statistics': {'max': None,
'min': None,
'null_count': 0,
'distinct_count': None,
'max_value': "b'com,worldpackers)/search/skill_hospitality_entertainment/type_hotel?location_categories[]=nature&location_types[]=hotel&min_meals_count[]=3&months[]=11&skills[]=music'",
'min_value': "b'com,wordpress,freefall852)/2016/03/29/billy-guy'"},
'encoding_stats': [{'page_type': 0, 'encoding': 0, 'count': 122}],
'bloom_filter_offset': None}
Reading the data manually
To understand what's happening here we can access the data manually. A Parquet file contains all the data, followed by the File Metadata, followed by a 32-bit little-endian integer giving the length of the metadata in bytes, and finally the magic string "PAR1" to identify the file type.
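As a quick illustration of that layout, this is how the footer could be read from a local Parquet file (using a placeholder filename). These S3 files are too big to download and seek through locally, so the rest of this section does the same thing with ranged requests.

# Sketch: read the footer of a local Parquet file ('example.parquet' is a
# placeholder path). The last 8 bytes are the metadata length plus the magic.
with open('example.parquet', 'rb') as f:
    f.seek(-8, 2)                              # 8 bytes from the end
    footer = f.read(8)
    assert footer[4:] == b'PAR1'               # magic bytes
    meta_len = int.from_bytes(footer[:4], 'little')
    f.seek(-8 - meta_len, 2)                   # metadata sits just before
    file_meta = f.read(meta_len)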
To read the metadata over S3 we first need to find the length of the file so that we can locate the end of it. We can use boto3 to make a HEAD request against the S3 HTTP REST endpoint and get the Content-Length without downloading any data.
import boto3
s3 = boto3.client('s3')

file = fragments[0].path
bucket, key = file.split('/', 1)
metadata = s3.head_object(Bucket=bucket, Key=key)
content_length = int(metadata['ContentLength'])
f'{content_length / (1024**3):0.1f} GB'
# 1.3 GB
We then need to find the length of the file metadata by reading the last 8 bytes, using an HTTP range request.
end_byte = content_length
start_byte = end_byte - 8

response = s3.get_object(Bucket=bucket,
                         Key=key,
                         Range=f'bytes={start_byte}-{end_byte}')
end_content = response['Body'].read()

assert end_content[-4:] == b'PAR1'

file_meta_length = int.from_bytes(end_content[:4], byteorder='little')
f'{file_meta_length / 1024:0.1f} kb'
# 57.4 kb
Now that we know how long the file metadata is we can work backwards to extract it.
end_byte = content_length - 8
start_byte = content_length - 8 - file_meta_length

response = s3.get_object(Bucket=bucket,
                         Key=key,
                         Range=f'bytes={start_byte}-{end_byte}')
file_meta_content = response['Body'].read()
The metadata is a fairly complicated structure serialised with Apache Thrift, so instead of parsing it ourselves we can use fastparquet to deserialise it.
from fastparquet.cencoding import from_buffer
fmd = from_buffer(file_meta_content, "FileMetaData")
type(fmd)
# fastparquet.cencoding.ThriftObject
The attributes of this object correspond to the Parquet metadata and can be accessed just as they were with fastparquet above. The whole process takes under a second per file; faster than both PyArrow and fastparquet, but still a little slow.
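For example, we can pull out the same url_surtkey statistics we saw through fastparquet earlier (this is my own illustration; the footer came from the same first fragment):

# The same column statistics as before, now from the footer we fetched ourselves.
fmd.row_groups[0].columns[0].meta_data.statistics.min_value
# b'com,wordpress,freefall852)/2016/03/29/billy-guy'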
Concurrent requests with asyncio
We want to read the metadata from many Parquet files, but we spend most of the time waiting for data from the server. This kind of I/O bound problem is perfect for asyncio. We could use aiobotocore to access S3 in an asynchronous context, but instead I decided to use the Common Crawl HTTP interface. This is publicly accessible and does not require any authentication, but unfortunately the Parquet files can't be automatically discovered over HTTP. We can convert the file paths discovered by PyArrow over S3 to the HTTP frontend by replacing the Common Crawl bucket commoncrawl/ with the prefix https://data.commoncrawl.org/.
# Just get the warc files
warc_files = [f for f in cc_index.files if '/subset=warc/' in f]

# Convert from S3 bucket/key to data.commoncrawl.org
http_files = ['https://data.commoncrawl.org/' +
              x.split('/', 1)[1] for x in warc_files]
I don't understand asyncio in as much detail as I would like, so this code may not be quite right. I adapted a Stack Overflow answer on downloading files with aiohttp to this use case.
import asyncio
import aiohttp

async def _async_parquet_metadata_http(url, session):
    """Retrieve Parquet file metadata from url using session"""
    async with session.head(url) as response:
        await response.read()
        output_headers = response.headers
    content_length = int(output_headers['Content-Length'])

    headers = {"Range": f'bytes={content_length-8}-{content_length}'}
    async with session.get(url=url, headers=headers) as response:
        end_content = await response.read()

    if end_content[-4:] != b'PAR1':
        error = 'File at %s does not look like a Parquet file; magic %s'
        raise ValueError(error % (url, end_content[-4:]))

    file_meta_length = int.from_bytes(end_content[:4], byteorder='little')

    start = content_length - 8 - file_meta_length
    end = content_length - 8
    headers = {"Range": f'bytes={start}-{end}'}
    async with session.get(url, headers=headers) as response:
        file_meta_content = await response.read()
    return file_meta_content

async def fetch_parquet_metadata_http(urls):
    """Retrieve Parquet metadata for urls asynchronously, returning exceptions"""
    async with aiohttp.ClientSession(raise_for_status=True) as session:
        ret = await asyncio.gather(*[_async_parquet_metadata_http(url, session)
                                     for url in urls],
                                   return_exceptions=True)
        return ret
Scanning all the indexed WARC crawls means reading over 25,000 files, and the Parquet metadata alone is over 1 GB. When making this many requests, 3 per file, at least one is likely to fail. There are three kinds of failures I can think of:
1. An intermittent issue causes an individual load to fail.
2. There is an issue with an individual file, so loading it will always fail.
3. There is an environmental issue (server rejecting all requests, network down) and all requests fail.
Note that we passed return_exceptions=True to asyncio.gather, which allows us to handle these errors. A simple approach is:
- Run a batch of N requests
- Capture any errors and put these jobs in a retry queue (to handle 1)
- Remove any jobs that have been retried too many times (to handle 2)
- Persist successful results so they can be reused
- If more than x% of the N requests fail abort the process (to handle 3).
Running in small batches also avoids the problem of running too many asynchronous tasks in parallel and locking the CPU. We can store our metadata in a SqliteDict and maintain a job queue and retry counts.
from collections import defaultdict
from sqlitedict import SqliteDict
metadata_store = SqliteDict('common_crawl_columnar_index_metadata.sqlite')

max_retries = 3
max_exceptions_per_batch = 5
batch_size = 1000

retries = defaultdict(int)
exceptions = defaultdict(list)
seen = set(metadata_store.keys())

jobs = [x for x in http_files if x not in seen]
len(jobs)
# 25193
We can now run through all the jobs:
import logging
import time

start_time = time.time()

while len(jobs) > 0:
    batch = jobs[:batch_size]

    batch_metadata = await fetch_parquet_metadata_http(batch)

    num_exceptions = 0

    for job, metadata in zip(batch, batch_metadata):
        if isinstance(metadata, Exception):
            logging.warning(f'{job}: {type(metadata)}')
            num_exceptions += 1
            exceptions[job].append(metadata)
            retries[job] += 1
            if retries[job] >= max_retries:
                jobs.remove(job)
        else:
            assert isinstance(metadata, bytes)
            metadata_store[job] = metadata
            jobs.remove(job)

    metadata_store.commit()

    if num_exceptions > max_exceptions_per_batch:
        print('Too many exceptions %i' % num_exceptions)
        break
print(f'Finished in {time.time() - start_time:0.0f}s')
# Finished in 408s
This works out to around 60 Parquet files per second, far better than our starting point of 2-3 Parquet files per second. We can then extract the relevant metadata into a Pandas DataFrame.
import pandas as pd
stats_columns = ['url_surtkey']

def extract_row_group_metadata(k, v):
    fmd = from_buffer(v, "FileMetaData")

    for idx, row_group in enumerate(fmd.row_groups):
        result = {
            'path': k[len('https://data.commoncrawl.org/'):],
            'crawl': k.split('/')[-3].split('=')[-1],
            'subset': k.split('/')[-2].split('=')[-1],
            'row_group': idx,
            'num_rows': row_group.num_rows,
            'byte_size': row_group.total_byte_size,
        }
        for col in stats_columns:
            minimum = get_column_metadata(col, row_group).statistics.min_value
            maximum = get_column_metadata(col, row_group).statistics.max_value
            # Convert byte strings into unicode
            if isinstance(minimum, bytes):
                minimum = minimum.decode('utf-8')
            if isinstance(maximum, bytes):
                maximum = maximum.decode('utf-8')

            result[f'min_{col}'] = minimum
            result[f'max_{col}'] = maximum
        yield result

def extract_metadata(metadata_store):
    for k, v in metadata_store.items():
        for row in extract_row_group_metadata(k, v):
            yield row

df_metadata = pd.DataFrame(extract_metadata(metadata_store))
df_metadata.head()
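The helper get_column_metadata used above isn't defined in this post. A minimal sketch, assuming it simply looks up a column chunk's metadata by name within a row group, could be:

def get_column_metadata(name, row_group):
    # Hypothetical helper: find the column chunk whose schema path
    # matches the requested column name.
    for column in row_group.columns:
        if list(column.meta_data.path_in_schema) == [name]:
            return column.meta_data
    raise KeyError(name)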
Using the metadata
Now that we have the metadata we can use it to look for crawls of commoncrawl.org in the 2020-24 crawl. First we query the metadata to find which files and row groups may contain this domain.
results = df_metadata.query('crawl == "CC-MAIN-2020-24" &\
    min_url_surtkey <= "org,commoncrawl)/" <= max_url_surtkey')
To iterate over these efficiently we can make a mapping from the paths to a list of row groups.
path_to_row_groups = (
    results
    .groupby('path')
    .agg(row_groups=('row_group', list))
    ['row_groups']
    .to_dict()
)
We can then use PyArrow to read the required files and row groups:
import pyarrow as pa
from fsspec.implementations.http import HTTPFileSystem

http = HTTPFileSystem()
http_prefix = 'https://data.commoncrawl.org/'

# Build a dataset over just the candidate files found from the metadata
search_ds = ds.dataset([http_prefix + path for path in path_to_row_groups],
                       filesystem=http,
                       format='parquet',
                       partitioning='hive')

columns = ['url', 'url_host_name',
           'warc_filename', 'warc_record_offset',
           'warc_record_length']

all_groups = []
for fragment in search_ds.get_fragments():
    path = fragment.path[len(http_prefix):]
    row_groups = fragment.split_by_row_group()
    for row_group_idx in path_to_row_groups[path]:
        row_group = row_groups[row_group_idx]
        data = row_group.to_table(
            columns=columns,
            filter=ds.field('url_host_name') == 'commoncrawl.org')
        if len(data) > 0:
            all_groups.append(data)

results = pa.concat_tables(all_groups).to_pandas()
This takes around 1 minute to scan through the 5 row groups; it could potentially be sped up using concurrent requests. We can then use the result to download a single WARC record:
import zlib
import requests

a = results.iloc[0]
url = 'https://data.commoncrawl.org/' + a.warc_filename

start = a.warc_record_offset
end = a.warc_record_offset + a.warc_record_length
header = {"Range": f'bytes={start}-{end}'}

r = requests.get(url, headers=header)
warc_data = r.content
data = zlib.decompress(warc_data, wbits=zlib.MAX_WBITS | 16)
print(data.decode('utf-8')[:2000])
Missing Data
Looking through the metadata, for the crawls between 2017-43 and 2018-43 there are no statistics for url_surtkey:
(
    df_metadata
    .groupby('crawl')
    .agg(row_groups=('path', 'count'),
         files=('path', 'nunique'),
         min_url=('min_url_surtkey', 'count'),
         max_url=('max_url_surtkey', 'count'),
    )
    .query('min_url != row_groups')
    .sort_index()
)
One way to fix this is to construct the statistics manually, reading through all of these Parquet files and calculating the minimum and maximum url_surtkey for each row group (see the sketch after the Athena query below). If we're happy to only filter at a file level we could also use AWS Athena to find them:
SELECT "$path" as path,
min(url_surtkey) AS min_url_surtkey,
max(url_surtkey) AS max_url_surtkey
FROM ccindex
WHERE crawl BETWEEN 'CC-MAIN-2017-43' AND 'CC-MAIN-2018-43'
AND subset = 'warc'
GROUP BY "$path"
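For the per-row-group approach, here is a rough sketch with PyArrow (my own sketch, not from the original notebook). It reads just the url_surtkey column of each row group, so it downloads some data and is relatively slow; it is shown on the fragments loaded earlier, but in practice you would point it at the 2017-2018 files.

import pyarrow.compute as pc

# Backfill per-row-group min/max url_surtkey for a single fragment.
for idx, row_group in enumerate(fragments[0].split_by_row_group()):
    surtkeys = row_group.to_table(columns=['url_surtkey'])['url_surtkey']
    stats = pc.min_max(surtkeys)
    print(idx, stats['min'].as_py(), stats['max'].as_py())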