Event processing

What is event processing?

The point of processing events, after ingestion, is to facilitate downstream operations. Specifically, event processing should make querying for usage as simple as possible. Cleaning, moving or transforming raw events can provide multiple benefits for downstream consumers.

Event processing occurs after ingestion and can happen per individual event or in bulk. Regardless of the method, the outcome is the same: a table of processed events that is ready to be queried for usage.


Why is event processing important?

Often when raw events are ingested, they are not ready for consumption. Duplicate, incorrect or excessively granular data can be remedied with event processing. For example, event aggregation can be applied during event processing. This step of the pipeline is all about offering flexibility to the metering service. Building a portable and robust ingest client is challenging and can result in tradeoffs around performance, format, or data enrichment. The event processing step can provide recourse for those drawbacks.

The other important part of this step is classification. The purpose of metering is to create a measurement of product usage from events. In order to understand product usage, events must be attributable to specific products. Attribution occurs during event processing, where each event is tagged with the relevant product_uid. The end result is a set of formatted events that are straightforward to query.

How is event processing modeled?

It depends on the use case. Modeling can also vary based on the tech stack, but for the purposes of bframe it is assumed that the transformation process occurs in bulk ETL jobs. The following sections cover common use cases for processing.

Duplicates

bframe uses the transaction_id as a unique identifier on the raw events, which makes the reduction step straightforward: partition the events by transaction_id, rank the events within each partition by received_at (most recent first), and filter out any events that are not rank 1. The output will only include unique events.

The customer “Lupe” had a small hiccup in their event ingestion service and ended up creating duplicate events.

Raw events

| customer_id | transaction_id | properties | metered_at | received_at |
|---|---|---|---|---|
| Lupe | 1714761065.0Hemiphyllodactylus_cattien_325514626.jpg | {"name":"update","agg_value":12.0} | 2024-05-03 19:00:00 | 2024-05-03 19:00:00 |
| Lupe | 1714761065.0Hemiphyllodactylus_cattien_325514626.jpg | {"name":"update","agg_value":12.0,"category":"INaturalist"} | 2024-05-03 19:00:00 | 2024-05-03 19:00:00 |
| Lupe | 1714761065.0Hemiphyllodactylus_cattien_325514626.jpg | {"name":"update","agg_value":12.0} | 2024-05-03 19:00:00 | 2024-05-03 19:00:00 |

Unique events

| customer_id | transaction_id | properties | metered_at | received_at |
|---|---|---|---|---|
| Lupe | 1714761065.0Hemiphyllodactylus_cattien_325514626.jpg | {"name":"update","agg_value":12.0,"category":"INaturalist"} | 2024-05-03 19:00:00 | 2024-05-03 19:00:00 |

Since all three duplicates share the same received_at, the ranking tie-break between them is arbitrary; the important property is that exactly one copy survives.

Unfortunately this method has a large flaw: the amount of data the query will scan is unbounded. If an operator has years' worth of data, the deduplication job will have to process every row in existence. This is problematic since it can become a bottleneck for the metering service. One way to bound the query inputs is to limit the time range of the window. Instead of deduplicating against all events, only deduplicate events from the last n days, where n is a multiple of how frequently the books are closed or of the billing period chosen for invoicing. For example, if invoices are billed monthly, then a good choice for n would be 90.

-- Deduplicate raw events over a bounded 90 day lookback window.
SELECT * EXCLUDE rank
FROM (
    SELECT raw.*, ROW_NUMBER() OVER(
        -- Events sharing a transaction_id are duplicates; the most
        -- recently received copy wins rank 1.
        PARTITION BY raw.transaction_id
        ORDER BY raw.received_at DESC
    ) AS rank
    FROM src.events AS raw
    -- Bound the scan so the job does not reprocess all history.
    WHERE raw.metered_at BETWEEN (now() - to_days(90)) AND now()
)
WHERE rank = 1;

Using a limited lookback window is a best practice, but it does introduce the potential for late arriving data to be duplicative. Fortunately, backdated events are rare and are usually created manually, so with careful validation it is simple to avoid creating duplicate data.
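As a minimal sketch of such a validation, a backdated event's transaction_id can be checked against rows that already fall outside the deduplication window before it is inserted (the literal values here reuse the example above):

-- A sketch of validating a backdated event before insertion: if its
-- transaction_id already exists outside the lookback window, inserting
-- it again would slip past the bounded deduplication job.
SELECT count(*) AS conflicts
FROM src.events
WHERE transaction_id = '1714761065.0Hemiphyllodactylus_cattien_325514626.jpg'
  AND metered_at < (now() - to_days(90));

A non-zero count means the new row would bypass deduplication and should be rejected or handled manually.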

Transformations

Updating the raw event itself is what's known as a transform. New or mutated fields are generated during transformation. There are many different ways to change an event, and the right one depends on the use case. For example, product usage may be measured in GBs while the event data is tabulated in MBs. This can be handled as a transformation on the field that holds the usage data in MBs.
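A minimal sketch of such a conversion, assuming a hypothetical mb_used property and DuckDB-style JSON functions (the field names are illustrative, not part of bframe):

-- A sketch of a unit-conversion transform: derive a gb_used field
-- from a hypothetical mb_used property on the event.
SELECT
    raw.* EXCLUDE properties,
    json_merge_patch(
        raw.properties,
        json_object(
            'gb_used',
            CAST(json_extract_string(raw.properties, '$.mb_used') AS DOUBLE) / 1024.0
        )
    ) AS properties
FROM src.events AS raw;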

Another common transform is to update the customer identifier. During ingestion the durable identifier may not be accessible, so a natural key might be used in place of the customer's durable key. While querying based off of a natural key is possible, that's not how bframe is designed. bframe expects the durable key, which could also be the natural key, to be present on each event. In the case that raw events only have a natural key present, the identifier can be updated during event processing.
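As a sketch, assuming a hypothetical customer_aliases mapping table that links natural keys to durable keys (the table and column names are illustrative):

-- A sketch of remapping a natural key to the durable customer key.
-- The customer_aliases table and its columns are hypothetical.
SELECT
    COALESCE(aliases.durable_customer_id, raw.customer_id) AS customer_id,
    raw.* EXCLUDE customer_id
FROM src.events AS raw
LEFT JOIN src.customer_aliases AS aliases
    ON raw.customer_id = aliases.natural_key;

The LEFT JOIN with COALESCE leaves events untouched when no alias exists, so rows that already carry the durable key pass through unchanged.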

Editing events

A single malformed event can ruin an otherwise accurate business model. Consequently, having a path to amend events is critical. A direct approach is to manually update a specific row. This is not considered a best practice since metadata around the update and the original event are lost. The recommended path within bframe is to append an additional event that corrects the first one. This can be accomplished with a completely new event, or by duplicating the transaction_id and using a more recent received_at. Since deduplication occurs during event processing, the correction takes effect during this stage of the pipeline.
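As a sketch, correcting the example event from earlier would mean appending a row that reuses its transaction_id with a newer received_at (the corrected agg_value of 13.0 is illustrative):

-- A sketch of correcting an event by appending a replacement row.
-- Deduplication keeps this copy because its received_at is newer.
INSERT INTO src.events (customer_id, transaction_id, properties, metered_at, received_at)
VALUES (
    'Lupe',
    '1714761065.0Hemiphyllodactylus_cattien_325514626.jpg',
    '{"name":"update","agg_value":13.0}',  -- corrected payload
    TIMESTAMP '2024-05-03 19:00:00',       -- original metered_at preserved
    now()                                  -- newer received_at wins rank 1
);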

Classifying events

Based on its contents, an event will be attributed to a specific product. This is denoted by the product_uid field that is joined to the event. In the case that a single event is associated with several products, multiple rows will exist with separate product_uid values. The Wikipedia example below has two usage based products, each associated with a different event name.

Products

| id | name | ptype | event_name | filters | agg_property | created_at |
|---|---|---|---|---|---|---|
| 1 | Updates | EVENT | update | {} | agg_value | 2023-11-01 |
| 2 | Creates | EVENT | create | {} | agg_value | 2023-11-01 |

Sample processed events

| customer_id | transaction_id | properties | metered_at | product_uid | received_at |
|---|---|---|---|---|---|
| Kreuz und quer | 1714527927.0Gathering-of-overmountain-men-branson-tn1.jpg | {"name":"update","agg_value":12.0,"category":"Supported_by_Wikimedia_Deutschland"} | 2024-05-01 02:00:00 | 1 | 2024-05-01 02:00:00 |
| Helper201 | 1714525778.0Love_And_Peace_Cultural_Event_(cropped).jpg | {"name":"create","agg_value":344.0,"category":"Supported_by_Wikimedia_Deutschland"} | 2024-05-01 02:00:00 | 2 | 2024-05-01 02:00:00 |
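A sketch of the basic classification join that produces rows like the above, assuming DuckDB-style JSON functions:

-- A sketch of the default classification: tag each event with the id
-- of every product whose event_name matches the event's name property.
SELECT
    events.*,
    products.id AS product_uid
FROM src.events AS events
JOIN src.products AS products
    ON json_extract_string(events.properties, '$.name') = products.event_name;

Because this is a join, an event that matches several products naturally fans out into multiple rows, one per product_uid.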

Out of the box, bframe will match $.properties.name with the event_name column on the products table. This standard functionality is limited, but many use cases only require basic matching. In order to achieve complex classifications, filters can be used. The filters field is a JSON object that stores logic determining whether or not an event matches the product. Below is an example of a couple of filters:

Unnested filters

| unnested_filters |
|---|
| {"path":"$.region","_in":[],"not_in":["test-region"],"optional":true} |
| {"path":"$.service","_in":["sqs"],"not_in":[],"optional":false} |

There are four different fields that can appear on a single filter. path represents the event property to compare against and is expected to be a JSON path expression. _in contains a list of values that the event property must be within; if the list is empty it's ignored. not_in contains a list of values that the event property must not be within; similarly, if the list is empty it's ignored. optional is a boolean flag that determines whether or not the event property must exist. If a filter is marked as optional and the event property does exist, it must still pass the _in and not_in constraints; if the property does not exist, the _in and not_in constraints are ignored.
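As a sketch of how these semantics might translate to SQL, treating a product's filters as a conjunction (the unnested_filters table with list-typed _in and not_in columns is an assumption; the actual bframe implementation may differ):

-- A sketch of filter evaluation: keep events for which no filter
-- fails. A filter passes when the property is within _in (if that
-- list is non-empty), outside not_in, and present unless optional.
SELECT events.*
FROM src.events AS events
WHERE NOT EXISTS (
    SELECT 1
    FROM unnested_filters AS filters
    WHERE NOT (
        CASE
            -- Missing property: only optional filters pass.
            WHEN json_extract_string(events.properties, filters.path) IS NULL
                THEN filters.optional
            ELSE
                -- An empty _in list is ignored.
                (len(filters._in) = 0
                 OR list_contains(filters._in, json_extract_string(events.properties, filters.path)))
                -- list_contains on an empty not_in never matches,
                -- so an empty list passes automatically.
                AND NOT list_contains(filters.not_in, json_extract_string(events.properties, filters.path))
        END
    )
);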

Although there are only a few different filter types, this covers most use cases. In the case that it doesn't, breaking out of the defaults is possible and encouraged. To express a bespoke or unique classification, a user defined view can be employed.
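As an illustrative sketch of such a view (the view name, product id, and matching logic are all hypothetical):

-- A sketch of a user defined view for a bespoke classification:
-- attribute large update events to a hypothetical third product.
CREATE VIEW custom_classified_events AS
SELECT
    events.*,
    3 AS product_uid  -- illustrative product id
FROM src.events AS events
WHERE json_extract_string(events.properties, '$.name') = 'update'
  AND CAST(json_extract_string(events.properties, '$.agg_value') AS DOUBLE) > 100;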