Example: Dumps of Twitch data
This example is based on data capturing statistics and properties of popular Twitch channels. The setup is such that we have two data sets ‘of the same kind’ but from different points in time.
In other words, a ‘version’ of the data set represents a temporal notion. For example, version 1 might stem from end of March and version 2 from end of April. Moreover, we will assume that the first, version 1, has been vetted and approved with the help of manual investigation and domain knowledge. The second data set, version 2, has just been made available. We would like to use it but can’t be sure of its validity just yet. As a consequence we would like to assess the quality of the data in version 2.
In order to have a database Postgres instance to begin with, it might be useful to use our script, spinning up a dockerized Postgres database:
$ ./start_postgres.sh
The original data set can be found on kaggle. For the sake of this tutorial, we slightly process it and provide two versions of it. One can either recreate this by executing this processing script oneself on the original data or download our processed files ( version 1 and version 2) right away.
Once both version of the data exist, they can be uploaded to the tabase. We provide an uploading script creating and populating one table per version of the data in a Postgres database. It resembles the following:
address = os.environ.get("DB_ADDR", "localhost")
connection_string = f"postgresql://datajudge:datajudge@{address}:5432/datajudge"
engine = sa.create_engine(connection_string)
df_v2.to_sql("twitch_v2", engine, schema="public", if_exists="replace")
df_v1.to_sql("twitch_v1", engine, schema="public", if_exists="replace")
Once the tables are stored in a database, we can actually write a datajudge
specification against them. But first, we’ll have a look at what the data roughly
looks like by investigating a random sample of four rows:
channel |
watch_time |
stream_time |
peak_viewers |
average_viewers |
followers |
followers_gained |
views_gained |
partnered |
mature |
language |
---|---|---|---|---|---|---|---|---|---|---|
xQcOW |
6196161750 |
215250 |
222720 |
27716 |
3246298 |
1734810 |
93036735 |
True |
False |
English |
summit1g |
6091677300 |
211845 |
310998 |
25610 |
5310163 |
1374810 |
89705964 |
True |
False |
English |
Gaules |
5644590915 |
515280 |
387315 |
10976 |
1767635 |
1023779 |
102611607 |
True |
True |
Portuguese |
ESL_CSGO |
3970318140 |
517740 |
300575 |
7714 |
3944850 |
703986 |
106546942 |
True |
False |
English |
Note that we expect both version 1 and version 2 to follow this structure. Due to them being assembled at different points in time, merely their rows shows differ.
Now let’s write an actual specification, expressing our expectations against the data. First, we need to make sure a connection to the database can be established at test execution time. How this is done exactly depends on how you set up your database. When using our default setup with running, this would look as follows:
import os
import pytest
import sqlalchemy as sa
@pytest.fixture(scope="module")
def datajudge_engine():
address = os.environ.get("DB_ADDR", "localhost")
connection_string = f"postgresql://datajudge:datajudge@{address}:5432/datajudge"
return sa.create_engine(connection_string)
Once a way to connect to the database is defined, we want to declare our data sources and express expectations against them. In this example, we have two tables in the same database - one table per version of the Twitch data.
Yet, let’s start with a straightforward example only using version 2. We want to use our
domain knowledge that constrains the values of the language
column only to contain letters
and have a length strictly larger than 0.
from datajudge import WithinRequirement
# Postgres' default database.
db_name = "tempdb"
# Postgres' default schema.
schema_name = "public"
within_requirement = WithinRequirement.from_table(
table_name="twitch_v2",
schema_name=schema_name,
db_name=db_name,
)
within_requirement.add_varchar_regex_constraint(
column="language",
regex="^[a-zA-Z]+$",
)
Done! Now onto comparisons between the table representing the approved version 1 of the data and the to be assessed version 2 of the data.
from datajudge import BetweenRequirement, Condition
between_requirement_version = BetweenRequirement.from_tables(
db_name1=db_name,
db_name2=db_name,
schema_name1=schema_name,
schema_name2=schema_name,
table_name1="twitch_v1",
table_name2="twitch_v2",
)
between_requirement_version.add_column_subset_constraint()
between_requirement_version.add_column_superset_constraint()
columns = ["channel", "partnered", "mature"]
between_requirement_version.add_row_subset_constraint(
columns1=columns, columns2=columns, constant_max_missing_fraction=0
)
between_requirement_version.add_row_matching_equality_constraint(
matching_columns1=["channel"],
matching_columns2=["channel"],
comparison_columns1=["language"],
comparison_columns2=["language"],
max_missing_fraction=0,
)
between_requirement_version.add_ks_2sample_constraint(
column1="average_viewers",
column2="average_viewers",
significance_level=0.05,
)
between_requirement_version.add_uniques_equality_constraint(
columns1=["language"],
columns2=["language"],
)
Now having compared the ‘same kind of data’ between version 1 and version 2,
we may as well compare ‘different kind of data’ within version 2, as a means of
a sanity check. This sanity check consists of checking whether the mean
average_viewer
value of mature channels should deviate at most 10% from
the overall mean.
between_requirement_columns = BetweenRequirement.from_tables(
db_name1=db_name,
db_name2=db_name,
schema_name1=schema_name,
schema_name2=schema_name,
table_name1="twitch_v2",
table_name2="twitch_v2",
)
between_requirement_columns.add_numeric_mean_constraint(
column1="average_viewers",
column2="average_viewers",
condition1=None,
condition2=Condition(raw_string="mature IS TRUE"),
max_absolute_deviation=0.1,
)
Lastly, we need to collect all of our requirements in a list and make sure
pytest
can find them by calling collect_data_tests
.
from datajudge.pytest_integration import collect_data_tests
requirements = [
within_requirement,
between_requirement_version,
between_requirement_columns,
]
test_func = collect_data_tests(requirements)
If we then test these expectations against the data by running
$ pytest specification.py
– where specification.py
contains all of the code outlined before (you can find it
here )
– we see that the new version of the data is
not quite on par with what we’d expect:
$ pytest twitch_specification.py
================================== test session starts ===================================
platform darwin -- Python 3.10.5, pytest-7.1.2, pluggy-1.0.0
rootdir: /Users/kevin/Code/datajudge/docs/source/examples
plugins: html-3.1.1, cov-3.0.0, metadata-2.0.2
collected 8 items
twitch_specification.py F.....FF [100%]
======================================== FAILURES ========================================
____________________ test_func[VarCharRegex::tempdb.public.twitch_v2] ____________________
constraint = <datajudge.constraints.varchar.VarCharRegex object at 0x10855da20>
datajudge_engine = Engine(postgresql://datajudge:***@localhost:5432/datajudge)
@pytest.mark.parametrize(
"constraint", all_constraints, ids=Constraint.get_description
)
def test_constraint(constraint, datajudge_engine):
test_result = constraint.test(datajudge_engine)
> assert test_result.outcome, test_result.failure_message
E AssertionError: tempdb.public.twitch_v2's column(s) 'language' breaks regex
'^[a-zA-Z]+$' in 0.045454545454545456 > 0.0 of the cases. In absolute terms, 1
of the 22 samples violated the regex. Some counterexamples consist of the
following: ['Sw3d1zh'].
../../../src/datajudge/pytest_integration.py:25: AssertionError
____________ test_func[UniquesEquality::public.twitch_v1 | public.twitch_v2] _____________
constraint = <datajudge.constraints.uniques.UniquesEquality object at 0x10855d270>
datajudge_engine = Engine(postgresql://datajudge:***@localhost:5432/datajudge)
@pytest.mark.parametrize(
"constraint", all_constraints, ids=Constraint.get_description
)
def test_constraint(constraint, datajudge_engine):
test_result = constraint.test(datajudge_engine)
> assert test_result.outcome, test_result.failure_message
E AssertionError: tempdb.public.twitch_v1's column(s) 'language' doesn't have
the element(s) '{'Sw3d1zh'}' when compared with the reference values.
../../../src/datajudge/pytest_integration.py:25: AssertionError
______________ test_func[NumericMean::public.twitch_v2 | public.twitch_v2] _______________
constraint = <datajudge.constraints.numeric.NumericMean object at 0x1084e1810>
datajudge_engine = Engine(postgresql://datajudge:***@localhost:5432/datajudge)
@pytest.mark.parametrize(
"constraint", all_constraints, ids=Constraint.get_description
)
def test_constraint(constraint, datajudge_engine):
test_result = constraint.test(datajudge_engine)
> assert test_result.outcome, test_result.failure_message
E AssertionError: tempdb.public.twitch_v2's column(s) 'average_viewers' has
mean 4734.9780000000000000, deviating more than 0.1 from
tempdb.public.twitch_v2's column(s) 'average_viewers''s
3599.9826086956521739. Condition on second table: WHERE mature IS TRUE
../../../src/datajudge/pytest_integration.py:25: AssertionError
================================ short test summary info =================================
FAILED twitch_specification.py::test_func[VarCharRegex::tempdb.public.twitch_v2] - Asse...
FAILED twitch_specification.py::test_func[UniquesEquality::public.twitch_v1 | public.twitch_v2]
FAILED twitch_specification.py::test_func[NumericMean::public.twitch_v2 | public.twitch_v2]
============================== 3 failed, 5 passed in 1.52s ===============================
Alternatively, you can also look at these test results in this html report generated by pytest-html.
Hence we see that we might not want to blindly trust version 2 of the data as is. Rather, we might need to investigate what is wrong with the data, what this has been caused by and how to fix it.
Concretely, what exactly do we learn from the error messages?
The column
language
now has a row with value'Sw3d1zh'
. This break two of our constraints. TheVarCharRegex
constraint compared the columns’ values to a regular expression. TheUniquesEquality
constraint expected the unique values of thelanguage
column to not have changed between version 1 and version 2.The mean value of
average_viewers
ofmature
channels is substantially - more than our 10% tolerance - lower than the global mean.