Collecting planes with Elixir GenStage
Don't collect Pokémon, collect aircraft!
DISCLAIMER: there's already a lot of detailed information about all of the topics that I will talk about next. The following is just a summary of how I combine some features of Elixir and my passion for planes.
The following content assumes some basic Elixir knowledge and a passion for planes.
The passion is up to you; for Elixir, I recommend the following:
Today we will cover:
- ADS-B
- DIY Receiver
- dump1090
- Elixir
- GenStage
- AircraftSpotter
ADS-B
Automatic Dependent Surveillance-Broadcast (ADS-B) is a technology that allows aircraft to continuously broadcast information about themselves, such as their identity and position, so that anyone listening can receive it and act upon it.
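Some of the information you can get from it includes:
- ICAO 24-bit aircraft address (a unique identifier allocated under the International Civil Aviation Organization (ICAO) scheme)
- Position (latitude, longitude, altitude)
- Distance from the listener (computed by the receiver from the reported position)
- Speed
- Flight number (callsign)
- Aircraft registration country (derived from the ICAO address)
- Transponder code (squawk)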
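As a sketch of how a receiver could work out the distance from the listener, here is the haversine great-circle formula in Elixir. It assumes a spherical Earth with a mean radius of 6371 km, which is accurate enough for spotting purposes; the Geo module name and the coordinates in the usage lines are purely illustrative.

```elixir
defmodule Geo do
  @moduledoc "Great-circle distance between two points (hypothetical helper)."

  # Mean Earth radius in kilometres (spherical approximation).
  @earth_radius_km 6371.0

  @doc "Distance in km between {lat, lon} pairs given in decimal degrees."
  def distance_km({lat1, lon1}, {lat2, lon2}) do
    phi1 = deg_to_rad(lat1)
    phi2 = deg_to_rad(lat2)
    dphi = deg_to_rad(lat2 - lat1)
    dlambda = deg_to_rad(lon2 - lon1)

    # Haversine: a = sin²(Δφ/2) + cos φ1 · cos φ2 · sin²(Δλ/2)
    a =
      :math.pow(:math.sin(dphi / 2), 2) +
        :math.cos(phi1) * :math.cos(phi2) * :math.pow(:math.sin(dlambda / 2), 2)

    # d = 2R · asin(√a)
    2 * @earth_radius_km * :math.asin(:math.sqrt(a))
  end

  defp deg_to_rad(deg), do: deg * :math.pi() / 180
end

receiver = {52.52, 13.40}
aircraft = {53.52, 13.40}

# One degree of latitude apart, same longitude.
IO.puts(Float.round(Geo.distance_km(receiver, aircraft), 1))
# prints 111.2
```

Plugging in the receiver position and the lat/lon reported by each aircraft is enough to sort sightings by distance or to estimate your antenna's coverage.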
Websites like https://www.flightradar24.com/ or https://flightaware.com use ADS-B receivers as one of their data sources to track planes and provide accurate information about their position.
Build your own DIY ADS-B receiver
Personally, I was really surprised when I discovered how easy and cheap it is to build your own receiver. There are plenty of resources out there on how to do it; here is just my quick take:
- Raspberry Pi (I'm using the Pi Zero)
- DVB-T dongle with a 1090 MHz antenna (YES! A TV receiver!!! And YES! Its tiny default antenna!!!)
- ADS-B decoder: dump1090 (FlightAware's fork, a.k.a. dump1090-fa)
dump1090
Dump1090 is a simple Mode S decoder for RTLSDR devices
While researching ADS-B decoders, I found the original dump1090 but also several forks. One of the best-known ones is the Mutability fork, which was later forked by FlightAware. The interesting part of FlightAware's fork is that it provides a simple JSON output that can be consumed by any other project. Much better than working with the default raw hex output served over TCP ;)
Also, it comes with a nice web interface.
I'm using the tiny stock antenna that comes with the DVB-T dongle; depending on the weather conditions and the location of the antenna, I get around 250 km of coverage.
Protip: If you are running dump1090-fa on a Raspberry Pi, consider writing the JSON output to a tmpfs partition, since the files are rewritten every second. Your SD card will thank you!
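Since the JSON output of dump1090-fa is the integration point, here is a minimal sketch of turning one decoded snapshot into Elixir structs. It assumes the JSON has already been decoded into a map (with any JSON library) and that aircraft entries use keys such as "hex", "flight", "lat", "lon", "alt_baro", "gs" and "squawk", as in the aircraft.json written by recent dump1090-fa versions; double-check the names against your own output. The Sighting struct and the sample snapshot are made up for illustration.

```elixir
defmodule Sighting do
  @moduledoc "One aircraft entry from a decoded aircraft.json snapshot (hypothetical)."
  defstruct [:icao, :flight, :lat, :lon, :altitude, :speed, :squawk]

  # Keep only aircraft that reported a position, then convert each entry.
  def from_snapshot(%{"aircraft" => entries}) do
    entries
    |> Enum.filter(&(is_number(&1["lat"]) and is_number(&1["lon"])))
    |> Enum.map(&from_entry/1)
  end

  # Key names are assumed from dump1090-fa's aircraft.json output.
  def from_entry(%{"hex" => hex} = entry) do
    %__MODULE__{
      icao: String.upcase(hex),
      # Callsigns usually come padded with trailing spaces.
      flight: entry |> Map.get("flight") |> trim(),
      lat: entry["lat"],
      lon: entry["lon"],
      altitude: entry["alt_baro"],
      speed: entry["gs"],
      squawk: entry["squawk"]
    }
  end

  defp trim(nil), do: nil
  defp trim(text), do: String.trim(text)
end

# A made-up snapshot, as it might look after JSON decoding.
snapshot = %{
  "now" => 1_700_000_000.0,
  "aircraft" => [
    %{"hex" => "3c5ee7", "flight" => "TEST123 ", "lat" => 50.1, "lon" => 8.6,
      "alt_baro" => 12_000, "gs" => 310.5, "squawk" => "1000"},
    # No position reported yet: filtered out.
    %{"hex" => "a1b2c3"}
  ]
}

snapshot |> Sighting.from_snapshot() |> IO.inspect()
```

Filtering out entries without a position keeps downstream consumers from dealing with half-decoded aircraft, at the cost of ignoring them until a position message arrives.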
Elixir
By default, dump1090-fa writes a new JSON snapshot every second, so consuming that data is not truly real time. However, thanks to the nature of Elixir and its ecosystem, we can simulate a more real-time environment by taking advantage of GenStage.
GenStage
GenStage is an Elixir behaviour, provided by the gen_stage library, that allows us to build a pub/sub-style architecture with back-pressure.
In traditional architectures, a publisher (called a producer in GenStage) sends messages to its subscribers (called consumers in GenStage), usually as soon as those messages arrive. This can overload the subscribers and eventually crash them because of the number of messages received. If that happens, we might lose the messages that a subscriber received but never processed.
What GenStage proposes is a back-pressure architecture in which the consumer is in control and decides when and how much to request from the producer. This means that consumers won't be overloaded; instead, the producer buffers data if necessary while the consumers are busy, so say goodbye to consumers/workers getting flooded with messages. For more info, you can read how to configure a consumer to request just what it can take, and no more.
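To see consumer-driven demand in action, here is a small sketch in the style of the classic counter producer: the producer only creates events inside handle_demand/2, that is, when a consumer asks for them, and GenStage.stream/1 plays the consumer with max_demand: 5. It pulls in the gen_stage package with Mix.install/1 (Elixir 1.12+) so it runs as a standalone script; the Counter module is illustrative.

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Counter do
  use GenStage

  def start_link(initial), do: GenStage.start_link(__MODULE__, initial)

  def init(counter), do: {:producer, counter}

  # Only called when consumers ask for more: we never emit more than `demand`.
  def handle_demand(demand, counter) do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

{:ok, producer} = Counter.start_link(0)

# The stream subscribes with max_demand: 5, so it asks for at most 5 events at a time.
[{producer, max_demand: 5}]
|> GenStage.stream()
|> Enum.take(12)
|> IO.inspect()
# prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
```

Because the consumer drives the pace, the producer never has to guess how fast its consumers are; it simply answers each request.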
Putting it all together: introducing AircraftSpotter
Now that we have data from dump1090 and understand the basics of GenStage, it's time to put everything together. Enter AircraftSpotter, an Elixir umbrella application that consists of:
- Fetcher: GenStage producer that consumes data from a dump1090-fa source.
- Decoder: Helpers to get extra information based on an ADS-B ICAO identifier.
- Aircraft: Phoenix application that consumes and displays collected data.
The following image should describe how the application works.
Fetcher
This is our producer, in charge of fetching data from the dump1090-fa source and making it available to its consumers.
You can check the full code on GitHub; here are just the important bits:
# ...
def init(state) do
  schedule()
  {:producer, state, dispatcher: GenStage.BroadcastDispatcher}
end
# ...
By default, a GenStage producer uses GenStage.DemandDispatcher, which hands each event to only one of its consumers, but in our case we don't want that: we want every consumer to receive every event emitted. For that behaviour we need to use GenStage.BroadcastDispatcher.
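A small self-contained sketch of the difference: with dispatcher: GenStage.BroadcastDispatcher, two consumers subscribed to the same producer each receive the full list of events. Broadcaster, Collector and Inbox are hypothetical modules; events are pushed with a synchronous GenStage.call/3 only to keep the demo deterministic, and each consumer forwards what it receives to the calling process. The broadcast dispatcher only sends events once every consumer has demand for them, so the slowest consumer sets the pace.

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Broadcaster do
  use GenStage

  def start_link, do: GenStage.start_link(__MODULE__, :ok)

  def init(:ok), do: {:producer, :no_state, dispatcher: GenStage.BroadcastDispatcher}

  # Events are pushed from outside; demand alone never creates events here.
  def handle_demand(_demand, state), do: {:noreply, [], state}

  def handle_call({:push, events}, _from, state), do: {:reply, :ok, events, state}
end

defmodule Collector do
  use GenStage

  def start_link(parent, producer), do: GenStage.start_link(__MODULE__, {parent, producer})

  def init({parent, producer}), do: {:consumer, parent, subscribe_to: [producer]}

  # Forward every batch to the parent so we can inspect it.
  def handle_events(events, _from, parent) do
    send(parent, {:events, self(), events})
    {:noreply, [], parent}
  end
end

defmodule Inbox do
  # Receive batches from consumer `pid` until `n` events arrived (or 1s passes).
  def collect(pid, n, acc \\ []) do
    if length(acc) >= n do
      acc
    else
      receive do
        {:events, ^pid, events} -> collect(pid, n, acc ++ events)
      after
        1_000 -> acc
      end
    end
  end
end

{:ok, producer} = Broadcaster.start_link()
{:ok, a} = Collector.start_link(self(), producer)
{:ok, b} = Collector.start_link(self(), producer)

:ok = GenStage.call(producer, {:push, [:a320, :b738, :a319]})

IO.inspect(Inbox.collect(a, 3), label: "consumer a")
IO.inspect(Inbox.collect(b, 3), label: "consumer b")
```

With the default dispatcher instead, the same three events would be split between the two consumers rather than duplicated.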
If at this point you are wondering whether GenStage lets you build an event-sourcing architecture out of the box, replaying the whole history of events when new consumers join, the answer is: no. If you need such capabilities, take a look at Kafka and KafkaEx.
We continue...
As mentioned before, dump1090-fa outputs data every second, so we need a mechanism to poll it.
# ...
defp schedule do
  if Application.get_env(:fetcher, :schedule) do
    Process.send_after(self(), :run, @interval_in_ms)
  end
end
# ...
Here we use Process.send_after/3 so that, when the :schedule config is enabled, the producer sends itself a :run message after @interval_in_ms milliseconds. The producer handles that :run message, which eventually calls call:
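# ...
# Both modules are read from the :fetcher app config at compile time.
@worker Application.compile_env(:fetcher, :worker)
@parser Application.compile_env(:fetcher, :parser)
# ...
def call do
  # Fetch the raw dump1090-fa data, then parse it into events.
  @worker.call() |> @parser.call()
end
# ...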
Now it's just a matter of using our @worker to fetch the data and then @parser to turn it into something we can easily consume and manipulate.
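Only fragments of the producer are shown above, so here is a minimal, self-contained sketch of how the pieces could fit together. It assumes the :run message is handled in handle_info/2, which fetches a new batch and schedules the next tick, and that handle_demand/2 emits nothing because events only come from polling. PollingProducer and the fetch function passed to it (a stand-in for @worker.call() |> @parser.call()) are hypothetical, the :schedule config check is left out, and the interval is shortened to 10 ms for the demo.

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule PollingProducer do
  use GenStage

  # `fetch` is a hypothetical stand-in for `@worker.call() |> @parser.call()`:
  # any zero-arity function that returns a list of events.
  def start_link(fetch, interval_ms), do: GenStage.start_link(__MODULE__, {fetch, interval_ms})

  def init({_fetch, interval_ms} = state) do
    schedule(interval_ms)
    {:producer, state, dispatcher: GenStage.BroadcastDispatcher}
  end

  # Events come from polling, not from demand; anything consumers cannot take
  # yet stays in the producer's internal buffer.
  def handle_demand(_demand, state), do: {:noreply, [], state}

  # Each :run message fetches a new batch and schedules the next poll.
  def handle_info(:run, {fetch, interval_ms} = state) do
    schedule(interval_ms)
    {:noreply, fetch.(), state}
  end

  defp schedule(interval_ms), do: Process.send_after(self(), :run, interval_ms)
end

# Fake data source: every poll returns one increasing number.
{:ok, counter} = Agent.start_link(fn -> 0 end)
fetch = fn -> [Agent.get_and_update(counter, &{&1 + 1, &1 + 1})] end

{:ok, producer} = PollingProducer.start_link(fetch, 10)

[producer]
|> GenStage.stream()
|> Enum.take(3)
|> IO.inspect()
```

Injecting the fetch function keeps the producer testable without a real dump1090-fa source, which is the same benefit the configurable @worker and @parser give the original code.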
Note that at this point there are no consumers, just our producer calling itself to retrieve data from the external source. Here lies the magic of the producer buffer: all those events are stored in the producer's internal buffer until a consumer starts consuming them, or until the buffer size (10,000 events by default) is exceeded, in which case events are discarded. With the buffer_keep option we can decide which ones survive: buffer_keep: :first keeps the oldest events and discards new ones, while buffer_keep: :last (the default) keeps the newest and discards old ones.
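A quick way to watch the buffer settings at work is to shrink the buffer to 3 events, push five events while nobody is subscribed, and then read what survived. This sketch assumes the :buffer_size and :buffer_keep producer options documented by GenStage; the Buffered module is hypothetical, and GenStage logs a warning whenever it discards events.

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Buffered do
  use GenStage

  def start_link(keep), do: GenStage.start_link(__MODULE__, keep)

  # Tiny buffer so the overflow is easy to observe.
  def init(keep), do: {:producer, :no_state, buffer_size: 3, buffer_keep: keep}

  def handle_demand(_demand, state), do: {:noreply, [], state}

  # Synchronous push: when this returns, the events are already buffered.
  def handle_call({:push, events}, _from, state), do: {:reply, :ok, events, state}
end

for keep <- [:last, :first] do
  {:ok, producer} = Buffered.start_link(keep)
  :ok = GenStage.call(producer, {:push, [1, 2, 3, 4, 5]})

  # Only now does a consumer subscribe and drain the buffer.
  survivors = [producer] |> GenStage.stream() |> Enum.take(3)
  IO.inspect(survivors, label: "buffer_keep: #{keep}")
end
```

For a polling source like dump1090-fa, keeping the last events is usually what you want, since newer snapshots supersede older ones.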
Decoder
There's no magic here, just a bunch of helpers ported from dump1090-fa. Feel free to check the code if you would like to know how an ICAO identifier like 3C5EE7 tells you that this specific aircraft was registered in Germany and is an Airbus A319.
If you need further information about an aircraft, you can check https://opensky-network.org/.
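The country part works because ICAO allocates blocks of the 24-bit address space to each state, so the registration country follows from the block an address falls in. Here is a minimal sketch of that idea with only a handful of blocks; the decoders in the dump1090 family carry a much longer table, and the IcaoCountry module is hypothetical. The aircraft type, on the other hand, is not encoded in the address, so it has to come from a lookup table keyed by the address.

```elixir
defmodule IcaoCountry do
  @moduledoc "Map a 24-bit ICAO address to its allocating state (partial table)."

  # A small subset of the ICAO address allocation blocks.
  @ranges [
    {0x300000, 0x33FFFF, "Italy"},
    {0x340000, 0x37FFFF, "Spain"},
    {0x380000, 0x3BFFFF, "France"},
    {0x3C0000, 0x3FFFFF, "Germany"},
    {0x400000, 0x43FFFF, "United Kingdom"},
    {0xA00000, 0xAFFFFF, "United States"}
  ]

  @doc "Accepts the hex string as shown by dump1090-fa, e.g. \"3C5EE7\"."
  def lookup(hex) when is_binary(hex) do
    case Integer.parse(hex, 16) do
      {address, ""} ->
        # First block containing the address, or :unknown if none matches.
        Enum.find_value(@ranges, :unknown, fn {first, last, country} ->
          if address >= first and address <= last, do: country
        end)

      _ ->
        {:error, :invalid_address}
    end
  end
end

IO.puts(IcaoCountry.lookup("3C5EE7"))
# prints Germany
```

A sorted list and a linear scan are plenty for a few hundred blocks; a real table would just be longer.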
Aircraft
There isn't much of a secret to the Phoenix application, so instead let's focus on the two consumers.
Importer
This module is in charge of getting events from the producer and inserting or updating records in the database. Let's take a look at the most important parts of it.
# ...
@worker_type Aircraft.Consumer.Importer
@producer_type Fetcher.Producer
# ...
use ConsumerSupervisor
# ...
def init(_arg) do
  children = [
    %{
      id: @worker_type,
      restart: :transient,
      start: {@worker_type, :start_link, []}
    }
  ]
  opts = [
    strategy: :one_for_one,
    subscribe_to: [{@producer_type, max_demand: 50}]
  ]
  ConsumerSupervisor.init(children, opts)
end
# ...
First of all, we want to use ConsumerSupervisor, which allows us to spawn an Erlang process for each event received. Concurrency FTW!
Then we just need to configure which producer we want to subscribe_to and which module we want to execute, via @worker_type. The max_demand: 50 setting caps how many events, and therefore worker processes, are in flight at once.
There are more options there, like restart: :transient and strategy: :one_for_one; feel free to dig into the Supervisor docs for more insights. In a nutshell: :transient tells the supervisor to restart a worker only if it terminated abnormally (with a reason other than :normal, :shutdown or {:shutdown, term}), and :one_for_one means that only the worker that crashed is restarted, without affecting the rest of the workers.
Our worker looks like this:
# ...
defmodule Aircraft.Consumer.Importer do
  @moduledoc false
  alias Aircraft.Aircraft.Importer
  # ConsumerSupervisor appends the received event to the start arguments.
  def start_link(event) do
    Task.start_link(fn ->
      Importer.handle_aircraft(event)
    end)
  end
end
# ...
For each event received, the ConsumerSupervisor starts a Task that executes Importer.handle_aircraft(event).
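To check how the supervisor and the worker fit together, here is a self-contained sketch: a ConsumerSupervisor subscribed to a producer starts one Task per event, and ConsumerSupervisor appends the event to the arguments in the child spec, which is why Worker.start_link/2 receives it last. Source, EventSupervisor and Worker are hypothetical stand-ins for Fetcher.Producer, the supervisor above and Aircraft.Consumer.Importer; the worker just reports back to the caller instead of touching a database.

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Source do
  use GenStage

  def start_link, do: GenStage.start_link(__MODULE__, :ok)
  def init(:ok), do: {:producer, :no_state}
  def handle_demand(_demand, state), do: {:noreply, [], state}
  def handle_call({:push, events}, _from, state), do: {:reply, :ok, events, state}
end

defmodule Worker do
  # Called as start_link(parent, event): the event is appended to [parent].
  def start_link(parent, event) do
    Task.start_link(fn -> send(parent, {:imported, event}) end)
  end
end

defmodule EventSupervisor do
  use ConsumerSupervisor

  def start_link(parent, producer) do
    ConsumerSupervisor.start_link(__MODULE__, {parent, producer})
  end

  def init({parent, producer}) do
    children = [
      %{id: Worker, restart: :transient, start: {Worker, :start_link, [parent]}}
    ]

    ConsumerSupervisor.init(children,
      strategy: :one_for_one,
      subscribe_to: [{producer, max_demand: 50}]
    )
  end
end

{:ok, producer} = Source.start_link()
{:ok, _sup} = EventSupervisor.start_link(self(), producer)
:ok = GenStage.call(producer, {:push, ["3C5EE7", "A1B2C3", "400F01"]})

# Tasks run concurrently, so results may arrive in any order.
imported =
  for _ <- 1..3 do
    receive do
      {:imported, event} -> event
    after
      1_000 -> :timeout
    end
  end

IO.inspect(Enum.sort(imported))
```

Sorting before inspecting is deliberate: with one process per event there is no ordering guarantee, which is exactly why the Dumper below takes a different approach.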
Dumper
This is the simplest module we have. It just gets an event, adds a timestamp and inserts it into the database. The idea is simply to build your own event store for future analysis.
By design, we don't want concurrency here, since we want the events to be inserted into the database in the order they were received. So the implementation ends up really simple:
defmodule Aircraft.Consumer.Dumper do
  @moduledoc false
  use GenStage
  alias Aircraft.Aircraft.Dumper
  def start_link(_) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end
  def init(:ok) do
    {:consumer, :the_state_does_not_matter, subscribe_to: [Fetcher.Producer]}
  end
  def handle_events(events, _from, state) do
    Enum.each(events, &Dumper.call/1)
    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end
end
Here we just subscribe_to the producer and call Dumper.call/1 for every received event.
Using this architecture with GenStage allows us to easily plug in new consumers and start working with the data without affecting the rest of the system.
Final thoughts
So far, AircraftSpotter is just a simple website that displays the aircraft seen today and also lists every aircraft ever seen, grouped by registration country.
I truly believe in open-sourcing your pet projects; that's why you can find all the code for AircraftSpotter on GitHub.
Start collecting planes today!
Want to reach out? Feel free to use any of my social channels.