Nathan Wiegand

Ramblings of a mad engineer

Taking MapReduce to Monte Carlo

May 5, 2010

MapReduce is one of those simple ideas that you look back on and say, well damn, I could have thought of that. It lets programmers write two simple functions, a mapper and a reducer, and have them scale to attack very large problems. I recently needed to run some Monte Carlo experiments involving various graph algorithms over random graphs. Luckily, I had access to some compute time on a very large MapReduce system. I haven’t seen much discussion of putting the two together, so this article describes how to use MapReduce as a tool for Monte Carlo simulations.

MapReduce

Most of you already know a bit about them, but just a reminder. MapReduce jobs consist of two main phases: Map and Reduce. To begin, some large body of data is broken into manageable pieces (say, one line of text from a huge file, or a single HTML file from the entire internet). Each of these pieces is handed to a Mapper. The Mapper takes this piece of the larger data and emits key-value pairs.

Before these key-value pairs are passed to the Reducers, they are sorted and grouped by key. A Reducer instance gets all of the values associated with a single key. It then computes some new value from them, emitting an answer (also as a key-value pair).

One of the canonical examples of how this works is to calculate word-frequencies from some very large body of text. What follows is some Python-esque pseudo code for doing just this.

def Mapper(line):
    for x in line.split(" "):
        emit(x, 1)

This takes each word and emits it as the key with a value of one. Now, these pairs are aggregated by key and passed to Reducer instances. So, for example, the following Reducer might be called with a list of values associated with the key “the”:

def Reducer(key, values):
    count = 0
    for v in values:
        count += v
    emit(key, count)

Now, the output after the Reducer has run is the frequency for every word occurring in the data-set.
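
To try the word-count job without a cluster, here is a minimal single-process sketch. The `run_job` helper is my own stand-in for the framework (it is not part of any MapReduce API), and `emit` is replaced by Python's `yield` and `return`:

```python
from collections import defaultdict


def mapper(line):
    """Yield a (word, 1) pair for every whitespace-separated word."""
    for word in line.split():
        yield word, 1


def reducer(key, values):
    """Sum the counts collected for one word."""
    return key, sum(values)


def run_job(lines):
    """Hypothetical stand-in for the framework: map, group by key, reduce."""
    grouped = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            grouped[key].append(value)
    # Sorting mimics the sort step that happens before the reducers run.
    return dict(reducer(key, values) for key, values in sorted(grouped.items()))


if __name__ == "__main__":
    print(run_job(["the cat saw the dog", "the dog ran"]))
```

A real framework would spread the mapper calls across machines and run one reducer call per key; the grouping step in the middle is the part it does for you.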

Monte Carlo

The basic idea of a Monte Carlo experiment is that you’re trying to calculate the value of some underlying function or variable, and you do so by repeated random experiment.

For example, you can calculate $\pi$ easily using a Monte Carlo experiment. Imagine you have a dartboard with radius 1 meter (big dartboard). It’s contained within a perfectly fitting square, so the square is 2m on a side. If we throw a dart, what’s the probability that we hit the dartboard, given that we hit within the square? It’s the area of the board ($\pi$) divided by the total area of the square (4), so $\pi/4$.

To make the math easier, let’s just look at the upper right quadrant. Thus, $x \in[0,1], y \in[0,1]$. The area within the square in this quadrant is 1, and the area of the circle is $\pi/4$. Note that the probability of hitting the dart board is the same. We can parameterize the circle as $x^2+y^2\le1$. So, let’s set up our experiment like this.

$$ hit(x,y) = \begin{cases} 1 & \text{if } x^2 + y^2 \leq 1 \\ 0 & \text{otherwise} \end{cases} $$

Now, $hit(x,y)$ is an indicator random variable. Thus, $E(hit(x,y))=p$ where $p$ is the probability of $hit(x,y)=1$, which we already stated is $\pi/4$. So what? The expected value of this function is $\pi/4$, who cares? Well, this means that if we uniformly choose numbers between 0 and 1, pass them through $hit(x,y)$, and compute the experimental average number of hits, that average should converge to the expected value. In other words, throw thousands of darts and find the ratio of hits to total throws. This will tend toward $\pi/4$.
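
How quickly does it tend toward $\pi/4$? Here is a short back-of-the-envelope derivation. The symbols $n$ (number of throws) and $\hat{p}_n$ (the experimental average) are my own notation, and the only assumptions are that the throws are independent and that $hit$ is an indicator with $p=\pi/4$:

$$ \hat{p}_n = \frac{1}{n}\sum_{i=1}^{n} hit(x_i,y_i), \qquad E(\hat{p}_n) = p, \qquad Var(\hat{p}_n) = \frac{p(1-p)}{n} $$

Since the estimate of $\pi$ is $4\hat{p}_n$, its standard deviation is

$$ 4\sqrt{\frac{p(1-p)}{n}} \approx \frac{1.64}{\sqrt{n}} $$

So with $n = 10{,}000$ darts the estimate is typically off by about 0.016, and each additional digit of accuracy costs roughly 100 times as many throws.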

Bringing it together

Because our random numbers are drawn with replacement, there is no real difference in running 10 experiments with 100 darts and 1 experiment with 1000 darts. But, what do we pass to the Mapper?

When we teach undergraduates about random number generators, we show them the pitfalls of not seeding a random number generator. Then, we show them the easy way to correct this, which is to just use the current time as the seed. Here, though, we also want our experiment to be repeatable.

So, instead of using current machine time, let’s seed each experiment with a different number. Thus, the input to our MapReduce job is just going to be the seeds. For ours, let’s just make the input be [1,…,100], and say that each Mapper only gets a single input. The Mapper is where the Monte Carlo experiment actually happens. Let each Mapper run 100 sims (not necessary, but why not?):

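import random

def Mapper(s):
    # Seed with the integer itself so this input always replays the same throws.
    random.seed(s)
    hitCounter = 0
    for i in range(100):
        x = random.random()
        y = random.random()
        # Count the dart if it lands inside the quarter circle.
        if x*x + y*y <= 1:
            hitCounter += 1
    emit(1, hitCounter)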

Of note here is that the key being emitted is 1 for all of the Mappers. This means that only a single reducer will run. The reducer need just aggregate these counts:

def Reducer(key, values):
    totalHits = 0
    for v in values:
        totalHits += v
    # each value is out of 100, so the total number of throws is 100*|values|
    emit(key, totalHits / (100. * len(values)))

Because there’s only one Reducer, there’s only one answer from the whole experiment: the experimental average of the number of hits. This should converge to $\pi/4$ as the number of experiments increases.
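
To check the whole job end to end on one machine, here is a minimal sketch. `run_job` and `estimate_pi` are hypothetical helpers of mine that stand in for the framework, and each mapper uses its own `random.Random(seed)` instance rather than the module-level generator so that mappers cannot disturb each other's streams:

```python
import random
from collections import defaultdict

DARTS_PER_SEED = 100  # same per-mapper count as in the article


def mapper(seed):
    """Throw DARTS_PER_SEED darts using a generator seeded with this input."""
    rng = random.Random(seed)
    hits = 0
    for _ in range(DARTS_PER_SEED):
        x, y = rng.random(), rng.random()
        if x * x + y * y <= 1:
            hits += 1
    yield 1, hits  # a single shared key, so one reducer sees everything


def reducer(key, values):
    """Average the hit counts over all throws."""
    return key, sum(values) / (DARTS_PER_SEED * len(values))


def run_job(seeds):
    """Hypothetical stand-in for the framework: map, group by key, reduce."""
    grouped = defaultdict(list)
    for seed in seeds:
        for key, value in mapper(seed):
            grouped[key].append(value)
    return dict(reducer(key, values) for key, values in grouped.items())


def estimate_pi(seeds):
    """Four times the hit ratio approximates pi."""
    return 4 * run_job(seeds)[1]


if __name__ == "__main__":
    print(estimate_pi(range(1, 101)))
```

Because the seeds are the only input, running the job twice with the same seed list gives exactly the same estimate.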

Conclusion

Sure, calculating $\pi$ like this is serious overkill. Yes, there are much more efficient methods. But the point is that this is a fairly easy technique.

I used this with Erdős-Rényi random graphs with varying edge probabilities. So, in addition to the seed, the input to my Mappers also included the edge probability. The idea is that you design a single Monte Carlo experiment with parameterized variables and pass those in to the Mappers.
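
As an illustration of the parameterized version, here is a minimal sketch in which each input is a (seed, edge probability) pair and the key is the edge probability, so there is one reducer call per probability. The statistic measured (whether the graph is connected), the graph size `NODES`, and the `run_job` helper are all assumptions of mine for the example, not the experiments I actually ran:

```python
import random
from collections import defaultdict

NODES = 20  # hypothetical graph size, chosen only for the example


def is_connected(adjacency):
    """Depth-first search from node 0; connected if every node is reached."""
    seen = {0}
    stack = [0]
    while stack:
        node = stack.pop()
        for neighbor in adjacency[node]:
            if neighbor not in seen:
                seen.add(neighbor)
                stack.append(neighbor)
    return len(seen) == len(adjacency)


def mapper(seed, edge_probability):
    """Build one random graph and emit 1 if it is connected, else 0."""
    rng = random.Random(seed)
    adjacency = [[] for _ in range(NODES)]
    for u in range(NODES):
        for v in range(u + 1, NODES):
            # Each possible edge is included independently.
            if rng.random() < edge_probability:
                adjacency[u].append(v)
                adjacency[v].append(u)
    yield edge_probability, int(is_connected(adjacency))


def reducer(key, values):
    """Fraction of sampled graphs that were connected for this probability."""
    return key, sum(values) / len(values)


def run_job(inputs):
    """Hypothetical stand-in for the framework: map, group by key, reduce."""
    grouped = defaultdict(list)
    for seed, edge_probability in inputs:
        for key, value in mapper(seed, edge_probability):
            grouped[key].append(value)
    return dict(reducer(key, values) for key, values in sorted(grouped.items()))


if __name__ == "__main__":
    inputs = [(seed, p) for p in (0.0, 0.1, 0.3, 1.0) for seed in range(1, 51)]
    print(run_job(inputs))
```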

This made my life much, much easier. I was able to use several hundred CPU hours of experimentation but with only a few hours of wall time. With Amazon’s Elastic MapReduce instances only being $0.015 per CPU hour, this is definitely worth putting some thought into.


Coroutines in C

April 21, 2010

A while ago, I read this article on coroutines in C. One of the main drawbacks of this technique is that, since it uses the static keyword, it is much harder to write recursive coroutines, and nigh-impossible to have multiple coroutines using the same function.

Examples

Let’s first look at an example of how it can work. One way of efficiently computing Fibonacci numbers in a language like Haskell is to construct an infinite list of the Fibonacci values.

fibs a b = a : fibs b (a+b)

Since Haskell is lazily evaluated, such an infinite list is fine until you attempt to compute the whole thing. So, as long as you just say something like take 10 $ fibs 1 1, it will only compute as many values as you need.

One important use of Coroutines is as iterators. Consider the Haskell example from above. If we assume that we have such an infinite list of values, we may want to iterate over the list, inspecting the values as we go. The following C code achieves this using some helper macros we’ll describe below.

int fibs(int a, int b, CoroutineState *s) {
  initializeCoroutineState(s, FibState, mystate);

  mystate->a = a;
  mystate->b = b;
  yield(s,mystate->a);
  recur (s,fibs(mystate->b, mystate->a + mystate->b, s->next));

  finalizeCoroutine(s);
  return 0;
}

Now, if we wanted to print out the first 10 of these values:

s = createCoroutineState();
for(i = 0; i < 10; i++) {
  printf("%d ", fibs(1,1,s));
}

The yield macro is similar to Python’s yield keyword. It returns a value, but also records where the yield was called so that the next time the function is executed it can begin again from that point.

Coroutines can also serve as iterators over a binary tree. Consider the following tree:
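
For comparison, here is roughly what the same iterator looks like with Python's own generators. This is only a sketch for intuition; `yield from` plays the part that the `recur` macro plays in the C version, and like the C version it nests one level deeper for every value produced:

```python
from itertools import islice


def fibs(a, b):
    """Yield a, then delegate to a fresh generator for the rest of the sequence."""
    yield a
    yield from fibs(b, a + b)


if __name__ == "__main__":
    # Take the first ten values, like `take 10 $ fibs 1 1` in Haskell.
    print(list(islice(fibs(1, 1), 10)))
```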

       4
  2         6
1   3     5    7

A standard way to traverse this tree is a post-order traversal. Consider the following code which traverses this tree and prints it out:

void print(Tree *t) {
  if(t->left)
    print(t->left);
  if(t->right)
    print(t->right);
  printf("%d ", t->value);
}

Building a post-order iterator is slightly more complex. It involves creating a stack and pushing wrapped tree-nodes onto the stack. When one is popped, it is marked as ‘visited’ and then pushed onto the stack, followed by its right child then left child. However, we can use coroutines to achieve the same thing while staying true to the original traversal code:

int treePostOrderIterator(Tree *t, CoroutineState *s) {
  initializeCoroutineState(s,TreeIteratorState, mystate);
  mystate->tree = t;

  if(mystate->tree->left)
    recur (s, treePostOrderIterator(mystate->tree->left, s->next));

  if(mystate->tree->right)
    recur (s, treePostOrderIterator(mystate->tree->right, s->next));

  yield (s,mystate->tree->value);

  finalizeCoroutine(s);
  return 0;
}
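
To see what the iterator should produce for the tree above, here is the same traversal sketched with Python generators. The `Tree` class is a hypothetical stand-in for the C struct, and `yield from` again takes the place of `recur`:

```python
class Tree:
    """Hypothetical binary tree node mirroring the C struct's fields."""

    def __init__(self, value, left=None, right=None):
        self.value = value
        self.left = left
        self.right = right


def post_order(tree):
    """Yield values left subtree first, then right subtree, then the node."""
    if tree.left:
        yield from post_order(tree.left)
    if tree.right:
        yield from post_order(tree.right)
    yield tree.value


if __name__ == "__main__":
    root = Tree(4, Tree(2, Tree(1), Tree(3)), Tree(6, Tree(5), Tree(7)))
    print(list(post_order(root)))
```

The generator body has the same shape as the recursive print function, which is exactly the property the C macros are trying to preserve.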

How it works

When I was learning to program, I learned that switch statements were just conditional structures that were equivalent to a series of if statements. While you can almost always get away with thinking about them this way, they aren’t actually. Instead, think of switch statements as parameterized gotos. One of the less objectionable uses of goto is as a means of breaking out of nested loops. So, obviously goto can jump out of scopes. In fact, it can be used to jump into and out of any scope inside of a single function. The same is true of the cases of a switch statement. This is what makes Duff’s device possible.

In order to facilitate recursion and having multiple coroutines running at once, we need to pass a mutable state parameter. This is an object of type struct CoroutineState.

typedef struct CoroutineState CoroutineState;
struct CoroutineState {
  void *state;            /* Pointer to a struct representing the internal state
                             of the coroutine.  We need this since we have to
                             reinstate the scope whenever the function is
                             re-entered.                                      */
  long current;           /* The line number of where the coroutine should
                             return.  Used in 'yield' and 'recur'             */
  unsigned char done;     /* A marker to show that a recurrence is completed  */
  CoroutineState *next;   /* The next in the stack of states                  */
  CoroutineState *parent; /* NULL if actually the parent, pointer otherwise   */
  CoroutineState *tail;   /* Only the parent is guaranteed to know the tail   */
};

Basically this struct stores everything needed to reinstate the coroutine when it is next called. This includes the line number it should begin executing on, a pointer to the user’s saved values, and a stack containing the recursive calls.

Every coroutine begins with an invocation of the initializeCoroutineState macro. This is a macro that takes three parameters. The first is a CoroutineState object, the second is the type defining the user’s saved values, and the last is the variable name to use to refer to this state.

#define initializeCoroutineState(coroutineState, UserStateType, userStateVar) \
  UserStateType *userStateVar = (UserStateType*) coroutineState->state;       \
  switch(coroutineState->current) {                                           \
  case 0:                                                                     \
  if(coroutineState->state == 0) {                                            \
    coroutineState->state = (void*) malloc(sizeof (UserStateType));           \
    bzero(coroutineState->state, sizeof(UserStateType));                      \
  }                                                                           \
  userStateVar = (UserStateType*) coroutineState->state;                      \

Note that there is a switch statement embedded here. This switch statement jumps to the appropriate line in the execution.

Every coroutine ends with finalizeCoroutine(s), which indicates that after this point the execution of the function should proceed normally. It also says that when this function returns, the coroutine is complete and should be removed from the stack. This is indicated internally with s->done = 1;.

#define finalizeCoroutine(s)                                                  \
  }                                                                           \
  s->done = 1;                                                                \
  free(s->next);                                                              \
  s->next = 0;

Now, to the meat of the matter. yield needs to return the value the user wants to yield, but also record where this occurred so that the switch statement from above will jump to here. It also needs to reinstate any variables that the user had in scope. Unfortunately there’s no good way of doing that in C, so we have to do it manually. Thus the need for the helper struct. Before the return in the following macro, we’re just manipulating the stack so that we can record where we are. Notice coroutineState->current = __LINE__;. __LINE__ is a macro that evaluates to the current line number in the program. So, here we’re recording the line number and then after the return, we have case __LINE__:. This will cause the coroutine to begin executing after the return the next time it’s called.

#define yield(coroutineState, value)                                          \
  do{                                                                         \
    CoroutineState *parent = coroutineState ? coroutineState                  \
        : coroutineState->parent;                                             \
    coroutineState->current = __LINE__;                                       \
    parent->tail = coroutineState;                                            \
    free(coroutineState->next);                                               \
    coroutineState->next = 0;                                                 \
    return value;                                                             \
    case __LINE__: 1;                                                         \
  } while(0)

The last macro is perhaps the most interesting. It’s the one that allows things like the infinite list iterator and the binary tree iterator. Basically, there are times when you want to have a recursive call and have the coroutine state stored along with having a return value bubble up.

The while loop is responsible for executing until the coroutine called below it is completed.

#define recur(coroutineState,func)                                            \
  do{                                                                         \
    CoroutineState *parent = coroutineState ? coroutineState                  \
        : coroutineState->parent;                                             \
    coroutineState->next = createCoroutineState();                            \
    coroutineState->next->parent = parent;                                    \
    parent->tail = coroutineState->next;                                      \
    while(!coroutineState->next->done) {                                      \
      CoroutineState *parent = coroutineState ? coroutineState                \
          : coroutineState->parent;                                           \
      typeof(func) tmp =  func;                                               \
      coroutineState->current = __LINE__;                                     \
      if(!parent->tail->done)                                                 \
        return tmp;                                                           \
      case __LINE__: 1;                                                       \
    }                                                                         \
  } while(0)                                                                  \

Using it

In order to get this to work, you have to do one extra thing. You need to create a struct which you use to store your state rather than using local variables in your functions. This is necessary since we’ll be re-entering the functions later and that state would be lost. So, just put all the variables you would want to use into the struct. Then name this struct type in initializeCoroutineState and go through the resulting pointer wherever you would have used a local variable.

If you would normally return a computed value (not the result of a recursive call), then just say yield(s, value);. If you are doing a recursive call and intend for the coroutine to proceed down the recurrence, just wrap it in recur(s, yourRecursiveCall()).

Get it

I’ve posted the implementation as a library on GitHub: http://github.com/nathanwiegand/coroutines.


Hotswapping Binaries

February 22, 2010

About a year ago I was having a discussion with my friend Crutcher when he suggested that one could hot-swap versions of a running program. This post describes my implementation of just such a thing.

Why would you hot-swap? One of the major benefits of hotswapping is that the new version of the program will have access to all of the old version’s file descriptors. This means that any files, sockets, or pipes that the previous version had open can still be open. For example, if you were careful, you could hotswap an application that was in the middle of serving a very large file to a user without them being aware that anything happened.

Before we begin discussing how this should work, let’s look at some of the problems. Sure we get file descriptors, but what about all of our state? Well, this is a problem. We cannot easily take our state with us. Since we’re updating versions here, you want to be particularly sure you only take state that you need. To do this, I would recommend using a serialization library for C. A bit of Googling showed me this one, TPL, though I haven’t tried it yet. For our example here, we’ll just manually move the only pieces of state we care about: a counter and a file descriptor to a file we’re currently writing to.

The basic idea of what’s going to happen is that we will create a pipe (a pair of file descriptors, one for each end) and then fork(). The child process will hold the write end of the pipe and the parent the read end. Now, the parent will exec. This is a bit odd. Normally when you fork, then exec, it’s the child process which does the exec. However, here we really want the new version of the program to have access to all of the old file descriptors. Luckily, execl preserves these. As an added benefit, the program gets the exact same process ID.

So, let’s look at the important bits of the hot-swap (reader and writer are the file descriptors for the pipe):

unsigned int outputFD = fileno(outputFile);

if(fork()) {
  /* I am the parent. */
  char readBuf[20] = {0};

  close(writer);
  sprintf(readBuf, "%d", reader);

  execl("./newbinary", "--hotswapping", readBuf, (char*)0);
  exit(0);
} else {
  /* I am the child.*/
  FILE *outputStream = fdopen(writer, "w");
  close(reader);

  fprintf(outputStream, "%d\n", i);
  fprintf(outputStream, "%u\n", (unsigned int) outputFD);
  fclose(outputStream);
  exit(0);
}

First, let’s look at what the parent process does. It simply closes its “writer”, since it will never need to write to the pipe, and then execs “newbinary”, which is the new version of the program. It does so with a flag “--hotswapping”. This flag indicates that another parameter will follow: the file descriptor for the “read” end of the pipe we created. We do this so that the new binary can then get the state serialized across the pipe from the old binary.

Now, onto the child process. The fdopen() call creates a FILE stream from the file descriptor for the “write” end of the pipe. Why? Because I’m lazy and would rather work with fprintf() than write(). Now that we have this stream, we can fprintf() directly to it and serialize the state we want. In this contrived case the only state I care about is my counter variable and the file descriptor of my output file. The fileno() call at the top of the snippet gets that descriptor from the output stream, using int fileno(FILE *).

So, to recap, we fork() then the parent exec’s to the new version of the binary and the child writes any relevant state to a pipe which the new binary is listening to.
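
The pipe hand-off itself can be tried in isolation. Here is a minimal sketch of just that step, written in Python and leaving out the exec: the parent reads the state back directly where the real scheme would first exec the new binary. The function name `hand_off_state` and the two-line text format are assumptions for the example, and it needs a Unix-like system because it calls `os.fork()`:

```python
import os
import tempfile


def hand_off_state(counter, output_fd):
    """Fork; the child serializes state into a pipe, the parent reads it back."""
    reader, writer = os.pipe()
    if os.fork() == 0:
        # Child: plays the old program's helper and writes the state out.
        os.close(reader)
        with os.fdopen(writer, "w") as stream:
            stream.write(f"{counter}\n{output_fd}\n")
        os._exit(0)
    # Parent: the real scheme would exec the new binary at this point;
    # this sketch just reads the state to show the protocol.
    os.close(writer)
    with os.fdopen(reader, "r") as stream:
        new_counter = int(stream.readline())
        new_fd = int(stream.readline())
    os.wait()  # reap the child
    return new_counter, new_fd


if __name__ == "__main__":
    fd, path = tempfile.mkstemp()
    os.write(fd, b"Original: 5\n")
    counter, same_fd = hand_off_state(5, fd)
    # The descriptor number is still valid, so writing simply continues.
    os.write(same_fd, f"New: {counter + 1}\n".encode())
    os.close(same_fd)
    with open(path) as handle:
        print(handle.read(), end="")
    os.remove(path)
```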

Now, let’s look at what has to exist in the new binary. The new binary must recognize the argument “--hotswapping”, which is followed by the file descriptor of the “read” end of the pipe. The following, in newversion.c, does just this:

for(i = 0; i < argc; i++) {
  if(!strcmp(argv[i], "--hotswapping")) {
    int reader = atoi(argv[++i]);
    inputStream = fdopen(reader, "r");
  }
}

Notice that the fdopen() call does something interesting. It converts the file descriptor back into a FILE stream using fdopen(int, const char *). Thus we can use this pipe just as if we were reading from a file. So, now we can use fscanf to read from the pipe instead of having to worry about read() and buffers:

fscanf(inputStream, "%d", &i);
fscanf(inputStream, "%u", &outputFD);
fclose(inputStream);
outputFile = fdopen(outputFD, "w");

Once again, in the last line, we turn the file descriptor we read from the pipe back into a FILE stream. Now we can resume writing to it, just as we did before. Because an open descriptor keeps its file offset across the exec, it will continue to append to the end of the file.

The files which implement this are available as gists here:

Makefile original.c newversion.c

The output when run for 11 seconds is:

gcc -Wall -pedantic -o newbinary newversion.c
gcc -Wall -pedantic -o example original.c
./example
Original:       1 My PID=27272
Original:       2 My PID=27272
Original:       3 My PID=27272
Original:       4 My PID=27272
Original:       5 My PID=27272
New Binary:     6 My PID=27272
New Binary:     7 My PID=27272
New Binary:     8 My PID=27272
New Binary:     9 My PID=27272
New Binary:     10 My PID=27272
New Binary:     11 My PID=27272