Collecting planes with Elixir GenStage

Don't collect Pokémon, collect aircraft!

DISCLAIMER: there's already a lot of detailed information about all of the topics that I will talk about next. The following is just a summary of how I combine some features of Elixir and my passion for planes.

The following content requires some Elixir basics and a passion for planes.
The passion is up to you; for Elixir I recommend the following:

Today we will cover:

  • ADS-B
  • DIY receiver
  • dump1090
  • Elixir
  • GenStage
  • AircraftSpotter

ADS-B

Automatic Dependent Surveillance-Broadcast (ADS-B) is a technology that lets aircraft continuously broadcast information about themselves so that anyone with a receiver can listen and act upon it.

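Some of the information that is broadcast, or that a receiver can derive from it, includes:

  • ICAO (International Civil Aviation Organization) 24-bit aircraft address
  • Position (latitude, longitude, altitude)
  • Distance from the listener (computed by the receiver)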
  • Speed
  • Flight number
  • Aircraft registration country
  • Transponder code (Squawk)

Websites like https://www.flightradar24.com/ or https://flightaware.com use ADS-B receivers as one of their data sources to track planes and give accurate information about their position.

Build your own DIY ADS-B receiver

Personally, I was really surprised when I discovered how easy and cheap it is to build your own receiver. There are plenty of resources out there on how to do it; here is just my quick take:

  • Raspberry Pi (I'm using the Pi Zero)
  • DVB-T dongle with a 1090 MHz antenna (YES! a TV receiver!!! and YES! its default tiny antenna!!!)
  • ADS-B decoder, dump1090 (FlightAware's fork, a.k.a. dump1090-fa)

dump1090

Dump1090 is a simple Mode S decoder for RTLSDR devices
dump1090-fa output in the terminal

While researching ADS-B decoders I found the original dump1090, but also several forks. One of the best known is the Mutability fork, which was later forked by FlightAware. The interesting part of FlightAware's fork is that it provides a simple JSON output that can be imported into any other project. Much better than working with the default TCP hex codes output ;)

Also, it comes with a nice Web interface:

dump1090-fa website to visualize the collected data

I use the tiny stock antenna that comes with the DVB-T dongle. Depending on the weather conditions and the location of the antenna, I can get around 250 km of coverage.

Protip: If you are running dump1090-fa on a Raspberry Pi, consider writing the JSON output to a tmpfs partition. Your SD card will thank you!
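To make the JSON output mentioned above a bit more concrete, here is a minimal sketch of working with an aircraft.json payload once it has been decoded into a map (with Jason or similar, not shown). The field names ("aircraft", "hex", "flight", "lat", "lon") follow dump1090-fa's output; the module name AircraftJsonSketch and the sample values are made up for illustration.

```elixir
defmodule AircraftJsonSketch do
  @moduledoc """
  Helpers for an already-decoded aircraft.json payload (a map with string keys).
  Hypothetical module, not part of AircraftSpotter.
  """

  # Keep only the aircraft that currently report a position.
  def with_position(%{"aircraft" => aircraft}) do
    Enum.filter(aircraft, fn a -> is_number(a["lat"]) and is_number(a["lon"]) end)
  end

  # Build a short, human-readable label for one aircraft entry.
  # The "flight" field is optional and padded with spaces, hence the trim.
  def summary(%{"hex" => hex} = a) do
    flight = a |> Map.get("flight", "") |> String.trim()
    label = if flight == "", do: "(no callsign)", else: flight
    "#{String.upcase(hex)} #{label}"
  end
end

# Made-up sample payload: one aircraft with a position, one without.
payload = %{
  "now" => 1_500_000_000.0,
  "aircraft" => [
    %{"hex" => "3c5ee7", "flight" => "TEST123 ", "lat" => 50.03, "lon" => 8.57},
    %{"hex" => "4ca2d1", "squawk" => "1000"}
  ]
}

payload
|> AircraftJsonSketch.with_position()
|> Enum.map(&AircraftJsonSketch.summary/1)
|> IO.inspect()
# prints ["3C5EE7 TEST123"]
```

Aircraft that have only been heard through messages without a position simply lack the "lat" and "lon" keys, which is why the filter checks for them instead of assuming they exist.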

Elixir

By default dump1090-fa writes its JSON output once per second, so the data we consume is not truly real time. However, thanks to the nature of Elixir and its modules, we can simulate a more real-time environment and take advantage of GenStage.

GenStage

GenStage is an Elixir behaviour that allows us to create a pub/sub architecture with back-pressure.

In traditional architectures a publisher (called producer in GenStage) sends messages to its subscribers (called consumers in GenStage). Usually those messages are sent as soon as they arrive. This architecture can cause the subscribers to overload and eventually crash due to the amount of messages received. If that happens we might lose those messages that the subscriber received but never processed.

Traditional pub/sub architecture, no flow control

What GenStage proposes is a back-pressure architecture where the consumer is in control and decides when and how much to request from the producer. Consumers won't be overloaded; instead, the producer buffers data if necessary while the consumers are working. So say goodbye to consumers/workers getting flooded with messages. For more info you can read how to configure a consumer to request just what it can take, and no more.

GenStage consumers regulating the flow and the producer using its buffer
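As a rough illustration of the demand-driven flow described above, here is a minimal sketch with one producer and one deliberately slow consumer. The module names Sketch.Counter and Sketch.SlowPrinter are hypothetical and unrelated to AircraftSpotter, and the script assumes it can fetch the gen_stage package through Mix.install.

```elixir
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule Sketch.Counter do
  use GenStage

  def start_link(initial), do: GenStage.start_link(__MODULE__, initial, name: __MODULE__)

  @impl true
  def init(counter), do: {:producer, counter}

  # Runs only when a consumer asks for more; emits exactly `demand` events.
  @impl true
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule Sketch.SlowPrinter do
  use GenStage

  def start_link(_), do: GenStage.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok) do
    # The consumer never has more than 5 events in flight.
    {:consumer, :ok, subscribe_to: [{Sketch.Counter, max_demand: 5, min_demand: 2}]}
  end

  @impl true
  def handle_events(events, _from, state) do
    # Simulate slow work; the producer is not asked for more in the meantime.
    Process.sleep(200)
    IO.inspect(events, label: "processed")
    {:noreply, [], state}
  end
end

{:ok, _producer} = Sketch.Counter.start_link(0)
{:ok, _consumer} = Sketch.SlowPrinter.start_link(:ok)

# Let the pipeline run briefly before the script exits.
Process.sleep(1_000)
```

The producer could count as fast as it likes, but it only does so when handle_demand/2 is invoked, so the pace is set entirely by the slow consumer.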

Putting it all together: introducing AircraftSpotter

Now that we have data from dump1090 and know the basics of GenStage, it is time to put everything together. Enter AircraftSpotter, an Elixir umbrella application that consists of:

  • Fetcher: GenStage producer that consumes data from a dump1090-fa source.
  • Decoder: Helper to get extra information based on an ADS-B ICAO identifier.
  • Aircraft: Phoenix application that consumes and displays collected data.

The following image should describe how the application works.

Architecture of AircraftSpotter

Fetcher

This is our producer, in charge of fetching data from the dump1090-fa source and making it available to its consumers.

You can check the full code on GitHub; here are just the important bits:

# ...
def init(state) do
  schedule()

  {:producer, state, dispatcher: GenStage.BroadcastDispatcher}
end
# ...

By default, a GenStage producer uses GenStage.DemandDispatcher, which splits events among its consumers so that each event reaches only one of them. In our case we don't want that: we want every consumer to receive every event emitted, and for that we need GenStage.BroadcastDispatcher.

If at this point you are wondering whether it is possible, out of the box, to build an event-sourcing architecture and replay the full history of events when new consumers join, the answer is: No. If you need such capabilities you should take a look at Kafka and KafkaEx.
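To illustrate the broadcast behaviour, here is a small sketch in which two consumers subscribe to the same producer and both receive the same event. Sketch.Broadcaster, Sketch.Listener and the notify/1 helper are hypothetical names for this example only; the real Fetcher produces its events from a timer instead. The script assumes it can fetch gen_stage through Mix.install.

```elixir
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule Sketch.Broadcaster do
  use GenStage

  def start_link(_), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)

  # Hypothetical helper: push one event into the producer from the outside.
  def notify(event), do: GenStage.cast(__MODULE__, {:notify, event})

  @impl true
  def init(:ok), do: {:producer, :ok, dispatcher: GenStage.BroadcastDispatcher}

  @impl true
  def handle_cast({:notify, event}, state), do: {:noreply, [event], state}

  # Events arrive via notify/1, so there is nothing to emit on demand.
  @impl true
  def handle_demand(_demand, state), do: {:noreply, [], state}
end

defmodule Sketch.Listener do
  use GenStage

  def start_link(report_to), do: GenStage.start_link(__MODULE__, report_to)

  @impl true
  def init(report_to), do: {:consumer, report_to, subscribe_to: [Sketch.Broadcaster]}

  @impl true
  def handle_events(events, _from, report_to) do
    # Forward each event to the reporting process, tagged with this consumer's pid.
    Enum.each(events, &send(report_to, {:seen, self(), &1}))
    {:noreply, [], report_to}
  end
end

{:ok, _producer} = Sketch.Broadcaster.start_link(:ok)
{:ok, a} = Sketch.Listener.start_link(self())
{:ok, b} = Sketch.Listener.start_link(self())

# Give both subscriptions a moment to settle before emitting.
Process.sleep(100)
Sketch.Broadcaster.notify(:plane_spotted)

for pid <- [a, b] do
  receive do
    {:seen, ^pid, event} -> IO.inspect({pid, event}, label: "received")
  after
    1_000 -> IO.puts("a consumer did not receive the event")
  end
end
```

With the default dispatcher, only one of the two listeners would have received :plane_spotted.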

We continue...

As mentioned before, dump1090-fa outputs data every second, so we need a mechanism to poll it.

# ...
defp schedule do
  if Application.get_env(:fetcher, :schedule) do
    Process.send_after(self(), :run, @interval_in_ms)
  end
end
# ...

Here we use Process.send_after to deliver a :run message to the producer every @interval_in_ms milliseconds; handling :run eventually calls call:

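# ...
# Both modules are read from the application config at compile time.
@worker Application.compile_env(:fetcher, :worker)
@parser Application.compile_env(:fetcher, :parser)
# ...
# Fetch the raw data with the worker, then hand it to the parser.
def call do
  @worker.call() |> @parser.call()
end
# ...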

Now it is just a matter of using our @worker to fetch the data and then @parser to turn it into something we can easily consume and manipulate.
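The polling loop and the worker/parser pipeline can be sketched together without any dependencies. The version below uses a plain GenServer instead of a GenStage producer so it stays self-contained; in a GenStage producer, handle_info/2 would return {:noreply, events, state} rather than sending the events to another process. Sketch.FakeWorker, Sketch.FakeParser and Sketch.Poller are stand-ins invented for this example, not the modules from the repository.

```elixir
defmodule Sketch.FakeWorker do
  # Stand-in for the real fetch; returns a raw payload.
  def call, do: "3c5ee7,4ca2d1"
end

defmodule Sketch.FakeParser do
  # Stand-in parser: turns the raw payload into a list of events.
  def call(raw), do: raw |> String.split(",") |> Enum.map(&String.upcase/1)
end

defmodule Sketch.Poller do
  use GenServer

  @interval_in_ms 100

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    schedule()
    {:ok, Map.new(opts)}
  end

  # The :run message scheduled by schedule/0 lands here.
  @impl true
  def handle_info(:run, %{worker: worker, parser: parser, report_to: pid} = state) do
    events = worker.call() |> parser.call()
    send(pid, {:events, events})
    # Re-arm the timer so polling continues.
    schedule()
    {:noreply, state}
  end

  defp schedule, do: Process.send_after(self(), :run, @interval_in_ms)
end

{:ok, _poller} =
  Sketch.Poller.start_link(
    worker: Sketch.FakeWorker,
    parser: Sketch.FakeParser,
    report_to: self()
  )

receive do
  {:events, events} -> IO.inspect(events, label: "polled")
after
  1_000 -> IO.puts("no events received")
end
```

Passing the worker and parser in as modules keeps the polling logic independent of where the data actually comes from, which is the same idea as reading them from the application config.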

Note that at this point there are no consumers, just our producer messaging itself to retrieve data from the external source. Here lies the magic of the producer buffer. All those events will be stored in the internal buffer of the producer until a consumer starts consuming them or until the buffer size (10,000 by default) is exceeded, in which case events will be discarded. With the :buffer_keep option we can decide which events to discard, the new ones or the old ones.
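For reference, the buffer options are set in the tuple returned from init/1. The sketch below is a hypothetical producer (Sketch.BufferedProducer is not the Fetcher from the repository, and the size of 1_000 is arbitrary) and assumes gen_stage can be fetched through Mix.install.

```elixir
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule Sketch.BufferedProducer do
  use GenStage

  @impl true
  def init(state) do
    {:producer, state,
     dispatcher: GenStage.BroadcastDispatcher,
     # Hold at most 1_000 undelivered events (the default is 10_000).
     buffer_size: 1_000,
     # When the buffer is full, keep the newest events and drop the oldest.
     # :last is the default; :first keeps the oldest and drops new arrivals.
     buffer_keep: :last}
  end

  # Nothing is produced on demand in this sketch.
  @impl true
  def handle_demand(_demand, state), do: {:noreply, [], state}
end
```

For live aircraft positions, keeping the newest events is probably the more useful choice, since a stale position is of little value to a consumer that joins late.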

Decoder

There's no magic here, just a bunch of helpers ported from dump1090-fa. Feel free to check the code if you would like to know how an ICAO identifier like 3C5EE7 tells you that this specific aircraft was registered in Germany and is an Airbus A319.
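As a rough idea of how the country part of such a lookup can work: ICAO addresses are allocated to states in blocks, so the country can be found by checking which block the address falls into. The sketch below is not the Decoder from the repository; Sketch.IcaoCountry is a hypothetical module and its table is only a tiny excerpt of the allocation list. The aircraft type is a different matter and typically needs a lookup table keyed by address, which is not covered here.

```elixir
defmodule Sketch.IcaoCountry do
  # A small, illustrative excerpt of the ICAO address block allocations.
  @ranges [
    {0x380000..0x3BFFFF, "France"},
    {0x3C0000..0x3FFFFF, "Germany"},
    {0x400000..0x43FFFF, "United Kingdom"},
    {0xA00000..0xAFFFFF, "United States"}
  ]

  # Returns the country name, :unknown if no block matches,
  # or :invalid if the input is not a hexadecimal string.
  def country(hex) when is_binary(hex) do
    case Integer.parse(hex, 16) do
      {address, ""} ->
        Enum.find_value(@ranges, :unknown, fn {range, name} ->
          if address in range, do: name
        end)

      _ ->
        :invalid
    end
  end
end

IO.inspect(Sketch.IcaoCountry.country("3C5EE7"))
# prints "Germany"
```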

If you require further info about an aircraft you can check https://opensky-network.org/.

Aircraft

The Phoenix application itself holds few secrets, so instead let's focus on the two consumers.

Importer

This module is in charge of getting events from the producer and inserting or updating records in the database. Let's take a look at the most important parts of it.

# ...
@worker_type Aircraft.Consumer.Importer
@producer_type Fetcher.Producer
# ...
use ConsumerSupervisor
# ...

def init(_arg) do
  children = [
    %{
      id: @worker_type,
      restart: :transient,
      start: {@worker_type, :start_link, []}
    }
  ]

  opts = [
    strategy: :one_for_one,
    subscribe_to: [{@producer_type, max_demand: 50}]
  ]

  ConsumerSupervisor.init(children, opts)
end
# ...

First of all, we use ConsumerSupervisor, which lets us spawn a separate Erlang process for each event received. Concurrency FTW!

Then we just need to configure which producer to subscribe_to and which module to run for each event via @worker_type.

There are more options there, like :transient or :one_for_one. Feel free to check the Supervisor docs for more insight, but in a nutshell: :transient tells the supervisor to restart a worker only if it terminates abnormally, and :one_for_one means that only the worker that crashed is restarted, leaving the rest of the workers unaffected.

Our worker looks like:

# ...
defmodule Aircraft.Consumer.Importer do
  @moduledoc false

  alias Aircraft.Aircraft.Importer

  def start_link(event) do
    Task.start_link(fn ->
      Importer.handle_aircraft(event)
    end)
  end
end
# ...

For each event received it will start an async Task executing Importer.handle_aircraft(event).

Dumper

This is the simplest module we have. It just gets an event, adds a timestamp and inserts it into the database. The intention of this module is to build your own event source for future analysis.

By design we don't really need concurrency as we want the events to be inserted in the database in the order they were received. So the implementation ends up really simple:

defmodule Aircraft.Consumer.Dumper do
  @moduledoc false

  use GenStage

  alias Aircraft.Aircraft.Dumper

  def start_link(_) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:consumer, :the_state_does_not_matter, subscribe_to: [Fetcher.Producer]}
  end

  def handle_events(events, _from, state) do
    Enum.each(events, &Dumper.call/1)
    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end
end

Here we just subscribe_to the producer and run Dumper.call/1 for every received event.

Using this architecture with GenStage allows us to easily plug in new consumers and start working with the data without affecting the rest of the system.

Final thoughts

So far AircraftSpotter is just a simple website that displays the aircraft seen today and also lists every aircraft seen so far, grouped by registration country.

I truly believe in open sourcing your pet projects, which is why you can find all the code for AircraftSpotter on GitHub.

Start collecting planes today!

Want to reach out? Feel free to use any of my social channels.