This post describes a pattern for implementing code with side effects in a testable way in Elixir. It builds on José Valim's post "Mocks and explicit contracts", so you should read that first if you haven't.
Testing a Twitter Client
In José's example, a Phoenix application's interactions with the Twitter API are tested. To avoid actually calling out to Twitter, the API component is replaced during tests with a mock: a module that implements the same contract as the real Twitter module.
The example mock in the blog post looks like this:
defmodule MyApp.Twitter.InMemory do
  def get_username("josevalim") do
    %MyApp.Twitter.User{
      username: "josevalim"
    }
  end
end
How to test that a tweet is posted?
This approach works great if you are testing the effect an API call has on your application. If, for example, you have some action that fetches the user’s Twitter username and stores it in the database, you could do the following:
test "fetches and stores the username from twitter", %{conn: conn, user: user} do
  post(conn, twitter_path(conn, :fetch_twitter_username))
  assert Repo.get(User, user.id).username == "josevalim"
end
It gets a little more difficult when you want to test an API call that has no direct effect on your application. How could you test that a tweet is posted, for example?
send(self(), tweet)
One possibility is to send a message from the mock to the current process:
defmodule MyApp.Twitter.InMemory do
  ...
  def post_tweet(tweet_text) do
    send(self(), {:tweet_posted, tweet_text})
  end
end
Then in a test you could check that a matching message is received.
test "posts a tweet", %{conn: conn} do
  posted_text = "Hi, this is mock."
  post(conn, twitter_path(conn, :share_text), %{text: posted_text})
  assert_receive({:tweet_posted, ^posted_text})
end
What if the tweet is sent in a different process?
Very often this is enough. However, when you post the tweet from a different process (Task.async, GenServer, …), self() inside the mock refers to that process, so the {:tweet_posted, ...} message ends up in its mailbox instead of the test's, and your test will fail.
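To make the failure concrete, here is a minimal standalone sketch of the problem. `SelfSendDemo` is a made-up stand-in for the mock, not part of the original example, and plain `receive` is used in place of `assert_receive` so the script runs without ExUnit.

```elixir
# Hypothetical stand-in for the mock; the module name is invented for this demo.
defmodule SelfSendDemo do
  def post_tweet(tweet_text) do
    # self() is whichever process happens to call this function.
    send(self(), {:tweet_posted, tweet_text})
    :ok
  end
end

# Called directly, the message lands in the caller's own mailbox.
SelfSendDemo.post_tweet("direct")

direct_result =
  receive do
    {:tweet_posted, "direct"} -> :received
  after
    100 -> :nothing
  end

# Called inside a Task, self() is the task process, so the caller sees nothing.
task = Task.async(fn -> SelfSendDemo.post_tweet("from task") end)
Task.await(task)

task_result =
  receive do
    {:tweet_posted, "from task"} -> :received
  after
    100 -> :nothing
  end

IO.inspect({direct_result, task_result})
```

The second `receive` times out because the message was delivered to the task process, which has already exited.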
This is when you could use a GenServer to distribute messages. Interested processes can subscribe and will then be notified about every posted tweet. First, let me show you the complete code for the mock GenServer:
defmodule MyApp.Twitter.InMemory do
  use GenServer
  # Client
  def start_link do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end
  def subscribe do
    GenServer.call(__MODULE__, {:subscribe, self()})
  end
  def post_tweet(tweet_text) do
    GenServer.call(__MODULE__, {:post_tweet, tweet_text})
  end
  # Server
  # Explicit init/1 avoids the compiler warning about a missing callback.
  def init(listeners) do
    {:ok, listeners}
  end
  def handle_call({:subscribe, pid}, _from, listeners) do
    {:reply, :ok, [pid | listeners]}
  end
  def handle_call({:post_tweet, tweet_text}, _from, listeners) do
    send_to_listeners(listeners, {:tweet_posted, tweet_text})
    {:reply, :ok, listeners}
  end
  defp send_to_listeners(listeners, message) do
    for listener <- listeners do
      send(listener, message)
    end
  end
end
Let me walk you through what all these functions do.
def start_link do
  GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
Here the GenServer is started and initialized with an empty list as state. This list will later hold the subscribed processes.
def subscribe do
  GenServer.call(__MODULE__, {:subscribe, self()})
end
...
def handle_call({:subscribe, pid}, _from, listeners) do
  {:reply, :ok, [pid | listeners]}
end
This is how processes subscribe. The client function passes the caller's pid (self()) to the server, which adds it to its internal list of listeners.
def post_tweet(tweet_text) do
  GenServer.call(__MODULE__, {:post_tweet, tweet_text})
end
def handle_call({:post_tweet, tweet_text}, _from, listeners) do
  send_to_listeners(listeners, {:tweet_posted, tweet_text})
  {:reply, :ok, listeners}
end
defp send_to_listeners(listeners, message) do
  for listener <- listeners do
    send(listener, message)
  end
end
And finally, this is how tweets are posted. Just as in the version before, we create a tuple of {:tweet_posted, tweet_text}. But now, instead of sending it to the current process, we send it to all the subscribed listeners.
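As a quick check that this solves the cross-process case, the following standalone sketch posts a tweet from a Task and receives it in the subscribing process. `BroadcastDemo` is a condensed copy of the mock under a made-up name so the script runs on its own. The `init/1` callback is an addition made here to avoid a compiler warning, and plain `receive` again stands in for `assert_receive`.

```elixir
# Condensed copy of the mock under a hypothetical name (BroadcastDemo).
defmodule BroadcastDemo do
  use GenServer

  # Client
  def start_link, do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
  def subscribe, do: GenServer.call(__MODULE__, {:subscribe, self()})
  def post_tweet(text), do: GenServer.call(__MODULE__, {:post_tweet, text})

  # Server
  @impl true
  def init(listeners), do: {:ok, listeners}

  @impl true
  def handle_call({:subscribe, pid}, _from, listeners) do
    {:reply, :ok, [pid | listeners]}
  end

  def handle_call({:post_tweet, text}, _from, listeners) do
    # Notify every subscriber, regardless of which process posted the tweet.
    for listener <- listeners, do: send(listener, {:tweet_posted, text})
    {:reply, :ok, listeners}
  end
end

{:ok, _pid} = BroadcastDemo.start_link()
:ok = BroadcastDemo.subscribe()

# The tweet is posted from a separate process this time.
Task.async(fn -> BroadcastDemo.post_tweet("from task") end) |> Task.await()

result =
  receive do
    {:tweet_posted, text} -> {:received, text}
  after
    100 -> :nothing
  end

IO.inspect(result)
```

Because `post_tweet` is a synchronous call and the server sends to the listeners before replying, the message is already in the subscriber's mailbox by the time `Task.await` returns.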
Usage
You still need to start the mock GenServer and subscribe to it in every test that uses it. You can do that either per test:
test "posts a tweet", %{conn: conn} do
  MyApp.Twitter.InMemory.start_link()
  MyApp.Twitter.InMemory.subscribe()
  posted_text = "Hi, this is mock."
  post(conn, twitter_path(conn, :share_text), %{text: posted_text})
  assert_receive({:tweet_posted, ^posted_text})
end
Or in the setup function of the test case or even the case template:
defmodule MyApp.ConnCase do
  ...
  setup tags do
    ...
    MyApp.Twitter.InMemory.start_link()
    MyApp.Twitter.InMemory.subscribe()
    {:ok, conn: Phoenix.ConnTest.build_conn()}
  end
end
Caveat
When you run your tests asynchronously, this approach will not work unless some extra steps are taken: the mock is registered under a single name, so concurrent tests share one instance and every subscriber receives every posted tweet. In all my current projects, tests are run synchronously, so I can't really provide good guidelines here. But at least two changes are necessary:
- Not stopping the mock after every test. You could just call MyApp.Twitter.InMemory.start_link() in your test_helper.exs instead.
- Making sure you have some unique way of identifying messages per test. In our tweet example, this could be as simple as ensuring all tweeted texts in your tests are unique. You could then match on the text and only consume matching messages.
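The second point could look roughly like the sketch below. It is only an illustration under assumptions: `unique_text` is a hypothetical helper, the two `send` calls simulate a shared mock notifying the same process about tweets from two different tests, and plain `receive` stands in for `assert_receive({:tweet_posted, ^my_text})`.

```elixir
# Hypothetical helper: appends a unique integer so no two tests share a text.
unique_text = fn base -> "#{base} #{System.unique_integer([:positive])}" end

my_text = unique_text.("Hi, this is mock.")
other_text = unique_text.("Hi, this is mock.")

# Simulate a shared mock delivering another test's tweet before ours.
send(self(), {:tweet_posted, other_text})
send(self(), {:tweet_posted, my_text})

# The pin makes the receive selective: only this test's tweet matches,
# and the other message stays in the mailbox untouched.
matched =
  receive do
    {:tweet_posted, ^my_text} -> :matched
  after
    100 -> :timeout
  end

IO.inspect(matched)
```

Matching on the pinned text means a test never consumes a message that belongs to another test, even though both are subscribed to the same mock.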