Implementing the Lempel-Ziv 77 Paper in Python

entropy

Published September 22, 2024

Lossless compression is the process of taking some data and squishing it into a smaller space without losing any information. It is ubiquitous in data storage and transfer; for example this web page was almost certainly compressed as it was sent from my server to your browser. It is somewhat surprising that there are general algorithms that effectively compress most real files, whether they store text, audio, images or arbitrary binary data. However there is one kind of redundancy that’s very common in all these data types: repeated patterns. In 1977 Lempel and Ziv came up with a universal compression algorithm to exploit this redundancy (now called LZ77), and it is still used everywhere almost 50 years later in HTTP compression, zip and gzip files, and most document formats.
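
For a quick feel for this in practice we can use Python’s built-in zlib module, which implements DEFLATE (an LZ77 variant followed by Huffman coding); highly repetitive input compresses to a small fraction of its original size (the exact compressed size depends on the zlib version and settings):

import zlib

text = b"compression is " * 100
len(text), len(zlib.compress(text))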

The key idea in LZ77 compression is that it often requires less data to store a reference to a string of symbols than to repeat the symbols. For example in the previous sentence I used the 16 characters “compression is ” that I also used 777 characters earlier in the first sentence. Instead of using 16 bytes to encode the characters directly I could use 2 bytes to record that the characters occurred 777 symbols earlier (in hexadecimal notation 03 09) and 1 byte to record the length of 16 (in hexadecimal 10), for a total of 3 bytes, saving 13 bytes. The neat thing about this idea is that by keeping a sliding window of recent characters (in this case 65 thousand) the amount of memory required is bounded and the data can be compressed in a single pass. A great demonstration of this is Colin Morris’ post using Lempel-Ziv to compress song lyrics.
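
As a rough sketch of what such a back-reference could look like as bytes (this is just an illustration of the arithmetic above, not the encoding used by any real format):

distance, length = 777, 16
reference = distance.to_bytes(2, "big") + length.to_bytes(1, "big")
reference.hex(), len(reference)
('030910', 3)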

This article will implement Lempel and Ziv’s original paper A Universal Algorithm for Sequential Data Compression in Python. The algorithm as described in the paper is not very efficient (most “LZ77” implementations, such as the one in the DEFLATE RFC, actually use variations of it), but it shows the essential ideas. We will also go through the paper’s analysis of bounds and efficiency and show that the bounds are not that interesting, even though the algorithm is effective in practice.

The Compression Algorithm

The original paper was hard for me to read because they jump into dense mathematical notation without explaining the intuition behind it. The key idea is that when you’ve got some string you want to encode like 010210 and a buffer of symbols you’ve seen before like 000001, you look back into the buffer and find the longest substring in the buffer that matches the start of the string you want to encode (in this case the 01 at the end of the buffer). An important trick is that you can extend the match from the buffer into the string you are encoding, which is equivalent to cyclically extending the substring. In this example we can extend 1 more character to get 010.

Some terminology

The symbols come from some finite alphabet \(A\) with some number \(\alpha\) of symbols, which we label \(A = \{ 0,1, \ldots, \alpha-1 \}\). So for a ternary alphabet it looks like:

α = 3
A = list(map(str, range(α)))
A
['0', '1', '2']

A string, or word, S, is then just symbols from A concatenated together; from our previous example:

S = '000001' '0010210'
S
'0000010010210'

In the paper they index the strings starting at \(1\), whereas Python uses \(0\) based indexing.

They denote the substring starting at index \(i\) and ending at index \(j\) by \(S(i,j)\), which we would write as S[i-1:j-1]. We can recover our buffer:

i = 1
j = 7
S[i-1:j-1] # S(i,j)
'000001'

They then define the reproducible extension of \(S\) at \(j\), which is the longest substring starting after index \(j\) that we can get by reading from some point in the buffer.

So for example suppose we have the string 00101011 and \(j=3\) so the string to match is 01011:

  • starting at the first index \(i=1\) then the longest match would be 0 with length \(L=1\) (since if we go one character further 00 does not match 01).
  • starting at next index \(i=2\) then the longest match would be 0101 with length \(L=4\).
  • starting at the final index \(i=3\) the longest match is the empty string, with length \(L=0\), since it starts with a 1 and the string to match starts with a 0.

So the reproducible extension is 0101.

Let’s write this as a function; we count how many symbols from the string starting from \(i\) match the string starting after \(j\), stopping when there is a mismatch:

def L(S: str, j: int, i: int) -> int:
    assert i<=j

    offset = 0
    while (j+offset) < len(S) and (S[i-1+offset] == S[j+offset]):
        offset += 1
    
    return offset
L('001' '01011', j=3, i=1)
1
L('001' '01011', j=3, i=2)
4
L('001' '01011', j=3, i=3)
0

The index of a longest matching substring is called the pointer \(p\) of the reproducible extension, so in this example \(p=2\). There could be multiple pointers; the paper tends to use the highest indexed one, and so will we.

We can find the pointer as above by iterating over every possible index and finding the one with the largest \(L\):

def p(S: str, j:int) -> int:
    max_length = 0
    max_index = 0

    # Using 1-based indexing
    for i in range(1, j+1):
        length = L(S, j, i)
        if length >= max_length: 
            max_length = length
            max_index = i

    return max_index

And as before we find the pointer starts at 2:

p('00101011', j=3)
2

Writing in more familiar notation

I find the mismatch between the indexing in the paper and in Python confusing, so I will switch to Python conventions:

  • i and j are 0-based so i=0 corresponds to looking from the start of the string
  • j is such that the prefix is S[:j] and the string to match is S[j:]
  • Consequently it must be that i < j
def longest_prefix_length(S: str, j: int, i: int) -> int:
    """Length of longest matching substrings of S from i and j."""
    assert 0<=i<j
    assert j <= len(S)

    offset = 0
    while (j+offset < len(S)) and (S[i+offset] == S[j+offset]):
        offset += 1
    
    return offset

With respect to our previous notation this means j is the same but i is reduced by 1:

assert longest_prefix_length('001' '01011', j=3, i=3-1) == L('00101011', j=3, i=3)
assert longest_prefix_length('001' '01011', j=3, i=2-1) == L('00101011', j=3, i=2)
assert longest_prefix_length('001' '01011', j=3, i=1-1) == L('00101011', j=3, i=1)

Let’s check some simple cases (I found some bugs in my original implementation by checking boundary cases):

assert longest_prefix_length('a' '', j=1, i=0) == 0

assert longest_prefix_length('a' 'a', j=1, i=0) == 1
assert longest_prefix_length('a' 'b', j=1, i=0) == 0

assert longest_prefix_length('a' 'aa', j=1, i=0) == 2
assert longest_prefix_length('a' 'ab', j=1, i=0) == 1
assert longest_prefix_length('a' 'ba', j=1, i=0) == 0
assert longest_prefix_length('a' 'bb', j=1, i=0) == 0

assert longest_prefix_length('ab' 'a', j=2, i=0) == 1
assert longest_prefix_length('ab' 'a', j=2, i=1) == 0
assert longest_prefix_length('ba' 'a', j=2, i=0) == 0
assert longest_prefix_length('ba' 'a', j=2, i=1) == 1

We can use property based testing to check the result actually is a prefix:

from hypothesis import given, strategies as st

@given(st.text(min_size=1),
       st.text(),
       st.integers())
def test_longest_prefix_is_prefix(context, string, i):
    # i in [0, len(context)-1]
    i = i % len(context)
    
    S = context + string
    j = len(context)
    
    l = longest_prefix_length(S, j, i)
    assert S[j:j+l] == S[i:i+l]

test_longest_prefix_is_prefix()

And check that it’s the longest such prefix: if we go one character further we no longer have a match:

from hypothesis import given, strategies as st

@given(st.text(min_size=1),
       st.text(),
       st.integers())
def test_longest_prefix_is_longest(context, string, i):
    # i in [0, len(context)-1]
    i = i % len(context)
    
    S = context + string
    j = len(context)
    
    l = longest_prefix_length(S, j, i)
    assert S[j:j+l+1] != S[i:i+l+1]

test_longest_prefix_is_longest()

We now rewrite the p function in this new notation to return both the pointer and its length, since we will need both:

from typing import Tuple

def pointer(S: str, j: int) -> Tuple[int, int]:
    """Index and length of pointer of S at j.

    Returns the index and length of the longest matching prior
    substring that S[j:] starts with.
    The match can extend into the string being encoded.
    """
    max_index, max_length = 0, 0

    for i in range(j):
        length = longest_prefix_length(S, j, i)
        if length >= max_length: 
            max_index, max_length = i, length

    return (max_index, max_length)

We get the same result as before

assert pointer('00101011', j=3) == (1,4)

We can use property based testing to check that it actually does give an extension:

@given(st.text(min_size=1),
       st.integers())
def test_pointer_gives_extension(S, j):
    # j in [0, len(S)]
    j = j % (len(S) + 1)
    
    p, L = pointer(S, j)

    assert S[p:p+L] == S[j:j+L]

test_pointer_gives_extension()

And further check it actually gives a maximal extension:

@given(st.text(min_size=1),
       st.text(),
       st.integers())
def test_pointer_gives_maximal_extension(context, string, i):
    # i in [0, len(context)-1]
    i = i % len(context)
    
    S = context + string
    j = len(context)
    
    p, L = pointer(S, j)

    assert L >= longest_prefix_length(S, j, i)

test_pointer_gives_maximal_extension()

Encoding symbols

Now that we can find the longest matching substring we need a way to efficiently encode it in the same alphabet \(A\).

Let’s call the maximum length of string matches we will consider \(L_s\); then we can simply write it as a number in \(\lceil \log_\alpha L_s \rceil\) digits. So for example if we use an alphabet with \(\alpha=3\) symbols and maximum length \(L_s=9\) we need 2 ternary digits to write the length.

from math import log, ceil
α = 3
L_s = 9

digits_length = ceil(log(L_s, α))
digits_length
2

For example a length of 7 would be written as 21 as it is 2*3 + 1 = 7. We can write a little code to do the conversion:

def num_to_string(number: int, base: int, digits: int):
    assert base > 1 and isinstance(base, int)
    assert number >= 0 and isinstance(number, int)
    if base > 10:
        raise ValueError("Base larger than 10 is not supported")

    number_string = ''
    while number > 0:
        number_string = str(number % base) + number_string
        number = number // base

    if len(number_string) > digits:
        raise ValueError("Number is too large")
    
    return number_string.zfill(digits)
    
num_to_string(7,3,2)
'21'

We can invert this with Python’s built in int function:

int('21', 3)
7

We can check that numbers encoded with num_to_string can be converted back again:

@given(st.integers(0, 10**10-1), st.integers(2, 10), st.integers(1, 10))
def test_num_to_str_hasinverse_int(number, base, digits):
    try:
        number_str = num_to_string(number, base, digits)
        assert int(number_str, base=base) == number
    # If we didn't have enough digits
    except ValueError:
        pass

test_num_to_str_hasinverse_int()

Or conversely if we have a string in some base:

@st.composite
def nonempty_string_from_number_alphabet(draw):
    base = draw(st.integers(2, 10))
    alphabet = list(map(str, range(base)))
    string = draw(st.text(alphabet=alphabet, min_size=1))
    return (string, base)

[nonempty_string_from_number_alphabet().example() for _ in range(5)]
[('65521205337354853', 9), ('7219', 10), ('44434', 5), ('3222', 9), ('00', 2)]

We can convert it to a number and back again:

@given(nonempty_string_from_number_alphabet())
def test_num_to_str_isinverse_int(string_base):
    string, base = string_base
    digits = len(string)
    number = int(string, base)
    assert num_to_string(number, base=base, digits=digits) == string

test_num_to_str_isinverse_int()

We also need to store the pointer; if we keep track of the size of the entire buffer \(n\) (the amount of memory we need to allocate), then there are \(n-L_s\) possible values for the pointer, which can be stored as a number in \(\lceil \log_\alpha (n - L_s) \rceil\) digits. For example if we use \(n=18\) then we have \(9\) possible values for the pointer, which can also be stored in 2 ternary digits.

n = 18

digits_pointer = ceil(log(n - L_s, α))
digits_pointer
2

Finally for encoding we’ll store one extra digit of information: the next symbol after the substring. This ensures the algorithm never gets stuck - it always advances at least one symbol.

So the total code length is:

L_c = digits_pointer + digits_length + 1
L_c
5

Note that this logarithmic relationship is why this coding works: in 5 digits we can store strings of length \(1\) up to \(\alpha^2 = 9\). As long as the average length of the stored strings is greater than 5 we get compression.
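
To make the break-even point concrete, these are the code digits spent per source symbol in the worst case (a codeword that only advances one symbol) and in the best case (a codeword that covers nine symbols):

L_c / 1, L_c / 9
(5.0, 0.5555555555555556)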

The algorithm to encode a symbol is simply to write the pointer, then the length, and then the next character:

def encode_symbol(pointer: int, length: int, next_char: str,
                  base: int = α,
                  digits_pointer: int = digits_pointer,
                  digits_length: int = digits_length,
                 ) -> str:
    return (
        num_to_string(pointer, base, digits_pointer) +
        num_to_string(length, base, digits_length) +
        next_char
    )

Let’s take the example from the last section and pad it with 4 2s on the right so we have up to 9 characters to encode, and with 6 2s on the left so we have a total of n=18 characters:

S = '222222001' '010112222'

Then our pointer is at index 7 and still of length 4 (the reproducible extension is 0101):

p, L = pointer(S, j=L_s)
(p, L)
(7, 4)

The next character to encode is the next character after the extension:

next_char = S[L_s + L]
next_char
'1'

Then we can encode the symbol:

code_symbol = encode_symbol(p, L, next_char)
code_symbol
'21111'

Decoding the symbol

We can easily invert this to decode the symbol to get back the pointer, length and next char:

def decode_symbol(code: str, base: int = α, 
                  digits_pointer: int = digits_pointer,
                  digits_length: int = digits_length,
                 ) -> Tuple[int, int, str]:
    assert len(code) == digits_pointer + digits_length + 1
    return (
        int(code[:digits_pointer], base),
        int(code[digits_pointer:digits_pointer+digits_length], base),
        code[-1]
    )

decode_symbol(code_symbol)
(7, 4, '1')

Let’s check this always is an inverse: first we need to generate a valid encoded string:

@st.composite
def code_with_lengths_and_base(draw):
    base = draw(st.integers(2, 10))
    alphabet = list(map(str, range(base)))

    # Need at least 2 characters (otherwise it's 0 digits...)
    L_s = draw(st.integers(2))
    n = draw(st.integers(L_s+2))

    digits_length = ceil(log(L_s, base))
    digits_pointer = ceil(log(n - L_s, base))

    L_c = digits_length + digits_pointer + 1

    code = draw(st.text(alphabet=alphabet,
                        min_size=L_c,
                        max_size=L_c))

    return dict(code=code,
                digits_pointer=digits_pointer,
                digits_length=digits_length,
                base=base)

[code_with_lengths_and_base().example() for _ in range(5)]
[{'code': '2401411420532', 'digits_pointer': 6, 'digits_length': 6, 'base': 6},
 {'code': '30352013205330524201510242311352113045243241453120341',
  'digits_pointer': 49,
  'digits_length': 3,
  'base': 6},
 {'code': '300120233323', 'digits_pointer': 7, 'digits_length': 4, 'base': 4},
 {'code': '110100001000001100000000000000000000000',
  'digits_pointer': 19,
  'digits_length': 19,
  'base': 3},
 {'code': '101100100001011',
  'digits_pointer': 7,
  'digits_length': 7,
  'base': 2}]

Then we can generate a code and check the inverse:

@given(code_with_lengths_and_base())
def test_code_inverse_decode(kwargs):
    code = kwargs.pop('code')
    pointer, length, next_char = decode_symbol(code, **kwargs)
    recode = encode_symbol(pointer, length, next_char, **kwargs)

    assert recode == code

test_code_inverse_decode()

The Compression Algorithm

Now that we have the pieces the algorithm is straightforward: we encode a symbol (pointer, length and next character), then advance along the string as many symbols as we encoded. At the start of the string we need to fill the buffer with something; Lempel and Ziv use zeros. We will follow their example of

\[ S = 001010210210212021021200\ldots\]

S = "001010210210212021021200"

We initialize our buffer with zeros (\(Z\)) and read the first \(L_s\) characters from the string:

L_context = n - L_s

Z = '0' * L_context

B1 = Z
B1 += S[:L_s]
position = L_s

B1[:L_context] + ' ' + B1[L_context:]
'000000000 001010210'

We then find the pointer and length:

p, L = pointer(B1, L_context)
p, L
(8, 2)

And encode them along with the next character

next_char = B1[L_context + L]
next_char
'1'

We store this in our code:

C1 = encode_symbol(p, L, next_char)
C1
'22021'

And advance the buffer \(L+1\) symbols

B2 = B1[L+1:] + S[position:position+L+1]
position+=L+1

B2[:L_context] + ' ' + B2[L_context:]
'000000001 010210210'

We now repeat the same procedure to find the pointer and reproducible extension:

p, L = pointer(B2, L_context)
B2[p:p+L]
'010'

And read the next character

next_char = B2[L_context + L]

p, L, next_char
(7, 3, '2')

And add it to our code

C2 = encode_symbol(p, L, next_char)
C2
'21102'

This process repeats:

B3 = B2[L+1:] + S[position:position+L+1]
position+=L+1

B3[:L_context] + ' ' + B3[L_context:]
'000010102 102102120'
p, L = pointer(B3, L_context)
next_char = B3[L_context + L]

C3 = encode_symbol(p, L, next_char)

C3
'20212'

And continues:

B4 = B3[L+1:] + S[position:position+L+1]
position+=L+1

B4[:L_context] + ' ' + B4[L_context:]
'210210212 021021200'
p, L = pointer(B4, L_context)
next_char = B4[L_context + L]

C4 = encode_symbol(p, L, next_char)

C4
'02220'

And finishes when we’ve consumed the whole string; we don’t have anything left to encode

B5 = B4[L+1:] +  S[position:position+L+1]

B5[:L_context] + ' ' + B5[L_context:]
'021021200 '

So our final code is:

code = C1 + C2 + C3 + C4
code
'22021211022021202220'

And our compression ratio is

f'{len(code) / len(S):0.2%}'
'83.33%'

Let’s write this process as a function:

from typing import Iterator

def compress(S: str, 
             base: int = α, 
             L_s: int = L_s,
             n: int = n,
            ) -> str:
    code = ''
    
    L_context = n - L_s
    
    digits_pointer = ceil(log(L_context, base))
    digits_length = ceil(log(L_s, base))

    # initialisation
    buffer = '0' * L_context
    buffer += S[:L_s]
    position = L_s
    
    while len(buffer) > L_context:
        p, L = pointer(buffer, L_context)
        # Cap the match length so it can be written in the length digits
        L = min(L, L_context - 1)

        # Make sure at least one character is left over to encode
        # as the next character
        if L_context + L == len(buffer):
            L = L - 1
        
        next_char = buffer[L_context + L]
        code += encode_symbol(p, L, next_char,
                              base=base,
                              digits_pointer=digits_pointer,
                              digits_length=digits_length)

        buffer = buffer[L+1:] + S[position:position+L+1]
        position += L+1

    return code

assert compress(S) == code

Decompression

To decode the string we apply the process in reverse. We start with the same initial buffer of zeros

buffer = '0' * (n - L_s)
decode = ''
buffer
'000000000'

We then read in the first codeword

C1
'22021'

Decode it into a pointer, length, and next symbol

p, L, next_char = decode_symbol(C1)
p, L, next_char
(8, 2, '1')

Starting at index 8 we only have one character:

buffer[p:p+L]
'0'

We need to cyclically extend it to get the whole string:

reproducible_extension = ''
while len(reproducible_extension) < L:
    reproducible_extension += buffer[p:p+L]
    
reproducible_extension = reproducible_extension[:L]
reproducible_extension
'00'

Now we can append the next character to this to decode our first set of symbols:

decode += reproducible_extension + next_char
decode
'001'

And the newly decoded symbols (the last \(L+1\) characters of decode) get shifted into the end of the buffer:

buffer = buffer[L+1:] + decode[-(L+1):]
buffer
'000000001'

Now we proceed to the next code word:

C2
'21102'

As before we get out the pointer, length and next character:

p, L, next_char = decode_symbol(C2)
p, L, next_char
(7, 3, '2')

And read out the reproducible extension:

reproducible_extension = ''
while len(reproducible_extension) < L:
    reproducible_extension += buffer[p:p+L]
    
reproducible_extension = reproducible_extension[:L]
reproducible_extension
'010'

And update our code string

decode += reproducible_extension + next_char
decode
'0010102'

And the buffer, again shifting in just the newly decoded symbols:

buffer = buffer[L+1:] + decode[-(L+1):]
buffer
'000010102'

This continues until we have no codewords left.

We can put this in a loop:

def decompress(C: str,
               base: int = α, 
               L_s: int = L_s,
               n: int = n,
            ) -> str:
    output = ''
    
    L_context = n - L_s
    
    digits_pointer = ceil(log(L_context, base))
    digits_length = ceil(log(L_s, base))

    L_c = digits_pointer + digits_length + 1

    index = 0
    buffer = '0' * (n - L_s)
    while index < len(C):
        code_symbol = C[index:index + L_c]
        index += L_c
        
        p, L, next_char = decode_symbol(code_symbol,
                                        base=base,
                                        digits_pointer=digits_pointer,
                                        digits_length=digits_length)

        reproducible_extension = ''
        # Cyclical extension
        while len(reproducible_extension) < L:
            reproducible_extension += buffer[p:p+L]
        reproducible_extension = reproducible_extension[:L]

        decode = reproducible_extension + next_char

        buffer = buffer[len(decode):] + decode
        output += decode

    assert index == len(C), "Unexpected code length"

    return output

Let’s check this recovers our original text:

assert decompress(code) == S

And more generally decompression recovers the original string for any input:

@given(nonempty_string_from_number_alphabet(), st.integers(2, 100), st.integers(2, 100))
def test_compress_decompress(s_base, L_s, pointer_size):
    s, base = s_base
    n = L_s + pointer_size
    
    c = compress(s, base, L_s, n)
    
    assert decompress(c, base, L_s, n) == s

test_compress_decompress()

Making LZ77 compression more effective

The algorithm I wrote above works but you wouldn’t use it in practice. Aside from representing digits as Python strings, the encoding itself has a lot of redundancy and the algorithm to find the longest prefix is very slow. This section will briefly outline a few of these points, but there are surely a lot more tricks in production libraries like zlib.
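
As a sketch of how the longest-prefix search could be sped up (production implementations like zlib use hash chains over the next few bytes; this is just a simple illustration of the same idea, and it may resolve ties or very short matches differently from pointer), we can index the buffer by short substrings so only positions that already match the first couple of characters are tried:

from collections import defaultdict

def pointer_indexed(S: str, j: int, k: int = 2) -> Tuple[int, int]:
    """Like pointer, but only tries buffer positions whose first k characters
    match the k characters we are about to encode."""
    index = defaultdict(list)
    for i in range(j):
        index[S[i:i+k]].append(i)

    # If those k characters never occur in the buffer, fall back to trying every position
    candidates = index.get(S[j:j+k], range(j))

    max_index, max_length = 0, 0
    for i in candidates:
        length = longest_prefix_length(S, j, i)
        if length >= max_length:
            max_index, max_length = i, length
    return (max_index, max_length)

assert pointer_indexed('00101011', j=3) == pointer('00101011', j=3)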

Redundancy in the codes

Note that not all codes are reachable from compress, and that means the code is not as efficient as it could be.

For example when the length is zero all pointers decode to the same string:

decompress('00000'), decompress('22000')
('0', '0')
decompress('22001' '21021' '22001'), decompress('22001' '21021' '00001')
('10111', '10111')

We could use these extra codes to store more information; for example, when the length is 0 we could store digits_pointer + 1 next characters in total (using the pointer digits as well as the next-character digit). In fact whenever the match length is less than digits_pointer + 1, just storing the characters directly in the pointer digits is more efficient than using the length encoding, as sketched below.
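
As a sketch of what that could look like (a hypothetical variant, not something in the paper; a decoder would have to agree to treat a length of 0 as a marker for literal characters):

def encode_literals(chars: str, base: int = α,
                    digits_pointer: int = digits_pointer,
                    digits_length: int = digits_length) -> str:
    """Pack digits_pointer + 1 literal characters into one codeword,
    using a length of 0 as the marker."""
    assert len(chars) == digits_pointer + 1
    return chars[:digits_pointer] + num_to_string(0, base, digits_length) + chars[-1]

encode_literals('012')
'01002'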

However there may be cases where advancing only 1 or 2 symbols puts us in a better place to find a longer reproducible extension afterwards; these could also be considered in a non-greedy variation.

Choosing the initial buffer

Similarly the first code word is guaranteed to have the maximum pointer; with length 0 if the first symbol is not a 0, or otherwise with positive length if it is.

from itertools import product

all_first_codes = map(''.join, product(*[A]*L_c))

possible_first_codes = [code for code in all_first_codes if compress(decompress(code)) == code]

ncol = 9
for i in range(0, len(possible_first_codes), ncol):
    row = possible_first_codes[i:i+ncol]
    print(("{} " * len(row)).format(*row))
22000 22001 22002 22010 22011 22012 22020 22021 22022 
22100 22101 22102 22110 22111 22112 22120 22121 22122 
22200 22201 22202 22210 22211 22212 22220 22221 22222 

We could make this more efficient by storing additional characters in the first entry, or alternatively by starting with a richer initial buffer (or a mapping as described in the previous section) that has a greater range of characters.

Compression of constrained sources

The algorithm itself is just Section 2 of Lempel and Ziv’s A Universal Algorithm for Sequential Data Compression; the rest of the paper tries to put some estimates on how good a compressor it is. The problem is that their proofs of worst case compression only work in really uninteresting cases: when there are pairs of letters that never occur in the same string. This is not at all useful!

This section is going to work through in detail why the proofs aren’t interesting; if you just want to compress some text you can skip ahead to the next section where we apply it on real data.

Sources

I once heard a story about a group of mathematical researchers that spent years studying a class of objects defined by some axioms before they realised the class was empty; no object satisfied all those properties. I have no idea of the veracity of the story, but there is a kernel of truth that it’s not always clear from a definition what the concrete objects that satisfy it are. It pays to try to at least work through some examples.

We start with an alphabet \(A = \{0, 1, \ldots, \alpha-1\}\) and the set of all possible strings with letters from that alphabet, denoted using the Kleene Star \(A^*\). In the paper a source \(\sigma\) is a subset of these strings that satisfies three properties:

  1. It contains all the length 1 strings; or in mathematical notation \(a \in A\) implies \(a \in \sigma\)
  2. Any string from the source concatenated with itself is in the source; \(S \in \sigma\) implies \(SS \in \sigma\)
  3. Any substring of a string from the source is also in the source; \(STU \in \sigma\) implies \(T \in \sigma\)

I claim that a source is just a union of the sets of all strings over different sub-alphabets; that is, for any source \(\sigma\) over \(A\) there exist sub-alphabets \(A_1, A_2, \ldots, A_k\) such that \(\sigma = A_1^* \cup A_2^* \cup \cdots \cup A_k^*\) (here \(\cup\) means union of sets, but you can also think of it as alternation | in regular expressions). This isn’t very interesting in practice; most real files can contain every possible valid character, so the source is \(A^\star\), and in this case their proofs reduce to no compression. If we really did have this kind of source a simple compression would be to record the first character and then switch to a different encoding depending on which \(A_i\) we are in.

Simplest possible source

Let’s consider the simplest possible source. Let’s start with property 1:

A = ['0', '1']

source = set(A)
source
{'0', '1'}

We extend it with all self-concatenations to get all runs of ‘0’ and ‘1’ up to length 2:

concatenations = {s + s for s in source}
    
source.update(concatenations)
source
{'0', '00', '1', '11'}

And then concatenations of concatenations to get runs of length 1, 2 and 4

concatenations = {s + s for s in source}
    
source.update(concatenations)
source
{'0', '00', '0000', '1', '11', '1111'}

Repeating we would get runs of ‘0’ and ‘1’ of length 1, 2, 4, 8, 16, …

But we can also apply property 3, taking substrings to get other lengths

substrings = {s[i:j]
              for s in source
              for i in range(len(s))
              for j in range(i, len(s))
             }

substrings
{'', '0', '00', '000', '1', '11', '111'}
source.update(substrings)
source
{'', '0', '00', '000', '0000', '1', '11', '111', '1111'}

We can write this formally by using exponent notation for concatenation, so \(S^2 = SS\). Our lemma is: if \(S \in \sigma\) then \(S^k \in \sigma\) for \(k=1, 2, \ldots\). We can prove this by induction; if \(S^k \in \sigma\) then by the concatenation property \(S^k S^k = S^{2k} \in \sigma\). Since a substring of a source string is also in the source, we can take the prefix of length \(k+1\) times the length of \(S\) to get \(S^{k+1} \in \sigma\).
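
As a concrete instance of the induction step (the variable names here are just for illustration):

W = '01'
W2 = W + W             # concatenation: W in the source implies WW is too
W4 = W2 + W2           # concatenating again gives W^4
W3 = W4[:3 * len(W)]   # substring: the prefix of length 3 * len(W) is W^3
W3
'010101'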

So then by property 1 the smallest possible source is \(\sigma = 0^* \cup 1^* \cup \cdots \cup (\alpha-1)^*\). This is indeed a source since:

  1. It contains all the length 1 strings in \(A\)
  2. If \(S \in \sigma\) then \(S = a^k\) for some \(a \in A\) and \(SS = a^{2k} \in \sigma\)
  3. If \(STU \in \sigma\) then \(STU = a^k\) and so \(T = a^l\) for some \(l \leq k\), and \(T \in \sigma\)

Second smallest source

So that’s the smallest source; what’s the next smallest source over a binary alphabet? It must come from adding \(01\): by symmetry adding \(10\) would give a source of the same size, and any longer string that is not \(0^n\) or \(1^n\) must contain a \(01\) or \(10\) somewhere, so by the substring rule it generates a source at least as big.

source = {'0', '1', '01'}
sorted(source, key = lambda x: (len(x), x))
['0', '1', '01']

Let’s add self-concatenations:

concatenations = {s + s for s in source}
    
source.update(concatenations)

sorted(source, key = lambda x: (len(x), x))
['0', '1', '00', '01', '11', '0101']

Adding substrings we see we get 10 (from the middle of the self-concatenation 0101). So we have all length 2 strings.

substrings = {s[i:j]
              for s in source
              for i in range(len(s))
              for j in range(i, len(s))
             }

source.update(substrings)

sorted(source, key = lambda x: (len(x), x))
['', '0', '1', '00', '01', '10', '11', '010', '0101']

We can apply concatenation and substrings again; the strings are getting long now so let’s just keep track of the counts:

from collections import Counter

concatenations = {s + s for s in source}
    
source.update(concatenations)

substrings = {s[i:j]
              for s in source
              for i in range(len(s))
              for j in range(i, len(s))
             }

source.update(substrings)

sorted(Counter(map(len, source)).items())
[(0, 1), (1, 2), (2, 4), (3, 6), (4, 6), (5, 3), (6, 3), (7, 1), (8, 1)]

If we apply it again we get all \(2^3 = 8\) strings of length 3.

concatenations = {s + s for s in source}
    
source.update(concatenations)

substrings = {s[i:j]
              for s in source
              for i in range(len(s))
              for j in range(i, len(s))
             }

source.update(substrings)

sorted(Counter(map(len, source)).items())
[(0, 1),
 (1, 2),
 (2, 4),
 (3, 8),
 (4, 14),
 (5, 19),
 (6, 19),
 (7, 19),
 (8, 18),
 (9, 12),
 (10, 10),
 (11, 6),
 (12, 5),
 (13, 3),
 (14, 3),
 (15, 1),
 (16, 1)]

If we apply it again we get all \(2^4 = 16\) strings of length 4.

concatenations = {s + s for s in source}
    
source.update(concatenations)

substrings = {s[i:j]
              for s in source
              for i in range(len(s))
              for j in range(i, len(s))
             }

source.update(substrings)

Counter(map(len, source))[4]
16

It looks like we’re getting all strings, \(\{0,1\}^*\). What’s going on here?

All possible sources

Sequentially applying concatenation and substring is very powerful. For example we can cycle indices: if \(a_1 a_2 \ldots a_{n-1} a_n \in \sigma\) then \(a_{n} a_1 a_2 \ldots a_{n-1} \in \sigma\) \[\begin{align} a_1 a_2 \ldots a_{n-1} a_n & \in \sigma & \text{(Assumption)} \\ a_1 a_2 \ldots a_{n-1} a_n a_1 a_2 \ldots a_{n-1} a_n & \in \sigma & \text{(Concatenation)} \\ a_n a_1 a_2 \ldots a_{n-1} & \in \sigma & \text{(Substring index n, length n)} \end{align}\]
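
A little code makes this concrete (cycle_last_to_front is just for illustration; it only uses the two source operations):

def cycle_last_to_front(s: str) -> str:
    """Move the last symbol of s to the front using only concatenation and substring."""
    doubled = s + s                        # concatenation keeps us inside the source
    return doubled[len(s)-1:2*len(s)-1]    # substring of length n starting at the nth symbol

cycle_last_to_front('0121')
'1012'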

Note that this means we can delete any substring of a source string to get another string in the source; we simply cycle the part to be deleted to the end, take the substring excluding it, and then cycle back to the original position.

We can also make a copy of any substring of a source and prepend it to get a new substring: \[\begin{align} a_1 \ldots a_i \ldots a_n & \in \sigma & \text{(Assumption)} \\ a_1 \ldots a_i \ldots a_n a_1 \ldots a_i \ldots a_n & \in \sigma & \text{(Concatenation)} \\ a_1 \ldots a_i \ldots a_n a_1 \ldots a_i & \in \sigma & \text{(Substring index 1, length n+i)} \\ a_1 \ldots a_i \ldots a_n a_1 \ldots a_i a_1 \ldots a_i \ldots a_n a_1 \ldots a_i & \in \sigma & \text{(Concatenation)} \\ a_i a_1 \ldots a_i \ldots a_n & \in \sigma & \text{(Substring index n+i, length n+1)} \end{align}\]

These properties together mean we can swap any adjacent pair of indices. By the cycling property we only need to prove we can swap the last two indices: \[\begin{align} a_1 \ldots a_{n-2} a_{n-1} a_n & \in \sigma & \text{(Assumption)} \\ a_{n-1} a_1 \ldots a_{n-2} a_{n-1} a_n & \in \sigma & \text{(Prepending)} \\ a_{n} a_{n-1} a_1 \ldots a_{n-2} a_{n-1} a_n & \in \sigma & \text{(Prepending)} \\ a_{n} a_{n-1} a_1 \ldots a_{n-2} & \in \sigma & \text{(Substring index 1, length n)} \\ a_1 \ldots a_{n-2} a_{n} a_{n-1} & \in \sigma & \text{(Cycling indices)} \\ \end{align}\] Swapping adjacent indices generates all permutations, so any permutation of a source is also a source.

Since we can arbitrarily prepend, delete and permute substrings, this generates all possible strings over the distinct letters in \(S\). So if \(S = a_1 \ldots a_n \in \sigma\) with \(a_i \in A\) then \(\{a_1, \ldots, a_n \}^* \subseteq \sigma\).

This is just what we set out to prove: any source can be decomposed into strings over sub-alphabets, \(\sigma = A_1^* \cup A_2^* \cup \cdots \cup A_k^*\). So for example in the ternary case the possible sources are

  • \(0^* \cup 1^* \cup 2^*\)
  • \((0 \vert 1)^* \cup 2^*\)
  • \((0 \vert 2)^* \cup 1^*\)
  • \((1 \vert 2)^* \cup 0^*\)
  • \((0 \vert 1)^* \cup (0\vert 2)^*\)
  • \((0 \vert 1)^* \cup (1\vert 2)^*\)
  • \((0 \vert 2)^* \cup (1\vert 2)^*\)
  • \((0 \vert 1)^* \cup (0\vert 2)^* \cup (1\vert 2)^*\)
  • \((0 \vert 1 \vert 2)^*\)

Bounds and h-numbers

If you’re still following along with the original paper there’s some talk about \(h\) numbers, \[ h(m) = \frac{1}{m} \log_{\alpha} \lvert\{s \in \sigma \vert \operatorname{length}(s) = m\}\rvert \]

The only practically interesting sources are \(\sigma = A^*\) in which case \(h(m) = 1\), and their worst case compression bound of \(\rho \leq h (L_s - 1) + \epsilon(L_s) = (L_s - 1) + \epsilon(L_s)\) is very unimpressive. A simple argument that each encoded string contains at least 1 character (the next character) gives a worst case compression ratio of \(\rho \leq (L_c - 1)\).
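
For example, over the full alphabet there are \(\alpha^m\) strings of length \(m\), so the h-number is 1 at every length:

m = 4
round(log(α**m, α) / m, 10)
1.0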

The real way to tell how well it actually works is to use it in applications; looking at the Squash benchmark, methods based on LZ77 such as zlib compress a wide variety of sources and can process the data relatively fast (although there are other algorithms that compress data much more, at the cost of being much slower).