Advanced Query

Get to know how to apply an Advanced Query to a Subscription

Q-Flow Advanced Query

With Q-Flow Advanced Query, users can enhance their Subscriptions by applying powerful filters, transformations, and aggregations to Topics and Sources, ensuring they receive only the most relevant and tailored event data.

Q-Flow Advanced Query empowers you to filter, transform, and aggregate your incoming events using intuitive SQL syntax. By writing concise SQL clauses, you can extract precisely the data you need, combine and summarize complex information, and reshape event payloads for downstream services.

  • Filter: Include or exclude events based on event types or specific conditions within the payload.
  • Transform: Restructure event payloads for simplified consumption.
  • Aggregate: Summarize event data to discover trends or performance insights.

To apply an Advanced Query to your Subscription:

Apply Advanced Query to Subscription

  1. Either create a new Subscription for a Topic or Source, or edit an existing Subscription.
  2. Press the Plus button within the Subscription flow.
  3. The Advanced Query Modal will be shown.

Apply Advanced Query to Subscription

  1. In the input section, paste example events that you want to apply the Advanced Query to.
  2. In the Advanced Query section, define your SQL query. Q-Flow uses DuckDB SQL syntax for a simple SQL experience. Learn more in the DuckDB documentation.
  3. Once you're happy with your query, press the Test Filter button.
  4. The output section will now show the events with the query applied. Check that the filtering, transformation, or aggregation is applied as expected.
  5. Once you're happy with the result, press the Save button.
  6. After saving the query, update the Subscription by pressing the Update button.

It’s really that simple! Your subscription is now configured and will begin delivering the events you’ve refined with Advanced Querying.

Examples of Advanced Query

Advanced Filtering

You don’t always need to process every event for a Subscription. Advanced filtering allows you to narrow down exactly what’s delivered to your endpoint, based on criteria like amounts, statuses, or any custom fields within the event payload.

SELECT *
FROM events
WHERE eventType = 'PURCHASE'
AND data.status = 'COMPLETED';

The query above only retrieves events where the event type is PURCHASE and the status is COMPLETED.

Transformation

Transformations let you reshape the data before it’s sent. Common use cases include:

  • Redacting sensitive information
  • Renaming fields to match internal naming conventions
  • Simplifying or flattening nested structures
SELECT
event_id,
user_id,
'Redacted' AS redacted_user_name,
CONCAT('****@', SUBSTR(email, INSTR(email, '@') + 1)) AS redacted_email,
'****' AS redacted_payment_card,
'****' AS redacted_billing_address,
'****' AS redacted_phone_number,
timestamp
FROM events;

The example above demonstrates how to transform and redact sensitive information, ensuring that only the necessary data is included while protecting sensitive details.

Aggregation

Sometimes you want deeper analytics from your events before pushing them downstream. For instance, you might need to find the top 3 largest amounts for each payment event type:

WITH ranked_payments AS (
SELECT
type,
CAST(json_extract_string(data, '$.amount') AS DOUBLE) AS amount,
ROW_NUMBER() OVER (
PARTITION BY type
ORDER BY CAST(json_extract_string(data, '$.amount') AS DOUBLE) DESC
) AS rn
FROM events
WHERE type LIKE 'payment.%'
)
SELECT *
FROM ranked_payments
WHERE rn <= 3
ORDER BY type, amount DESC;

How It Works:

  1. We filter to payment.* event types.
  2. We cast the amount field to a numeric type for sorting.
  3. We rank each payment by amount within its event type, then pick the top 3.

While the exact SQL query depends on your specific use case, an advanced aggregation query like this can pre-process events for downstream services, reducing the need for separate analytics pipelines.
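
To try the ranking logic locally before pasting a query into Q-Flow, the following is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in for DuckDB. It is not how Q-Flow runs queries. The function name `top_n_per_type`, the flat `amount` column (instead of a JSON `data` field), and the sample event types are assumptions made for illustration.

```python
import sqlite3


def top_n_per_type(rows, n=3):
    """Return the n largest amounts for each payment.* event type.

    rows: iterable of (type, amount) tuples (hypothetical sample shape).
    """
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (type TEXT, amount REAL)")
    conn.executemany("INSERT INTO events VALUES (?, ?)", rows)
    query = """
        WITH ranked_payments AS (
            SELECT
                type,
                amount,
                ROW_NUMBER() OVER (
                    PARTITION BY type
                    ORDER BY amount DESC
                ) AS rn
            FROM events
            WHERE type LIKE 'payment.%'
        )
        SELECT type, amount
        FROM ranked_payments
        WHERE rn <= ?
        ORDER BY type, amount DESC
    """
    result = conn.execute(query, (n,)).fetchall()
    conn.close()
    return result


# Hypothetical sample events: four captures, one refund, one non-payment event.
sample = [
    ("payment.captured", 10.0),
    ("payment.captured", 50.0),
    ("payment.captured", 30.0),
    ("payment.captured", 20.0),
    ("payment.refunded", 5.0),
    ("order.created", 999.0),
]
print(top_n_per_type(sample))
```

The `order.created` row is dropped by the filter, and the smallest `payment.captured` amount falls outside the top 3.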

Example Use Cases for Q-Flow Advanced Query

We've put together a couple of example use cases to demonstrate the simple but powerful capabilities of the Q-Flow Advanced Query.

Use Case: Event Stream Optimization

Problem: Multiple events for the same entity (e.g., transaction) generate redundant data, leading to inefficiencies.

Solution: Partition events by transactionId and type, and keep only the latest event in each partition based on the timestamp to reduce event volume downstream.

Benefits:

  • Reduces data load
  • Improves system performance
  • Prevents redundant notifications
WITH ranked_events AS (
SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY transactionId, type
ORDER BY timestamp DESC
) AS rank
FROM events
)
SELECT
*
FROM ranked_events
WHERE rank = 1
ORDER BY
transactionId,
type;
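
The "latest event per transaction and type" pattern can be checked on a few sample rows. This is a minimal sketch that uses Python's built-in sqlite3 module as a local stand-in for DuckDB. The function name `latest_events`, the `status` column, and the sample values are hypothetical, and timestamps are assumed to be ISO 8601 strings so that they sort chronologically as text.

```python
import sqlite3


def latest_events(rows):
    """Keep only the most recent event for each (transactionId, type) pair.

    rows: iterable of (transactionId, type, timestamp, status) tuples.
    """
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events "
        "(transactionId TEXT, type TEXT, timestamp TEXT, status TEXT)"
    )
    conn.executemany("INSERT INTO events VALUES (?, ?, ?, ?)", rows)
    query = """
        WITH ranked_events AS (
            SELECT
                *,
                ROW_NUMBER() OVER (
                    PARTITION BY transactionId, type
                    ORDER BY timestamp DESC
                ) AS rn
            FROM events
        )
        SELECT transactionId, type, timestamp, status
        FROM ranked_events
        WHERE rn = 1
        ORDER BY transactionId, type
    """
    result = conn.execute(query).fetchall()
    conn.close()
    return result


# Hypothetical sample: tx-1 has two payment.updated events; only the later survives.
sample_events = [
    ("tx-1", "payment.updated", "2024-01-01T10:00:00Z", "PENDING"),
    ("tx-1", "payment.updated", "2024-01-01T10:05:00Z", "COMPLETED"),
    ("tx-2", "payment.updated", "2024-01-01T09:00:00Z", "PENDING"),
    ("tx-1", "payment.created", "2024-01-01T09:59:00Z", "NEW"),
]
for row in latest_events(sample_events):
    print(row)
```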

Use Case: Aggregating Shopify Order Data

Problem: Shopify webhook events generate multiple notifications for each order, containing product quantities, total amounts, and taxes. This creates redundant data for downstream processing.

Solution: Aggregate Shopify orderCreated events to calculate the total products, total amount, and taxed amount for each order by grouping the events based on orderId.

Benefits:

  • Reduces Data Volume: Sends only aggregated order details, reducing unnecessary data.
  • Improves Efficiency: Simplifies processing by summarizing key metrics.
  • Better Analytics: Provides clear insights into order totals and taxes.
WITH orderDetails AS (
SELECT
orderId,
-- Sum each list in place rather than unnesting both lists, which would
-- multiply rows and inflate every total
SUM(list_sum(lineItems)) AS totalProducts, -- Assumes lineItems is a list of quantities
SUM(orderTotal) AS totalAmount,
SUM(list_sum(taxLines)) AS taxedAmount -- Assumes taxLines is a list of taxed amounts
FROM events
WHERE
type = 'orderCreated' -- Only aggregate order-created events
GROUP BY
orderId
)
SELECT
orderId,
totalProducts,
totalAmount,
taxedAmount
FROM orderDetails
ORDER BY
orderId;

Use Case: Downsampling Event Metrics for a Metrics Database like InfluxDB

Problem: Event metrics (e.g., transaction amounts, product quantities, etc.) are generated at high frequency, leading to high data volume in InfluxDB. Storing every single event metric may overwhelm the database and reduce query performance.

Solution: Downsample the event metrics by aggregating data at a lower frequency (e.g., hourly or daily). This reduces the amount of data stored in InfluxDB while still retaining meaningful insights by calculating averages, sums, or other aggregates over time periods.

Benefits:

  • Reduces Data Storage: Stores only aggregated metrics, lowering storage requirements.
  • Improves Query Performance: By reducing the volume of data, queries are faster and more efficient.
  • Sufficient Granularity: Maintains useful insights with less frequent sampling of metrics (e.g., hourly summaries).
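SELECT
type,
-- time_bucket is DuckDB's bucketing function; the cast assumes timestamp is
-- stored as text or a timestamp type
time_bucket(INTERVAL '5 minutes', CAST(timestamp AS TIMESTAMP)) AS bucket,
SUM(transactionAmount) AS totalTransactionAmount
FROM events
GROUP BY
type,
bucket -- Grouping by event type and 5-minute time buckets
ORDER BY
bucket DESC;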
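
The bucketing idea behind downsampling can be illustrated on sample data. This is a minimal sketch that uses Python's built-in sqlite3 module as a local stand-in for DuckDB. The function name `downsample`, the integer epoch-seconds column `ts`, and the sample values are assumptions. Here each bucket is computed with integer division rather than a dedicated time-bucketing function.

```python
import sqlite3


def downsample(rows, bucket_seconds=300):
    """Sum transactionAmount per event type and fixed-width time bucket.

    rows: iterable of (type, ts, transactionAmount) tuples, where ts is an
    integer number of epoch seconds (hypothetical sample shape).
    """
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events (type TEXT, ts INTEGER, transactionAmount REAL)"
    )
    conn.executemany("INSERT INTO events VALUES (?, ?, ?)", rows)
    query = """
        SELECT
            type,
            (ts / ?) * ? AS bucket_start,  -- integer division floors to the bucket
            SUM(transactionAmount) AS totalTransactionAmount
        FROM events
        GROUP BY type, bucket_start
        ORDER BY bucket_start DESC, type
    """
    result = conn.execute(query, (bucket_seconds, bucket_seconds)).fetchall()
    conn.close()
    return result


# Hypothetical sample: three events in the first 5-minute bucket, one in the second.
sample_metrics = [
    ("payment", 0, 10.0),
    ("payment", 299, 5.0),
    ("payment", 300, 7.0),
    ("refund", 10, 2.0),
]
for row in downsample(sample_metrics):
    print(row)
```

Four input events collapse into three rows, one per event type and bucket, which is the reduction a downstream metrics store would receive.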

Use Case: Detect and Notify on Large/Unusual Transactions

You might want to flag transactions that deviate significantly from a typical purchase, such as unusually large amounts or frequent small transactions that could indicate fraud.

Trigger: Large transactions (e.g., $500 or more) that deviate from the customer’s typical purchase history.

How to Implement:

  • Track the average transaction value for each customer.
  • Flag transactions above a certain threshold (e.g., 2x the customer's average) as potentially fraudulent.
  • Emit real-time flags that include the customer ID and transaction amount.
WITH scored AS (
SELECT
customerId,
transactionId,
amount,
AVG(amount) OVER (
PARTITION BY customerId
ORDER BY timestamp
ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
) AS moving_avg_amount
FROM events
)
-- Window results cannot be filtered in the same WHERE clause, so the moving
-- average is computed in a CTE first
SELECT
*,
ABS(amount - moving_avg_amount) AS anomaly_score
FROM scored
WHERE
ABS(amount - moving_avg_amount) > 100; -- Flag if the deviation is greater than 100
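
The moving-average check can be tried locally on a handful of transactions. This is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in for DuckDB. The function name `flag_anomalies`, the ISO 8601 text timestamps, and the sample amounts are hypothetical. The moving average is computed in a CTE first because a window function result cannot be filtered in the same query level's WHERE clause.

```python
import sqlite3


def flag_anomalies(rows, threshold=100.0):
    """Flag transactions that deviate from the customer's moving average.

    rows: iterable of (customerId, transactionId, amount, timestamp) tuples.
    Returns (customerId, transactionId, amount, moving_avg, anomaly_score).
    """
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events "
        "(customerId TEXT, transactionId TEXT, amount REAL, timestamp TEXT)"
    )
    conn.executemany("INSERT INTO events VALUES (?, ?, ?, ?)", rows)
    query = """
        WITH scored AS (
            SELECT
                customerId,
                transactionId,
                amount,
                AVG(amount) OVER (
                    PARTITION BY customerId
                    ORDER BY timestamp
                    ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
                ) AS moving_avg_amount
            FROM events
        )
        SELECT
            customerId,
            transactionId,
            amount,
            moving_avg_amount,
            ABS(amount - moving_avg_amount) AS anomaly_score
        FROM scored
        WHERE ABS(amount - moving_avg_amount) > ?
        ORDER BY customerId, transactionId
    """
    result = conn.execute(query, (threshold,)).fetchall()
    conn.close()
    return result


# Hypothetical sample: three ordinary purchases followed by one large one.
sample_transactions = [
    ("c1", "tx-1", 20.0, "2024-01-01T10:00:00Z"),
    ("c1", "tx-2", 30.0, "2024-01-01T11:00:00Z"),
    ("c1", "tx-3", 25.0, "2024-01-01T12:00:00Z"),
    ("c1", "tx-4", 500.0, "2024-01-01T13:00:00Z"),
]
print(flag_anomalies(sample_transactions))
```

Because the window includes the current row, the large transaction raises its own moving average. A window ending at `1 PRECEDING` would compare each transaction against prior history only, which may suit some use cases better.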

Use Case: Detect Duplicate Payments by Transaction ID or Amount

Duplicate transactions can happen due to issues like user retries, system errors, or malicious intent. Detecting duplicates in real-time can help prevent overcharging customers or processing repeated payments.

Trigger: If the same transactionId or payment amount appears more than once within a short period (e.g., 30 seconds), flag it as a possible duplicate.

How to Implement:

  • Use a short window (e.g., 30 seconds) to track transactions with the same transaction ID or amount.
  • Alert the system if a duplicate is detected.
SELECT
customerId,
transactionId,
COUNT(*) AS duplicate_count
FROM events
WHERE
timestamp > CURRENT_TIMESTAMP - INTERVAL '30 seconds'
GROUP BY
customerId,
transactionId
HAVING
duplicate_count > 1;
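
The duplicate check can be exercised on sample data with a fixed reference time, which makes the result reproducible. This is a minimal sketch that uses Python's built-in sqlite3 module as a local stand-in for DuckDB. The function name `find_duplicates`, the integer epoch-seconds column `ts`, and the explicit `now` parameter (in place of CURRENT_TIMESTAMP) are assumptions for illustration.

```python
import sqlite3


def find_duplicates(rows, now, window_seconds=30):
    """Return (customerId, transactionId, count) for IDs repeated in the window.

    rows: iterable of (customerId, transactionId, ts) tuples, where ts is an
    integer number of epoch seconds. Only events newer than
    now - window_seconds are considered.
    """
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events (customerId TEXT, transactionId TEXT, ts INTEGER)"
    )
    conn.executemany("INSERT INTO events VALUES (?, ?, ?)", rows)
    query = """
        SELECT
            customerId,
            transactionId,
            COUNT(*) AS duplicate_count
        FROM events
        WHERE ts > ? - ?
        GROUP BY customerId, transactionId
        HAVING COUNT(*) > 1
        ORDER BY customerId, transactionId
    """
    result = conn.execute(query, (now, window_seconds)).fetchall()
    conn.close()
    return result


# Hypothetical sample: tx-1 repeats inside the window; tx-3 repeats, but its
# first occurrence is too old to count.
sample_payments = [
    ("c1", "tx-1", 990),
    ("c1", "tx-1", 995),
    ("c1", "tx-2", 996),
    ("c2", "tx-3", 900),
    ("c2", "tx-3", 999),
]
print(find_duplicates(sample_payments, now=1000))
```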

Use Case: Flag Cross-Border Transactions

Certain transactions are riskier due to factors like payment method (credit card vs. bank transfer), geography (cross-border payments), or rapid purchase frequency (multiple payments within a short window).

Trigger: Flag transactions made from a different country or region than usual for a given customer.

How to Implement:

  • Monitor the geographical location (country or region) from where the transaction is initiated.
  • If the country is different from the customer's usual region, flag the transaction for further verification.
SELECT
customerId,
country,
COUNT(*) AS suspicious_activity_count -- transactionId is omitted because it is not part of the grouping
FROM events
WHERE
country != customer_usual_country -- Compare to a customer's usual country (assumed to be present on the event)
GROUP BY
customerId,
country
HAVING
COUNT(*) > 2; -- Flag if more than 2 transactions come from an unusual country

Use Case: Track Refunds and Chargebacks

Frequent refunds or chargebacks might indicate customer dissatisfaction or even fraud, especially if the patterns are repetitive or happen shortly after purchase.

Trigger: If a customer requests a refund shortly after a purchase, this could indicate potential issues or fraud.

How to Implement:

  • Monitor refund events and detect if they happen soon after a payment.
  • Track chargebacks if available in the data.
SELECT
customerId,
COUNT(*) AS refund_count,
MIN(timestamp) AS first_refund_time
FROM events
WHERE
type = 'refund' -- Assumed event type name; adjust to match your payload
AND timestamp > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY
customerId
HAVING
COUNT(*) > 2; -- Flag customers with more than 2 refunds in the last 24 hours