Phoenix and RabbitMQ
In my last post I talked about building a high-throughput, highly available analytics backend. I’m not going to write a thorough tutorial here, just identify some steps in getting set up and a few things I learned.
Getting set up
I installed RabbitMQ through brew, and Phoenix / Elixir from the Phoenix homepage. Starting the RabbitMQ server was as simple as

    /usr/local/sbin/rabbitmq-server

and getting a Phoenix app going was easy too (I called my app frequency; it’s an Artsy tradition to name applications after physics terms):

    mix phoenix.new frequency --no-brunch --no-ecto --no-html

We just want a pure Phoenix API, so we’ll leave out brunch (the build tool), ecto (the ActiveRecord equivalent), and all of the HTML support and templating we would need if we wanted our app to have a front-end.
The mix.exs file defines the application’s dependencies as follows (also add the new ones to the applications list in your application function):

    defp deps do
      [{:phoenix, "~> 1.2.1"},
       {:phoenix_pubsub, "~> 1.0"},
       {:gettext, "~> 0.11"},
       {:cowboy, "~> 1.0"},
       {:amqp, "~> 0.2.0-pre.1"}, # https://github.com/pma/amqp/issues/28
       {:briefly, "~> 0.3"},
       {:ex_aws, "~> 1.0"},
       {:hackney, "~> 1.6"}
      ]
    end

Receiving calls and publishing messages
Under web/router.ex we add a single POST route:

    scope "/api", Frequency do
      pipe_through :api
      post "/t", TracksController, :index
    end

In that route we reference the TracksController, which doesn’t exist yet, so under web/controllers let’s create tracks_controller.ex with the following body:

    defmodule Frequency.TracksController do
      use Frequency.Web, :controller

      def index(conn, params) do
        # Serialize the request params and hand them to the publisher
        {:ok, message} = Poison.encode(params)
        Frequency.Worker.publish(message)

        conn
        |> text("200")
      end
    end

And you’ll see that it in turn defers to a Frequency.Worker that we’ll have to make. In lib we’ll make worker.ex, which looks like

    defmodule Frequency.Worker do
      use GenServer

      ## Client API

      def start_link do
        GenServer.start_link(__MODULE__, :ok, name: :publisher)
      end

      def publish(message) do
        IO.puts "handling cast.. "
        GenServer.cast(:publisher, {:publish, message})
      end

      ## Server Callbacks

      def init(:ok) do
        {:ok, connection} = AMQP.Connection.open()
        {:ok, channel} = AMQP.Channel.open(connection)
        AMQP.Queue.declare(channel, "tracks")
        {:ok, %{channel: channel, connection: connection}}
      end

      def handle_cast({:publish, message}, state) do
        # Publish via the default exchange, routed to the "tracks" queue
        AMQP.Basic.publish(state.channel, "", "tracks", message)
        {:noreply, state}
      end

      def terminate(_reason, state) do
        AMQP.Connection.close(state.connection)
      end
    end

This worker publishes every message to a RabbitMQ queue. It defines a single GenServer registered under the name :publisher, which we’ll set up to start under the same supervisor as our Frequency application (we’ll do this in a minute). The GenServer’s client API exposes a single function, publish, which casts the message to the server process; the handle_cast callback then publishes it to the tracks queue over the channel opened in init. Finally, in lib/frequency.ex, update the children of our supervisor to include our new worker.

    children = [
      # Start the endpoint when the application starts
      supervisor(Frequency.Endpoint, []),
      worker(Frequency.Worker, []),
    ]

Halfway there.
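The publisher leans on GenServer.cast being fire-and-forget: the controller gets :ok back right away and the server process does the work later. Here is a minimal sketch of that pattern using only the standard library. PublisherSketch is a hypothetical module, not part of the app, and an in-memory list stands in for the RabbitMQ queue so it runs without a broker.

```elixir
defmodule PublisherSketch do
  use GenServer

  ## Client API

  def start_link do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Asynchronous: returns :ok immediately, the server handles the message later.
  def publish(message), do: GenServer.cast(__MODULE__, {:publish, message})

  # Synchronous: only here so we can read back what was "published".
  def published, do: GenServer.call(__MODULE__, :published)

  ## Server callbacks

  @impl true
  def init(:ok), do: {:ok, []}

  @impl true
  def handle_cast({:publish, message}, messages) do
    # Stand-in for AMQP.Basic.publish: prepend to an in-memory list.
    {:noreply, [message | messages]}
  end

  @impl true
  def handle_call(:published, _from, messages) do
    # Messages were prepended, so reverse to get arrival order.
    {:reply, Enum.reverse(messages), messages}
  end
end

{:ok, _pid} = PublisherSketch.start_link()
:ok = PublisherSketch.publish("first")
:ok = PublisherSketch.publish("second")
IO.inspect(PublisherSketch.published())
```

Because messages between two processes arrive in the order they were sent, the call at the end is handled after both casts. The trade-off is that a cast gives the caller no confirmation that the message was handled, so a failure in handle_cast never reaches the HTTP client.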
Receiving messages from RabbitMQ and posting them to S3
Under lib, we’ll create a receiver.ex which reads messages off the RabbitMQ queue, adds them to a list, and, once 1,000 messages have accumulated, encodes them as a JSON file and uploads it to S3 using ExAws (you’ll need to add the variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to your environment).

    defmodule Receiver do
      alias ExAws.S3

      def wait_for_messages do
        queue_name = "tracks"
        {:ok, connection} = AMQP.Connection.open()
        {:ok, channel} = AMQP.Channel.open(connection)
        AMQP.Queue.declare(channel, queue_name)
        # Deliver messages to the calling process, without acknowledgements
        AMQP.Basic.consume(channel, queue_name, nil, no_ack: true)
        # The Agent holds the current batch of messages
        Agent.start_link(fn -> [] end, name: :batcher)
        _wait_for_messages()
      end

      defp push(value) do
        Agent.update(:batcher, fn list -> [value | list] end)
        flush_if_full()
      end

      defp flush do
        Agent.update(:batcher, fn _ -> [] end)
      end

      defp full? do
        # Flush once the batch reaches 1,000 messages
        Agent.get(:batcher, fn list -> length(list) >= 1000 end)
      end

      defp make_key do
        rand = :crypto.strong_rand_bytes(6) |> Base.url_encode64()
        now = DateTime.utc_now() |> DateTime.to_string()
        "batch_#{now}_#{rand}.json"
      end

      defp write_and_upload(path, json) do
        File.write!(path, json)
        S3.put_object("<your-bucket>", "frequency/#{make_key()}", File.read!(path)) |> ExAws.request()
      end

      defp flush_if_full do
        if full?() do
          l = Agent.get(:batcher, fn list -> list end)
          {:ok, path} = Briefly.create()
          {:ok, json} = Poison.encode(l)
          write_and_upload(path, json)
          flush()
        end
      end

      defp _wait_for_messages do
        receive do
          {:basic_deliver, payload, _meta} ->
            push(payload)
            IO.puts "received a message!"
            _wait_for_messages()
        end
      end
    end

Finally, we can string it all together with mix phoenix.server in one terminal window, and iex -S mix in another, and in the iex pane run

    Receiver.wait_for_messages

And all that’s left is hammering our API with POST requests, which I elected to do in Ruby:

    require 'net/http'

    uri = URI('http://127.0.0.1:4000/api/t')
    30.times do
      1000.times do
        Thread.new { Net::HTTP.post_form(uri, 'event' => 'sent_a_message', 'user_id' => 'xyz') }
      end
      sleep(0.5) # ruby can only spawn so many threads
    end

Sit back and watch your API soak up thousands of concurrent requests without breaking a sweat.
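One piece of the receiver worth trying in isolation is the batching: collect values in an Agent and hand back a full batch once a threshold is hit. Below is a rough sketch of that idea with no RabbitMQ or S3 involved. BatcherSketch and its batch size of 3 are made up for illustration, and unlike the receiver it checks and resets the list in a single Agent.get_and_update call instead of separate get and update calls.

```elixir
defmodule BatcherSketch do
  # Hypothetical, tiny batch size so the behaviour is easy to see.
  @batch_size 3

  def start_link do
    Agent.start_link(fn -> [] end, name: __MODULE__)
  end

  # Adds a value to the batch. Returns {:flushed, batch} when the batch
  # becomes full (and resets it), otherwise :buffered.
  def push(value) do
    Agent.get_and_update(__MODULE__, fn list ->
      batch = [value | list]

      if length(batch) >= @batch_size do
        # Values were prepended, so reverse to restore arrival order.
        {{:flushed, Enum.reverse(batch)}, []}
      else
        {:buffered, batch}
      end
    end)
  end
end

{:ok, _pid} = BatcherSketch.start_link()
results = Enum.map(1..7, &BatcherSketch.push/1)
IO.inspect(results)
```

In the real receiver the {:flushed, batch} branch is where the JSON encoding and S3 upload would go. Doing the check and the reset in one Agent call means no other process can slip a message in between them, which matters once more than one process pushes to the same batcher.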