Analyzing Large Data Collections with Apache Pig


The objective of this exercise is to give you practical experience with Pig Latin, a dataflow language for analyzing large data collections. For this purpose, you will work with a data collection comprising network tests coming from the Neubot Project.



Download the dxlab-pig repository and move it to your desktop.

Unzip the dxlab-pig/ archive.

Open the Docker terminal, move to the dxlab-pig folder, and execute the following instruction:

docker-compose pull

Neubot Data Collection

The Neubot Data Collection is a data collection composed of network tests (e.g., download/upload speed over HTTP) performed by different users in different locations.

A record in the data collection contains the following information:

Name             Description
client_address   User IP address (IPv4 or IPv6).
client_country   Country where the test was conducted.
client_provider  Name of the user's internet provider.
connect_time     Number of seconds elapsed between the reception of the first and last packet (Round-Trip Time).
download_speed   Download speed (bytes/sec).
neubot_version   Neubot version used for this test.
platform         User's operating system.
remote_address   IP address (IPv4 or IPv6) of the server used for this test.
test_name        Test type (e.g., speedtest, bittorrent, dash).
timestamp        Time at which the test was performed, measured as the number of seconds elapsed since January 1, 1970 (cf. UNIX timestamp).
upload_speed     Upload speed (bytes/sec).
latency          Delay between the sending and the reception of a control packet.
uuid             User ID (generated automatically by Neubot during installation).
asnum            Internet provider's ID (Autonomous System number).
region           Country region in which the test was performed (if known).
city             Name of the city.
hour             Hour/month/year of the test (derived from timestamp).
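Note that the speed fields are expressed in bytes per second, while bandwidth is usually quoted in megabits per second, and that the timestamp must be converted before it can be treated as a date. As a hedged illustration (the field names follow the schema used later in this document; the conversion factors are standard, not part of the data set), this is how you could derive both in Pig Latin:

```pig
-- Sketch: derive download speed in Mbit/s and a calendar date
-- from the raw fields of the NeubotTests relation.
Speeds = FOREACH NeubotTests GENERATE
             city,
             download_speed * 8.0 / 1000000.0 AS download_mbps:double,
             ToDate(timestamp * 1000L)        AS test_date;
```

ToDate() is a Pig built-in that expects milliseconds, hence the multiplication by 1000.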

Apache Pig Latin

Apache Pig is a platform for analyzing large data sets on parallel infrastructures (e.g., Hadoop). It comprises Pig Latin, a dataflow language, together with an interpreter and a compiler that translate Pig Latin scripts into sequences of MapReduce jobs. Some of the benefits of using Pig Latin are:

  • Ease of programming. It is trivial to achieve parallel execution of simple, “embarrassingly parallel” data analysis tasks. Complex tasks composed of multiple interrelated data transformations are explicitly encoded as dataflow sequences, making them easy to write, understand, and maintain.
  • Optimization opportunities. The way in which tasks are encoded permits the system to optimize their execution automatically, allowing the user to focus on semantics rather than efficiency.
  • Extensibility. Users can create their own functions to do special-purpose processing.

Pig Latin Interpreter

Although Pig Latin programs are intended to be executed on a cluster, you can run some tests on your local machine using Pig's interactive interpreter (Grunt). To do so, open a terminal and type the following instructions:

# Execute Pig in Local mode
docker-compose run --rm pig

Script Example

The following script illustrates how to use Pig Latin for processing the Neubot data collection. In particular, it illustrates how to:

  1. Define a schema that describes the structure of your data.
  2. Filter the data based on some criteria (i.e., keep only the speedtest records).
  3. Project a subset of the attributes of the data collection (e.g., keep just the names of the cities where the tests were conducted).
  4. Display results on the screen.
  5. Store results on the filesystem.

You can run this script by copying and pasting it into Grunt.

> The script assumes that you are inside the folder containing the exercise material. Modify the PATHs if necessary.

REGISTER ./UDFs/NeubotTestsUDFs.jar;
DEFINE   IPtoNumber convert.IpToNumber();
DEFINE   NumberToIP convert.NumberToIp();

NeubotTests = LOAD './NeubotTests' using PigStorage(',') as (
                  client_address: chararray,
                  client_country: chararray,
                  lon: float,
                  lat: float,
                  client_provider: chararray,
                  mlabservername:  chararray,
                  connect_time:    float,
                  download_speed:  float,
                  neubot_version:  float,
                  platform:        chararray,
                  remote_address:  chararray,
                  test_name:       chararray,
                  timestamp:       long,
                  upload_speed:    float,
                  latency:  float,
                  uuid:     chararray,
                  asnum:    chararray,
                  region:   chararray,
                  city:     chararray,
                  hour:     int,
                  month:    int,
                  year:     int,
                  weekday:  int,
                  day:      int,
                  filedate: chararray
              );
-- Keep only the 'speedtests'
--     '@' means "use the previous result"
Tests = FILTER @ BY (test_name matches '.*speedtest.*');

-- Cities where the tests were conducted
Cities = FOREACH @ GENERATE city;
Cities = DISTINCT @;
Cities = ORDER @ BY city;

-- Display the results contained in 'Cities'
DUMP @;
-- Store the results in the folder 'Cities'
STORE @ INTO 'Cities';
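The script above covers loading, filtering, projection, and storage, but the queries below also require grouping and aggregation, which Pig Latin expresses with GROUP and the built-in aggregate functions. As a hedged sketch (it continues from the 'Tests' relation defined above; the grouping key is just an illustrative choice), counting speedtests per country could look like this:

```pig
-- Sketch: count speedtests per country, continuing from 'Tests'.
ByCountry = GROUP Tests BY client_country;
Counts    = FOREACH @ GENERATE group AS country, COUNT(Tests) AS n_tests;
Counts    = ORDER @ BY n_tests DESC;
DUMP @;
```

Inside the FOREACH, 'group' refers to the grouping key and 'Tests' to the bag of records sharing that key.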


Define a dataflow for answering each of the following queries:

  1. Filter the speedtests conducted in Barcelona or Madrid. Then list the internet providers operating in those cities.
  2. List the names and the IP ranges of the internet providers located in Barcelona. For this you need the IPtoNumber user-defined function (cf. NeubotTestsUDFs.jar).
  3. Group the speedtests based on the user's network infrastructure (e.g., 3G/4G vs. ADSL). For this you can assume some maximum bandwidth (e.g., 21 Mbit/sec for ADSL).
  4. Find the user who performed the greatest number of tests. For this user, produce a table showing the evolution of her/his download/upload speeds.
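As a starting point only (a sketch, not a full answer), the first query combines the FILTER, FOREACH, and DISTINCT operators already seen in the example script; it assumes the 'Tests' relation from that script is still defined:

```pig
-- Sketch for query 1: providers operating in Barcelona or Madrid.
BcnMad    = FILTER Tests BY (city == 'Barcelona') OR (city == 'Madrid');
Providers = FOREACH @ GENERATE city, client_provider;
Providers = DISTINCT @;
DUMP @;
```

The remaining queries follow the same pattern, adding GROUP and aggregates (COUNT, MAX) where needed.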