Last post I talked about building a high-throughput, highly available analytics backend. This isn't a thorough tutorial; I'll just walk through getting set up and a few things I learned along the way.

Getting set up

I installed RabbitMQ through brew, and Phoenix / Elixir from the Phoenix homepage. Starting the RabbitMQ server was as simple as

/usr/local/sbin/rabbitmq-server

and getting a Phoenix app going was easy too (I called my app frequency; it’s an Artsy tradition to name applications after physics terms):

mix phoenix.new frequency --no-brunch --no-ecto --no-html

We just want a pure Phoenix API, so we'll leave out Brunch (the asset build tool), Ecto (the ActiveRecord equivalent), and all of the HTML support and templating we would need if we wanted our app to have a front-end.

The mix.exs file defines the application's dependencies as follows (you'll also need to add the new packages to your application function; see the sketch after this snippet):

  defp deps do
    [{:phoenix, "~> 1.2.1"},
     {:phoenix_pubsub, "~> 1.0"},
     {:gettext, "~> 0.11"},
     {:cowboy, "~> 1.0"},
     {:amqp, "~> 0.2.0-pre.1"}, # https://github.com/pma/amqp/issues/28
     {:briefly, "~> 0.3"},
     {:ex_aws, "~> 1.0"},
     {:hackney, "~> 1.6"}
    ]
  end
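
For reference, after adding the new packages the application function in mix.exs ends up looking roughly like this (a sketch based on a Phoenix 1.2-generated project; your list may differ slightly):

  def application do
    [mod: {Frequency, []},
     applications: [:phoenix, :phoenix_pubsub, :cowboy, :logger, :gettext,
                    :amqp, :ex_aws, :hackney, :briefly]]
  end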

Receiving calls and publishing messages

Under web/router.ex we add a single POST route:

  scope "/api", Frequency do
    pipe_through :api

    post "/t", TracksController, :index
  end

In that route we reference a TracksController that doesn't exist yet, so under web/controllers let's create tracks_controller.ex with the following body:

defmodule Frequency.TracksController do
  use Frequency.Web, :controller

  def index(conn, params) do
    # Encode the incoming params as JSON and hand them off to the queue worker
    {:ok, message} = Poison.encode(params)
    Frequency.Worker.publish(message)
    text(conn, "200")
  end
end

That controller in turn defers to a Frequency.Worker that we'll have to write. In lib we'll make worker.ex, which looks like

defmodule Frequency.Worker do
  use GenServer

  ## Client API

  def start_link do
    GenServer.start_link(__MODULE__, :ok, name: :publisher)
  end

  # A cast is fire-and-forget, so the HTTP request never blocks on RabbitMQ
  def publish(message) do
    IO.puts "publishing message.. "
    GenServer.cast(:publisher, {:publish, message})
  end

  ## Server Callbacks

  def init(:ok) do
    # One connection and channel for the life of the worker
    {:ok, connection} = AMQP.Connection.open
    {:ok, channel} = AMQP.Channel.open(connection)
    AMQP.Queue.declare(channel, "tracks")
    {:ok, %{channel: channel, connection: connection}}
  end

  def handle_cast({:publish, message}, state) do
    # Publish to the default exchange, routed straight to the "tracks" queue
    AMQP.Basic.publish(state.channel, "", "tracks", message)
    {:noreply, state}
  end

  def terminate(_reason, state) do
    AMQP.Connection.close(state.connection)
  end
end
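
Before wiring the worker into the supervision tree, you can sanity-check it from iex -S mix, assuming RabbitMQ is running locally (once it's supervised, the application starts it for you, and a manual start_link would return an :already_started error):

{:ok, _pid} = Frequency.Worker.start_link()
Frequency.Worker.publish(~s({"event": "test"}))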

This worker publishes all messages to a RabbitMQ queue. It defines a single GenServer registered under the name :publisher, which we'll set up to start under the same supervisor as our Frequency application (we'll do this in a minute). The GenServer exposes a single public function, publish/1, which casts the message to the process so that handle_cast can drop it onto the channel opened in init. Finally, in lib/frequency.ex, update the children of our supervision tree to include our new worker.

children = [
  # Start the endpoint when the application starts
  supervisor(Frequency.Endpoint, []),
  worker(Frequency.Worker, [])
]
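
With the worker supervised, restart the server and the publish half is wired up end to end; a quick curl (assuming the default port) should come back with a 200 and drop a message on the queue:

curl -X POST -d 'event=sent_a_message&user_id=xyz' http://127.0.0.1:4000/api/t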

Halfway there.

Receiving messages from RabbitMQ and posting them to S3

Under lib, we'll create a receiver.ex which reads messages off the RabbitMQ queue and collects them in a list; every 1,000 messages, it encodes the batch as a JSON file and uploads it to S3 using ExAws (you'll need to add the variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to your environment).
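
ExAws picks those credentials up from the environment by default; if you'd rather be explicit, something along these lines in config/config.exs should be equivalent (a sketch against ex_aws 1.0; check its README for the exact keys):

config :ex_aws,
  access_key_id: [{:system, "AWS_ACCESS_KEY_ID"}, :instance_role],
  secret_access_key: [{:system, "AWS_SECRET_ACCESS_KEY"}, :instance_role]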

defmodule Receiver do
  alias ExAws.S3

  def wait_for_messages do
    queue_name = "tracks"
    {:ok, connection} = AMQP.Connection.open
    {:ok, channel} = AMQP.Channel.open(connection)
    AMQP.Queue.declare(channel, queue_name)
    # With no_ack: true, RabbitMQ won't wait for acknowledgements
    AMQP.Basic.consume(channel, queue_name, nil, no_ack: true)
    # An Agent holds the in-progress batch of messages
    Agent.start_link(fn -> [] end, name: :batcher)
    _wait_for_messages()
  end

  defp push(value) do
    Agent.update(:batcher, fn list -> [value | list] end)
    flush_if_full()
  end

  defp flush do
    Agent.update(:batcher, fn _ -> [] end)
  end

  defp full? do
    # Flush once we've accumulated 1,000 messages
    Agent.get(:batcher, fn list -> length(list) >= 1000 end)
  end

  defp make_key do
    rand = :crypto.strong_rand_bytes(6) |> Base.url_encode64
    now = DateTime.utc_now |> DateTime.to_string
    "batch_#{now}_#{rand}.json"
  end

  defp write_and_upload(path, json) do
    File.write!(path, json)

    S3.put_object("<your-bucket>", "frequency/#{make_key()}", File.read!(path))
    |> ExAws.request
  end

  defp flush_if_full do
    if full?() do
      # Snapshot the batch, write it to a temp file, upload, then reset
      l = Agent.get(:batcher, fn list -> list end)
      {:ok, path} = Briefly.create
      {:ok, json} = Poison.encode(l)
      write_and_upload(path, json)
      flush()
    end
  end

  defp _wait_for_messages do
    receive do
      {:basic_deliver, payload, _meta} ->
        push(payload)
        IO.puts "received a message!"
        _wait_for_messages()
    end
  end
end

Finally, we can string it all together with mix phoenix.server in one terminal window and iex -S mix in another; in the iex session, run

Receiver.wait_for_messages
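
To confirm messages are actually flowing, rabbitmqctl in a third terminal will show the tracks queue and its current depth:

/usr/local/sbin/rabbitmqctl list_queues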

And all that’s left is hammering our API with POST requests, which I elected to do in Ruby:

require 'net/http'
uri = URI('http://127.0.0.1:4000/api/t')
30.times do
  1000.times do
    Thread.new { Net::HTTP.post_form(uri, 'event' => 'sent_a_message', 'user_id' => 'xyz') }
  end
  sleep(0.5) # ruby can only spawn so many threads
end

Sit back and watch your API soak up thousands of concurrent requests without breaking a sweat.