Let’s tame data streams with Python

Information is gradually becoming the "new oil" in terms of value. The only problem is that the volumes of data to be processed are growing by leaps and bounds. Files are sometimes larger than the hard drive, RAM certainly can't cope, and interviewees are given ever scarier tasks like comparing two petabyte files on the fly. Fortunately for programmers, there is no need to make the machine choke on that much information: iterators and generators let you process it as a stream, and Python is a language that supports them perfectly. Would you like me to tell you about that?

Data flows

It just so happens that in Russian the word "potok", as used in programming, has several meanings: the same word covers a thread, a stream and a flow. This time we won't talk about threads; we'll discuss streams and flows instead, in particular input/output streams and data flows. So further on I will be using the word "flow" quite often, in exactly this sense.

Generally, in the context of technology, we increasingly often have to analyze or process such enormous data flows on the fly, because the modern world generates tremendous amounts of information, so much that entire computing clusters are built to store and process it. Yes, the same stdin, stdout and stderr which you know from school tirelessly chase bits and bytes back and forth.

If we leave the lyrical part behind and come down to earth, the simplest examples of data streams are network traffic, the signal of a sensor or, say, stock quotes in real time. And there already exists a whole class of "streaming" algorithms for various problems. When you type something like "cat file.txt | sed s/foo/bar/g" in the command line, what happens next is precisely a manipulation of a data stream, carried along the "assembly line" of the vertical bar (the pipe) from the stdout of the cat command to the stdin of sed.
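
For comparison, here is roughly what the sed half of that pipeline could look like in Python: a minimal sketch that reads stdin lazily, line by line, without ever holding the whole input in memory (the script name is made up).

import sys

# Read stdin line by line and substitute "foo" with "bar",
# much like sed s/foo/bar/g does; nothing is buffered beyond one line
for line in sys.stdin:
    sys.stdout.write(line.replace("foo", "bar"))

Run it as "cat file.txt | python pipe_replace.py" and you get the same kind of stream manipulation.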

And so we gradually arrive at the concept of the iterator and the so-called Data Flow Programming. In order to somehow cope with this continuous stream of incoming information, we have to cut it into pieces and try to do something with those pieces. And this is where the iterator comes in.

An iterator is an abstract object which lets you take one element after another from a source, which can be stdin or some large container, and the iterator knows only about the object it is currently dealing with. In terms of C/C++, for example, this can be pictured as a pointer moving along a container. For those languages I recommend watching Andrei Alexandrescu's presentation (slidesha.re/1rHhfm7).

Going through the elements using the iterator

And now let's get closer to Python. The previous paragraphs may have sounded lecture-like and boring, but please don't worry: we'll get to the code pretty soon.

First of all, let’s get the terms sorted out. In Python (and elsewhere as well), there are two concepts that sound almost the same but refer to different things: iterator and iterable. The first is an object that implements the interface described above, and the second is a container which can be a source of data for the iterator.

Surely you remember that for an instance of a class to be usable in a for loop, the class must implement two methods: __iter__() and next() (in Python 3, __next__()). Indeed, a list can be iterated over, but the list itself does not keep track of where we stopped while going through it. What controls the process is an object called listiterator, which is returned by iter() and is used, for instance, in a for loop or a map() call. When there are no elements left in the collection it is traversing, a StopIteration exception is raised.
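
You can watch this mechanism by hand with the built-in iter() and next(); a quick sketch:

mylist = [1, 2, 3]
it = iter(mylist)   # the list hands out its iterator object
print(next(it))     # 1
print(next(it))     # 2
print(next(it))     # 3
print(next(it))     # StopIteration is raised: the collection is exhausted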

In many cases, unless I say otherwise, I will try to write code that is compatible with both Python 2 and Python 3, with the help of the six module (bit.ly/1lfBzXR).

from __future__ import print_function

# six provides a base class that lets you not worry about
# whether the "next" method should be called next() (Python 2)
# or __next__() (Python 3): you always define __next__()
from six import Iterator

class MyIterator(Iterator):
    def __init__(self, step=5):
        self.step = step
    def __iter__(self):
        return self
    def __next__(self):
        self.step -= 1
        # Stop condition, so that the iterator
        # does not run forever
        if not self.step:
            raise StopIteration()
        return self.step

myiterator = MyIterator()

for item in myiterator:
    print(item, end=" ")
print()

"""
4 3 2 1
"""

Now, this is really very simple. The MyIterator class provides the interface for traversing (or rather, generating) elements described above and raises an exception when the value of the current step reaches zero. Take note of the "laziness": the iterator starts doing something only when for gently asks it to, that is, on the transition to each following iteration.
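
A quick way to see that laziness, reusing the MyIterator class from above (nothing happens at construction time; work is done only when the next element is requested):

it = MyIterator()
# No element has been produced yet, the object is just sitting there
print(next(it))   # 4: only now does __next__() run
print(next(it))   # 3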

Syntactic sugar

But is it worth creating more classes when all we need is to go through the items in a collection? Let's recall such a thing as list comprehensions.

with open("file.txt", "r") as f:
    mylist = [l for l in f if "foo" in l]

for item in mylist:
    print(item)

"""
All the lines of file.txt that contain the "foo" substring are printed here.
"""

That's not bad. Three and a half lines of code (they could easily be condensed into two, but I want to illustrate the idea) create the list, and, as we know, it already comes with an iterator. But there is one small problem. Or rather a large one, we must admit. More precisely, it all depends on how many elements end up in our collection (in this case, the lines with the "foo" substring in the file, and, therefore, in the list). Have you forgotten that a list in Python is one contiguous chunk of memory, like an array in C? Moreover, when an item is appended at the end (and that is what happens here), there is a chance that the new list will no longer fit into the chunk allocated for it, and the interpreter will have to beg the system for a new chunk and even copy all the existing elements over there in O(n). What if the file is large and there are many matching lines? In short, this code should not be used. So what should we do?

with open("file.txt", "r") as f:
    mylist = (l for l in f if "foo" in l)

for item in mylist:
    print(item)

"""
Here we also have the output from file.txt for all the rows that have a "foo" substring.
"""

To tell you the truth, I was astonished when I looked through the code of several major open source projects and found that developers are very fond of list comprehensions but completely forget about generator expressions. To me it looks like using range() instead of xrange() in Python 2 just to walk through numbers in order, forgetting that it grabs a huge chunk of memory to store the full list of results.
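
A rough way to see what that costs, assuming Python 2 (exact numbers depend on the interpreter; in Python 3, range() is already lazy and xrange() is gone):

import sys

print(sys.getsizeof(range(10 ** 6)))    # the full list: megabytes of memory
print(sys.getsizeof(xrange(10 ** 6)))   # a few dozen bytes, values produced on demand

# The same story with a list comprehension versus a generator expression
print(sys.getsizeof([x * x for x in xrange(10 ** 6)]))   # the whole list again
print(sys.getsizeof((x * x for x in xrange(10 ** 6))))   # a tiny generator object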

Let’s generate useful things

So what is a generator expression, and what is a generator as such? To put it simply, a generator expression is yet another piece of syntactic sugar in Python, the easiest way to create an object with an iterator interface without loading all the items into memory (which is often unnecessary). As for the generator as a concept…

Take functions: everybody knows about them, except perhaps someone who received their first programming book as a birthday present today; if they got it yesterday, they probably know already. But functions have strictly defined behavior: one entry point and one return value (the fact that in Python you can write "return a, b" is not multiple return in the precise meaning of the term; it is simply the return of a tuple). And what if I told you that a generator is also a function, only with multiple entry and exit points?
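
A tiny illustration of that "multiple return" (min_max here is just a made-up helper):

def min_max(values):
    return min(values), max(values)   # a single return value: a tuple

result = min_max([3, 1, 4, 1, 5])
print(result)      # (1, 5)
lo, hi = result    # which the caller can simply unpack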

The main feature of the generator is that, like the iterator, it remembers where it stopped the last time it was called, but instead of abstract elements it operates on quite specific blocks of code. That is, while an iterator by default walks through the elements of a container until none are left, a generator runs code until some specific return condition is satisfied. Yes, by and large the code from the first section is a generator with an iterator interface and a well-defined exit condition. If you expand the generator expression from the code above into a full generator function, you get something like this:

def my_generator():
    with open("file.txt", "r") as f:
        for l in f:
            if "foo" in l:
                yield l

myiterator = my_generator()

for item in myiterator:
    print(item)

"""
And here, too (surprise!), all the lines of file.txt that contain the "foo" substring are printed.
"""

The yield keyword is simply a separator between the blocks of code that the generator executes on each call, that is, on each iteration. Actually, a loop is not required at all: you can simply write several pieces of code in a function and separate them with yield statements. The interface will remain exactly the same, iterator-like.

def my_generator2():
    print("First block")
    yield 1
    print("Second block")
    yield 2

myiterator = my_generator2()
next(myiterator)
# "First block"
next(myiterator)
# "Second block"
next(myiterator)
# Traceback: StopIteration

Such a smart yield

So we now understand what multiple exit points are about. But what about multiple entry points? Is next() the only thing we can do with a generator? Actually, it isn't.

Since the days of good old Python 2.5, generator objects have had several more methods: .close(), .throw() and .send(). This led, so to say, to a revolution in the field of Data Flow Programming.

With .close() we can stop the generator, so that the next attempt to take a value from it ends right away, and with .throw() we make it raise an exception at the point where it is paused:

from __future__ import print_function

import itertools

def my_generator3():
    # Infinite counter
    counter = itertools.count()
    while True:
        yield next(counter)

myiterator = my_generator3()
myiterator2 = my_generator3()

for item in myiterator:
    print(item, end=" ")
    if item == 3:
        # Let's close the generator in the right way
        myiterator.close()
print()

for item in myiterator2:
    print(item, end=" ")
    if item == 2:
        # Everything's terrible
        myiterator2.throw(Exception("Everything is bad"))
print()

"""
0 1 2 3
0 1 2 Traceback (most recent call last):
  File "test.py", line 28, in <module>
    myiterator2.throw(Exception("Everything is bad"))
  File "test.py", line 12, in my_generator3
    yield next(counter)
Exception: Everything is bad
"""

And the nicest thing, as it turns out, is hidden in the .send() method. It allows you to pass data into the generator before the next block of code is executed (the value you send shows up as the result of the yield expression)!

from __future__ import print_function

import itertools

def my_coroutine():
    # Infinite counter
    counter = itertools.count()
    while True:
        y = (yield next(counter))
        if y:
            # Let's change the initial point of the counting
            counter = itertools.count(y)

myiterator = my_coroutine()

for item in myiterator:
    print(item, end=" ")
    if item == 1:
        # Let's send the number to the generator
        myiterator.send(4)
    elif item == 6:
        myiterator.close()
print()

"""
0 1 5 6
"""

It's not by mere chance that I call this generator a co-routine. A co-routine is exactly the thing programmers whisper about in offices when they discuss gevent, tornado and other eventlet-like things. You can find more on Wikipedia; I'll just say that in Python co-routines in this form are most often used for data flow analysis and for implementing cooperative multitasking. The point is that when we hit yield (as with return, in most cases), a transfer of control takes place. The co-routine itself decides when to pass the flow of execution somewhere else (for example, to another co-routine). This lets you build beautiful branched trees for processing data streams, implement MapReduce, or, say, forward a stream of bytes coming through a socket to another node. Moreover, co-routines can be implemented in practically any language, just like the pipelines of Linux command-line utilities I gave as an example at the beginning of the article.
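
To make this less abstract, here is a minimal sketch of such a chain: one co-routine filters lines, another prints them, and data is pushed through with .send() (file.txt and the "foo" filter are the same assumptions as before).

from __future__ import print_function

def printer():
    # The final consumer: prints everything that is sent into it
    while True:
        line = (yield)
        print(line, end="")

def grep(pattern, target):
    # An intermediate pinion: passes on only the matching lines
    while True:
        line = (yield)
        if pattern in line:
            target.send(line)

sink = printer()
next(sink)                   # "prime" the co-routine so it stops at its first yield
pipeline = grep("foo", sink)
next(pipeline)

with open("file.txt") as f:
    for line in f:
        pipeline.send(line)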

A tiny practical example

A person without special training will find it quite difficult to read code written in the co-routine style. It looks like a set of asymmetric pinions that rotate one another in turn, each maintaining its own state.

But this system of pinions has one very nice feature: any such pinion can be moved to a new place or replaced by another one that will do the job better there. What I mean is that in Python all you need to know about an object is that it exposes the iterator interface the interpreter understands; it does not really matter what it is written in or what it does internally.

Let me give you an example from a presentation by David Beazley (see the box). Here is a fragment of an Apache log:

23.34.139.80 - ... "GET /categories/ HTTP/1.1" 200 6394
23.34.139.80 - ... "GET /favicon.ico HTTP/1.1" 404 807
23.34.139.80 - ... "GET /static/img/logo.gif HTTP/1.1" 200 41526
23.34.139.80 - ... "GET /news/story.html HTTP/1.1" 200 6223
23.34.139.80 - ... "GET /about/example.html HTTP/1.1" 200 1223
42.77.100.21 - ... "GET /index.html HTTP/1.1" 200 7774

Let's try to sum up the last column and see how many bytes were transferred over the network. How to write this as an ordinary loop I'll leave to you as homework; here we will describe it with generator expressions.

wwwlog = open("access-log")
bytecolumn = (line.rsplit(None,1)[1] for line in wwwlog)
bytes = (int(x) for x in bytecolumn if x != '-')
print "Total", sum(bytes)

It is pretty obvious what each line does here, isn't it? And if we need to filter out specific lines or count something along the way, we can integrate it all into a single pipeline simply by adding a few more generator expressions in the right places, like pinions; see the sketch below.
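
For instance, a naive filter that keeps only successful (200) responses before summing is just one more generator expression dropped into the chain (the status check here is deliberately simplistic, for illustration only):

wwwlog = open("access-log")
ok_lines = (line for line in wwwlog if '" 200 ' in line)
bytecolumn = (line.rsplit(None, 1)[1] for line in ok_lines)
bytes_sent = (int(x) for x in bytecolumn if x != '-')
print("Total for 200 responses", sum(bytes_sent))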

What should you do with all this

Understanding how iterators and generators work in programming languages is one of the first steps towards mastering the sequential processing of huge data flows, a field that includes, for example, trading and technical analysis, that is, the things that let many people make a fortune today.
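
As a toy sketch of that kind of processing, here is a moving average computed lazily over a stream of quotes (the numbers and the window size are made up):

from collections import deque

def moving_average(quotes, window=3):
    # Lazily yields the average of the last `window` values seen so far
    buf = deque(maxlen=window)
    for price in quotes:
        buf.append(price)
        yield sum(buf) / float(len(buf))

quotes = iter([100.0, 101.5, 99.8, 102.3, 103.1])   # imagine an endless feed here
for avg in moving_average(quotes):
    print(avg)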

But even if you are not that ambitious about the result, your script will simply consume several times fewer resources. The Python interpreter, although it tries not to copy data needlessly, is not free to make its own decisions and has to build the whole list in memory if that is what the developer wrote.

In any case, I wish you fast and elegant code. See you again!


