2010-11-30

Java Continuations and Green Threads at native speeds

Continuations, or green threads, are a concept mostly seen in scripting languages. Green threads are just like native threads, but you can run many of them on a single native thread. Java has no native support for continuations, so we have to transform (instrument) Java bytecode with a library that saves the local variables and the instruction pointer upon a GreenThread.yield(). Once the green thread is resumed, this information is used to rebuild the call stack, with all local variables intact.

The library used is: http://l33tlabs.org/Continuations/ written by Matthias Mann
His library depends on ASM3 to parse and construct the bytecode: http://asm.ow2.org/

Through the Java Instrumentation Agent that I wrote for the previously mentioned library, there is no need to transform your bytecode ahead of time. The Java agent will take care of this at runtime.

java -javaagent:conti4.jar -cp ./your.jars your.MainClass


I wrote a wrapper around the continuation library, to lower the bar a bit. I hope the code pretty much speaks for itself, and if not, feel free to ask questions. For additional convenience, I added some demo code and its fascinating output at the bottom.

Enjoy!
import de.matthiasmann.continuations.Coroutine;
import de.matthiasmann.continuations.CoroutineProto;
import de.matthiasmann.continuations.SuspendExecution;
import de.matthiasmann.continuations.Coroutine.State;

/*
 * Created on Nov 30, 2010
 * 
 * @author Riven
 */

/**
 * A cooperatively scheduled task backed by a {@link Coroutine}.
 * <p>
 * Subclasses put their logic in {@link #coExecute()} and call the static
 * {@link #sleep(long)} / {@link #yield()} helpers from inside it to hand
 * control back to whoever invoked {@link #step()}.
 */
public abstract class GreenThread implements CoroutineProto
{
   /** Value returned by {@link #step()} once the coroutine's state is FINISHED. */
   public static final long EOF = -1L;

   /** The coroutine that executes this instance's {@link #coExecute()}. */
   private final Coroutine  coroutine;

   /** Delay in milliseconds requested by the most recent {@link #sleep(long)} call. */
   private long             doSleep;

   /**
    * Creates the green thread and the coroutine that wraps it.
    */
   public GreenThread()
   {
      this.coroutine = new Coroutine(this);
   }

   /**
    * The body of the green thread: this is where you put your code.
    *
    * @throws SuspendExecution declared so that the body may call
    *            {@link #sleep(long)} and {@link #yield()}
    */
   @Override
   public abstract void coExecute() throws SuspendExecution;

   /**
    * Runs the coroutine once and reports what to do next.
    *
    * @return {@link #EOF} when the coroutine state is FINISHED after the run,
    *         otherwise the delay stored by the last {@link #sleep(long)} call
    */
   long step()
   {
      this.coroutine.run();

      boolean finished = (this.coroutine.getState() == State.FINISHED);
      return finished ? EOF : this.doSleep;
   }

   /**
    * Gives up control without requesting a delay; equivalent to
    * {@code sleep(0L)}.
    *
    * @throws SuspendExecution propagated from {@link #sleep(long)}
    */
   public static void yield() throws SuspendExecution
   {
      GreenThread.sleep(0L);
   }

   /**
    * Records the requested delay on the green thread that owns the active
    * coroutine and then yields that coroutine.
    *
    * @param ms requested delay in milliseconds; negative values are stored as 0
    * @throws SuspendExecution propagated from {@code Coroutine.yield()}
    */
   public static void sleep(long ms) throws SuspendExecution
   {
      Coroutine active = Coroutine.getActiveCoroutine();
      GreenThread current = (GreenThread) active.getProto();

      // clamp negative requests to "no delay"
      current.doSleep = (ms < 0L) ? 0L : ms;

      Coroutine.yield();
   }
}



---------------------------------------------------------------------

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/*
 * Created on Nov 30, 2010
 * 
 * @author Riven
 */

/**
 * Single-threaded scheduler for {@link GreenThread}s.
 * <p>
 * Every started thread is kept in a priority queue ordered by the timestamp
 * at which it wants to run next. Each call to {@link #tick(long)} steps all
 * threads whose timestamp is not later than {@code now}, at most once per
 * thread, and re-queues those that have not finished.
 */
public class GreenThreadQueue
{
   /** Pending wakeups, earliest timestamp first. */
   private final PriorityQueue<GreenThreadWakeup> queue;

   /**
    * Wakeups stepped during the current tick. They are held back until the
    * tick ends so that a thread sleeping 0 ms is not stepped again within
    * the same tick.
    */
   private final List<GreenThreadWakeup>          reschedule;

   /**
    * Creates an empty scheduler.
    */
   public GreenThreadQueue()
   {
      this.queue = new PriorityQueue<GreenThreadWakeup>(16, new GreenThreadWakeupComparator());
      this.reschedule = new ArrayList<GreenThreadWakeup>();
   }

   /**
    * Schedules a thread with wakeup timestamp 0, so it is stepped on the
    * next {@link #tick(long)} whose {@code now} is not negative.
    *
    * @param thread the green thread to schedule
    * @throws NullPointerException if {@code thread} is null
    */
   public void start(GreenThread thread)
   {
      // fail here rather than later inside tick(), far from the faulty caller
      if (thread == null)
         throw new NullPointerException("thread");

      this.queue.add(new GreenThreadWakeup(thread, 0L));
   }

   /**
    * Steps every queued thread whose wakeup timestamp is not later than
    * {@code now}.
    *
    * @param now the current time, in the same unit as the delays returned by
    *           {@link GreenThread#step()}
    * @return {@code false} when the queue was drained and no thread asked to
    *         be rescheduled (nothing left to run); {@code true} otherwise
    * @throws IllegalStateException if the queue head changes between peek and
    *            poll
    */
   public boolean tick(long now)
   {
      try
      {
         while (true)
         {
            GreenThreadWakeup wakeup = this.queue.peek();
            if (wakeup == null)
               return !this.reschedule.isEmpty(); // false signals nothing more to do

            // the head is the earliest wakeup; if it is not due, nothing is
            if (wakeup.timestamp > now)
               break;

            if (this.queue.poll() != wakeup)
               throw new IllegalStateException();

            long sleep = wakeup.thread.step();
            if (sleep == GreenThread.EOF)
               continue; // finished: drop it

            wakeup.timestamp = now + sleep;
            this.reschedule.add(wakeup);
         }

         return true;
      }
      finally
      {
         // runs on every exit path, including an exception thrown by step(),
         // so threads already stepped in this tick are never lost
         for (GreenThreadWakeup wakeup : this.reschedule)
         {
            this.queue.add(wakeup);
         }
         this.reschedule.clear();
      }
   }

   /**
    * Queue entry pairing a thread with the timestamp of its next step.
    */
   private static class GreenThreadWakeup
   {
      public final GreenThread thread;
      public long              timestamp;

      public GreenThreadWakeup(GreenThread thread, long timestamp)
      {
         this.thread = thread;
         this.timestamp = timestamp;
      }
   }

   /**
    * Orders wakeups by ascending timestamp.
    * <p>
    * The timestamps are compared directly rather than subtracted, so the
    * result cannot overflow, and equal timestamps compare as 0 as the
    * {@link Comparator} contract requires.
    */
   static class GreenThreadWakeupComparator implements Comparator<GreenThreadWakeup>
   {
      @Override
      public int compare(GreenThreadWakeup o1, GreenThreadWakeup o2)
      {
         long t1 = o1.timestamp;
         long t2 = o2.timestamp;
         return (t1 < t2) ? -1 : ((t1 == t2) ? 0 : 1);
      }
   }
}



Demo code
// Demo: two green threads interleaved on a single native thread.
// NOTE(review): this is the body of a method whose header is not shown here.
GreenThreadQueue queue = new GreenThreadQueue();

      // prints a0..a2, asking to sleep 1500 ms after each line
      GreenThread thread1 = new GreenThread()
      {
         @Override
         public void coExecute() throws SuspendExecution
         {
            for (int i = 0; i < 3; i++)
            {
               System.out.println("a" + i);
               GreenThread.sleep(1500);
            }
         }
      };

      // prints b0..b3, asking to sleep 1300 ms after each line
      GreenThread thread2 = new GreenThread()
      {
         @Override
         public void coExecute() throws SuspendExecution
         {
            for (int i = 0; i < 4; i++)
            {
               System.out.println("b" + i);
               GreenThread.sleep(1300);
            }
         }
      };

      queue.start(thread1);
      queue.start(thread2);

      // drive the scheduler: pause the native thread for 100 ms, then tick with
      // the current wall-clock time; stop once tick() returns false
      do
      {
         try { Thread.sleep(100); } catch(InterruptedException exc) { /* ignore */ }
      }
      while (queue.tick(System.currentTimeMillis()));
   }
Fancy output (from 1 thread)
a0
b0
b1
a1
b2
a2
b3

1 comment:

  1. I want to register on the forum java-gaming.org but I never get the activation e-mail. Can you help me with that? I cannot get help from the forum, so maybe I can get help on this blog.

    ReplyDelete