Prioritized Mailbox

In my previous job (at TIM Media) we used Akka for our RTB system. Lots of actors, lots of messages and a lot of mess (that’s what you get in RTB systems). As the system grew with new features and new message types, we reached a point where we needed to prioritize messages. At that point, a switch-case or an if-else chain like the one below doesn't scale nicely. Plus, we had the prioritized mailbox in a shared common package, so adding a new message type would have meant updating every project that uses this common package - hell no!

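import akka.actor.ActorSystem;
import akka.dispatch.PriorityGenerator;
import akka.dispatch.UnboundedStablePriorityMailbox;
import com.typesafe.config.Config;

public class MyPrioritizedMailbox extends UnboundedStablePriorityMailbox {
  // Akka instantiates the mailbox reflectively through this constructor
  public MyPrioritizedMailbox(ActorSystem.Settings settings, Config config) {
    super(new PriorityGenerator() {
      // A lower value means a higher priority
      @Override
      public int gen(Object message) {
        if (message instanceof Message1)
          return 0;
        else if (message instanceof Message2)
          return 2;
        else if (message instanceof Message3)
          return 3;
        else
          return 1;
      }
    });
  }
}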

The Solution

In our solution, we thought it would be easier to just add an interface to the messages we want to prioritize, and let everything else get MEDIUM priority. We created an interface called PrioritizedMessage with 4 inner interfaces, one per priority level.

public interface PrioritizedMessage {
    int URGENT = 100, HIGH = 200, MEDIUM = 300, LOW = 400;
    int getPriority();

    interface Urgent extends PrioritizedMessage {
        default int getPriority() { return URGENT; }
    }

    interface High extends PrioritizedMessage {
        default int getPriority() { return HIGH; }
    }

    interface Medium extends PrioritizedMessage {
        default int getPriority() { return MEDIUM; }
    }

    interface Low extends PrioritizedMessage {
        default int getPriority() { return LOW; }
    }
}

The fixed gap of 100 between the priority values leaves room between them. If we want a new message type that is neither High nor Urgent but something in between, we just implement the getPriority method and return a number between 100 and 200. That simple.

Java 8 introduced default methods, which let an interface carry method implementations without affecting the classes that implement it. A simple implements PrioritizedMessage.Urgent works without the compiler screaming at you to implement the getPriority method.
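
To make the last two paragraphs concrete, here is a minimal sketch of what message classes could look like. BidRequest and BudgetUpdate are made-up names for illustration (not from our real code base), and the interface is a trimmed copy of the one above so the snippet compiles on its own.

```java
class PrioritizedMessageDemo {
    // Trimmed copy of the interface above, so the sketch compiles on its own.
    interface PrioritizedMessage {
        int URGENT = 100, HIGH = 200, MEDIUM = 300, LOW = 400;
        int getPriority();

        interface Urgent extends PrioritizedMessage {
            default int getPriority() { return URGENT; }
        }
    }

    // Hypothetical message: inherits URGENT from the default method, no override needed.
    static class BidRequest implements PrioritizedMessage.Urgent {
    }

    // Hypothetical message that sits between URGENT (100) and HIGH (200).
    static class BudgetUpdate implements PrioritizedMessage {
        @Override
        public int getPriority() { return 150; }
    }

    public static void main(String[] args) {
        System.out.println(new BidRequest().getPriority());   // prints 100
        System.out.println(new BudgetUpdate().getPriority()); // prints 150
    }
}
```

The nice part is that neither class knows anything about the mailbox. A new message type only needs the common package that holds the interface.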

That leaves us with a simple and clean UnboundedStablePriorityMailbox that uses the PrioritizedMessage interface to determine the priority.

public class PrioritizedMailbox extends UnboundedStablePriorityMailbox {
    public PrioritizedMailbox(ActorSystem.Settings settings, Config config) {
        super(new PrioritizedGenerator(
                config.hasPath("priority.poison-pill") ? config.getInt("priority.poison-pill") : PrioritizedMessage.HIGH,
                config.hasPath("priority.kill-pill") ? config.getInt("priority.kill-pill") : PrioritizedMessage.HIGH
        ));
    }

    private static class PrioritizedGenerator extends PriorityGenerator {
        private final int poisonPillPriority;
        private final int killPriority;

        private PrioritizedGenerator(int poisonPillPriority, int killPriority) {
            this.poisonPillPriority = poisonPillPriority;
            this.killPriority = killPriority;
        }

        @Override
        public int gen(Object message) {
            if (message instanceof PrioritizedMessage) {
                return ((PrioritizedMessage) message).getPriority();
            } else if (message instanceof PoisonPill) {
                return poisonPillPriority;
            } else if (message instanceof Kill) {
                return killPriority;
            } else {
                return PrioritizedMessage.MEDIUM;
            }
        }
    }
}

PoisonPill / Kill Messages

Once the prioritized mailbox is configured, PoisonPill and Kill messages are prioritized too. In the example above, their default priority is PrioritizedMessage.HIGH, and you can override it per type by setting priority.poison-pill and priority.kill-pill in the configuration passed to the mailbox.
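
One consequence is worth spelling out: with HIGH as the default, a PoisonPill overtakes MEDIUM and LOW messages that are already waiting in the mailbox. The snippet below is a rough simulation of that ordering with a plain java.util.PriorityQueue, no Akka involved. The Envelope class, the arrival counter and the message names are all assumptions that stand in for the stable priority ordering. It is not how Akka implements the mailbox.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class MailboxOrderDemo {
    static final int HIGH = 200, MEDIUM = 300, LOW = 400;

    // Hypothetical envelope: a priority plus an arrival counter, so messages
    // with equal priority keep their arrival order (the "stable" part).
    static final class Envelope implements Comparable<Envelope> {
        final String message;
        final int priority;
        final long seq;

        Envelope(String message, int priority, long seq) {
            this.message = message;
            this.priority = priority;
            this.seq = seq;
        }

        @Override
        public int compareTo(Envelope other) {
            int byPriority = Integer.compare(priority, other.priority); // lower value first
            return byPriority != 0 ? byPriority : Long.compare(seq, other.seq);
        }
    }

    // Enqueues everything first, then drains the queue and returns the order.
    static List<String> processingOrder() {
        PriorityQueue<Envelope> mailbox = new PriorityQueue<>();
        long seq = 0;
        mailbox.add(new Envelope("report-1", LOW, seq++));
        mailbox.add(new Envelope("bid-1", MEDIUM, seq++));
        mailbox.add(new Envelope("bid-2", MEDIUM, seq++));
        mailbox.add(new Envelope("PoisonPill", HIGH, seq++)); // sent last, default HIGH

        List<String> order = new ArrayList<>();
        while (!mailbox.isEmpty()) {
            order.add(mailbox.poll().message);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(processingOrder()); // prints [PoisonPill, bid-1, bid-2, report-1]
    }
}
```

The simulation drains the whole queue, but a real actor would stop once it handles the PoisonPill, so the messages behind it would never be processed by that actor. If you want the actor to finish its pending work first, give priority.poison-pill a value at or above LOW.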

Another Way - Annotation

Instead of using an interface PrioritizedMessage with 4 inner interfaces, a single annotation with a single priority method can be used.

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Prioritized {
    int URGENT = 100, HIGH = 200, MEDIUM = 300, LOW = 400;

    int priority() default MEDIUM;
}

And with a small change to the gen method of the PrioritizedGenerator class:

@Override
public int gen(Object message) {
    // Look the annotation up once; null means the message class is not annotated
    Prioritized prioritized = message.getClass().getAnnotation(Prioritized.class);
    if (prioritized != null) {
        return prioritized.priority();
    } else if (message instanceof PoisonPill) {
        return poisonPillPriority;
    } else if (message instanceof Kill) {
        return killPriority;
    } else {
        return Prioritized.MEDIUM;
    }
}

Hell, you can even use both methods together… crazy!!!
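
If you do want both, here is one possible sketch of the lookup. The order of precedence (interface first, then annotation, then MEDIUM) is just my assumption, and WinNotice and StatsReport are made-up message names. The interface and annotation are trimmed stand-ins for the ones above so the snippet runs without Akka.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class CombinedPriorityDemo {
    // Trimmed stand-in for the PrioritizedMessage interface.
    interface PrioritizedMessage {
        int getPriority();
    }

    // Same shape as the Prioritized annotation above.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Prioritized {
        int URGENT = 100, HIGH = 200, MEDIUM = 300, LOW = 400;

        int priority() default MEDIUM;
    }

    // Hypothetical message that uses the interface approach.
    static class WinNotice implements PrioritizedMessage {
        @Override
        public int getPriority() { return Prioritized.URGENT; }
    }

    // Hypothetical message that uses the annotation approach.
    @Prioritized(priority = Prioritized.LOW)
    static class StatsReport {
    }

    // Assumed precedence: interface, then annotation, then the MEDIUM default.
    static int priorityOf(Object message) {
        if (message instanceof PrioritizedMessage) {
            return ((PrioritizedMessage) message).getPriority();
        }
        Prioritized annotation = message.getClass().getAnnotation(Prioritized.class);
        return annotation != null ? annotation.priority() : Prioritized.MEDIUM;
    }

    public static void main(String[] args) {
        System.out.println(priorityOf(new WinNotice()));   // prints 100
        System.out.println(priorityOf(new StatsReport())); // prints 400
        System.out.println(priorityOf("plain string"));    // prints 300
    }
}
```

One thing to keep in mind: the annotation is not marked @Inherited, so a subclass of an annotated message does not pick up its parent's priority and falls back to MEDIUM.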

The Scala Way

Early this year (2019) I started a new job where the main language is Scala. That's why I decided to try rewriting this Prioritized Mailbox the “Scala way” (hopefully).

The interface that became a trait

trait PrioritizedMessage {
  val priority: Int
}

object PrioritizedMessage {
  val URGENT = 100
  val HIGH = 200
  val MEDIUM = 300
  val LOW = 400

  trait Urgent extends PrioritizedMessage {
    override val priority = URGENT
  }

  trait High extends PrioritizedMessage {
    override val priority = HIGH
  }

  trait Medium extends PrioritizedMessage {
    override val priority = MEDIUM
  }

  trait Low extends PrioritizedMessage {
    override val priority = LOW
  }
}

The mailbox class:

class MyPrioritizedMailbox(settings: ActorSystem.Settings, config: Config)
  extends UnboundedStablePriorityMailbox({
    val DEFAULT_PRIORITY = if (config.hasPath("priority.default")) config.getInt("priority.default") else PrioritizedMessage.MEDIUM
    val POISON_PILL_PRIORITY = if (config.hasPath("priority.poison-pill")) config.getInt("priority.poison-pill") else PrioritizedMessage.HIGH
    val KILL_PRIORITY = if (config.hasPath("priority.kill-pill")) config.getInt("priority.kill-pill") else PrioritizedMessage.HIGH

    PriorityGenerator {
      case p: PrioritizedMessage => p.priority
      case PoisonPill => POISON_PILL_PRIORITY
      case Kill => KILL_PRIORITY
      case _ => DEFAULT_PRIORITY
    }
  })