Blog July 26, 2024

Solving Merchant Identity Extraction in Finance: Snowpark’s Data Engineering Solution

Learn how a fintech leader solved merchant identification challenges using Snowpark and local testing. This case study showcases Tiger Analytics’ approach to complex data transformations, automated testing, and efficient development in financial data processing. Discover how these solutions enhanced fraud detection and revenue potential.

In the high-stakes world of financial technology, data is king. But what happens when that data becomes a labyrinth of inconsistencies? This was the challenge faced by a senior data engineer at a leading fintech company.

“Our merchant identification system is failing us. We’re losing millions in potential revenue and our fraud detection is compromised. We need a solution, fast.”

The issue was clear but daunting. Every day, their system processed millions of transactions, each tied to a merchant, but the merchant names were inconsistent. For instance, a well-known retail chain might be listed under its official name, a common abbreviation, a location-specific identifier, or simply a generic category. This inconsistency was wreaking havoc across the business.
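To make the inconsistency concrete, here is a minimal sketch of regex-based name normalization, one of the method families the article mentions in its closing section. The merchant names, alias patterns, and the `normalize_merchant` helper are all hypothetical illustrations, not the team's actual rules.

```python
import re

# Hypothetical alias patterns: each canonical merchant maps to the regexes that
# should resolve to it. None of these come from the project described here.
MERCHANT_PATTERNS = {
    "ACME RETAIL": [r"\bacme\b", r"\bacm\s*rtl\b"],
    "GLOBEX COFFEE": [r"\bglobex\b", r"\bglbx\s*cof"],
}

# Compile once so repeated lookups over many transactions stay cheap.
_COMPILED = {
    canonical: [re.compile(pattern, re.IGNORECASE) for pattern in patterns]
    for canonical, patterns in MERCHANT_PATTERNS.items()
}


def normalize_merchant(raw_name: str) -> str:
    """Return the canonical merchant for a raw name, or 'UNKNOWN' if none match."""
    for canonical, patterns in _COMPILED.items():
        if any(pattern.search(raw_name) for pattern in patterns):
            return canonical
    return "UNKNOWN"


samples = ["ACME Retail Inc.", "ACM RTL #1042 SEATTLE", "glbx coffee 221", "GROCERY STORE"]
normalized = [normalize_merchant(sample) for sample in samples]
print(normalized)
```

The first three samples resolve to a canonical name, while the generic category falls through to "UNKNOWN", which is the kind of case a pattern list alone cannot settle.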

Initial Approach: Snowflake SQL Procedures

Initially, the data engineer and his team developed Snowflake SQL procedures to handle this complex data transformation. The procedures worked, but when the team tried to add automated testing pipelines, they quickly ran into limitations. “We need more robust regression and automated testing capabilities. And we need to implement these tests without constantly connecting to a Snowflake account.” This was not possible with traditional Snowflake SQL procedures, which pushed them to seek external expertise.

Enter Tiger Analytics: A New Approach with Snowpark and Local Testing Framework

After understanding the challenges, the Tiger team proposed a solution: leveraging Snowpark for complex data transformations and introducing a local testing framework. This approach aimed to solve the merchant identity issue and improve the entire data pipeline process.

To meet these requirements, the team turned to Snowpark. Snowpark enabled them to perform complex data transformations and manipulations within Snowflake, leveraging the power of Snowflake’s computational engine. However, the most crucial part was the Snowpark Python Local Testing Framework. This framework allowed the team to develop and test their Snowpark DataFrames, stored procedures, and UDFs locally, fulfilling the need for regression testing and automated testing without connecting to a Snowflake account.

Key Benefits

  • Local Development: The team could develop and test their Snowpark Python code without a Snowflake account. This reduced the barrier to entry and sped up their iteration cycles.
  • Efficient Testing: By utilizing familiar testing frameworks like PyTest, the team integrated their tests seamlessly into existing development workflows.
  • Enhanced Productivity: The team quickly iterated on their code with local feedback, enabling rapid prototyping and troubleshooting before deploying to their Snowflake environment.

Overcoming Traditional Unit Testing Limitations

In the traditional sense of unit testing, Snowpark does not support a fully isolated environment independent of a Snowflake instance. Typically, unit tests would mock a database object, but Snowpark lacks a local context for such mocks. With a regular session, even calling the create_dataframe method requires Snowflake connectivity.
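For contrast, this is roughly what mocking a database object looks like in a conventional unit test. It is a generic sketch using only the standard library; `count_transactions`, the query text, and the connection shape are hypothetical and unrelated to Snowpark's API.

```python
from unittest.mock import MagicMock


def count_transactions(connection, merchant):
    """Hypothetical function under test: counts rows for one merchant."""
    cursor = connection.cursor()
    cursor.execute(
        "SELECT COUNT(*) FROM transactions WHERE merchant = %s", (merchant,)
    )
    return cursor.fetchone()[0]


# The mock replaces the real connection, so no database is contacted.
mock_connection = MagicMock()
mock_connection.cursor.return_value.fetchone.return_value = (42,)

transaction_count = count_transactions(mock_connection, "ACME")
print(transaction_count)
```

A mock like this only checks that the expected calls were made. It cannot evaluate DataFrame transformations, which is the gap a local execution engine is meant to fill.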

The Solution with Local Testing Framework

Despite these limitations, the Snowpark Python Local Testing Framework enabled the team to create and manipulate DataFrames, stored procedures, and UDFs locally, which was pivotal for this use case. Here’s how the Tiger team did it:

Setting Up the Environment

First, set up a Python environment:

pip install "snowflake-snowpark-python[localtest]"
pip install pytest

Next, create a local testing session:

from snowflake.snowpark import Session
session = Session.builder.config('local_testing', True).create()

Creating Local DataFrames

The Tiger team created DataFrames from local data sources and operated on them:

table = 'example'
session.create_dataframe([[1, 2], [3, 4]], ['a', 'b']).write.save_as_table(table)

Operating on these DataFrames was straightforward:

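from snowflake.snowpark.functions import col

df = session.create_dataframe([[1, 2], [3, 4]], ['a', 'b'])
# Filter on b before projecting a, so b is still available to the predicate
res = df.where(col('b') > 2).select(col('a')).collect()
print(res)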

Creating UDFs and Stored Procedures

The framework allowed the team to create and call UDFs and stored procedures locally:

from snowflake.snowpark.functions import udf, sproc, call_udf, col
from snowflake.snowpark.types import IntegerType, StringType

@udf(name='example_udf', return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
def example_udf(a, b):
    return a + b

@sproc(name='example_proc', return_type=IntegerType(), input_types=[StringType()])
def example_proc(session, table_name):
    return session.table(table_name)\
        .with_column('c', call_udf('example_udf', col('a'), col('b')))\
        .count()

# Call the stored procedure by name
output = session.call('example_proc', table)

Using PyTest for Efficient Testing

The team leveraged PyTest for efficient unit and integration testing:

PyTest Fixture

In the conftest.py file, the team created a PyTest fixture for the Session object:

import pytest
from snowflake.snowpark.session import Session

def pytest_addoption(parser):
    parser.addoption("--snowflake-session", action="store", default="live")

@pytest.fixture(scope='module')
def session(request) -> Session:
    if request.config.getoption('--snowflake-session') == 'local':
        return Session.builder.configs({'local_testing': True}).create()
    else:
        snowflake_credentials = {} # Specify Snowflake account credentials here
        return Session.builder.configs(snowflake_credentials).create()
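The fixture above leaves the credentials dictionary empty. One possible way to fill it without hard-coding secrets is to read environment variables, sketched below. The environment variable names and the `credentials_from_env` helper are assumptions for illustration; the dictionary keys are the usual Snowpark connection parameters.

```python
import os
from typing import Mapping

# Hypothetical environment variable names; adjust to your own conventions.
ENV_TO_CONFIG_KEY = {
    "SNOWFLAKE_ACCOUNT": "account",
    "SNOWFLAKE_USER": "user",
    "SNOWFLAKE_PASSWORD": "password",
    "SNOWFLAKE_ROLE": "role",
    "SNOWFLAKE_WAREHOUSE": "warehouse",
    "SNOWFLAKE_DATABASE": "database",
    "SNOWFLAKE_SCHEMA": "schema",
}
REQUIRED = ("SNOWFLAKE_ACCOUNT", "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD")


def credentials_from_env(env: Mapping[str, str] = os.environ) -> dict:
    """Build a connection-parameter dict, failing early if required values are absent."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    # Optional settings are included only when they are actually set.
    return {key: env[name] for name, key in ENV_TO_CONFIG_KEY.items() if env.get(name)}


# Example with a fake environment, so nothing real is required to try it.
example_credentials = credentials_from_env(
    {"SNOWFLAKE_ACCOUNT": "xy12345", "SNOWFLAKE_USER": "ci_user", "SNOWFLAKE_PASSWORD": "secret"}
)
print(sorted(example_credentials))
```

In the fixture, the live branch could then pass the result of `credentials_from_env()` to `Session.builder.configs(...)`.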

Using the Fixture in Test Cases

from project.sproc import my_stored_proc
  
def test_create_fact_tables(session):
    expected_output = ...
    actual_output = my_stored_proc(session)
    assert expected_output == actual_output
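If a procedure returns a list of rows rather than a single value (an assumption here, since the article elides the expected output), a plain equality check can fail simply because the rows come back in a different order. A small order-insensitive comparison helper is one way to handle that. `assert_same_rows` is a hypothetical helper, and plain tuples stand in for collected rows.

```python
from collections import Counter
from typing import Iterable, Sequence


def assert_same_rows(actual: Iterable[Sequence], expected: Iterable[Sequence]) -> None:
    """Fail unless both collections hold the same rows, ignoring order."""
    # Counter keeps duplicates, so repeated rows must match in number too.
    actual_counts = Counter(tuple(row) for row in actual)
    expected_counts = Counter(tuple(row) for row in expected)
    missing = expected_counts - actual_counts
    unexpected = actual_counts - expected_counts
    assert not missing and not unexpected, (
        f"missing rows: {dict(missing)}, unexpected rows: {dict(unexpected)}"
    )


# Same rows in a different order: passes silently.
assert_same_rows([(3, "b"), (1, "a")], [(1, "a"), (3, "b")])
```

The helper requires hashable row values, so rows that contain lists or dictionaries would need to be converted first.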

Running Tests

To run the test suite locally:

pytest --snowflake-session local

To run the test suite against your Snowflake account:

pytest

Addressing Unsupported Functions

Some functions were not supported in the local testing framework. For these, the team used patch functions and MagicMock:

Patch Functions

For unsupported functions like upper(), the team used patch functions:

from unittest.mock import patch
from snowflake.snowpark import Session
import snowflake.snowpark.functions as F

session = Session.builder.config('local_testing', True).create()

# patch() replaces the attribute on the functions module, so the test calls
# upper through the module (F.upper) to make sure the mock is what runs.
@patch('snowflake.snowpark.functions.upper')
def test_to_uppercase(mock_upper):
    mock_upper.side_effect = lambda col: col + '_MOCKED'
    df = session.create_dataframe([('Alice',), ('Bob',)], ['name'])
    result = df.select(F.upper(df['name']))
    collected = result.collect()
    assert collected == [('Alice_MOCKED',), ('Bob_MOCKED',)]
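One detail worth keeping in mind with patch: it replaces the attribute on the module named in the target string, so a name that was already bound with a from-import keeps pointing at the original function. The sketch below shows the effect with the standard library's json module as a stand-in; the two wrapper functions are hypothetical.

```python
import json
from json import dumps  # this name is bound to the original function right now
from unittest.mock import patch


def via_imported_name(value):
    # Uses the name captured by the from-import above.
    return dumps(value)


def via_module_attribute(value):
    # Looks the function up on the module at call time.
    return json.dumps(value)


with patch('json.dumps', return_value='MOCKED'):
    imported_result = via_imported_name([1])
    attribute_result = via_module_attribute([1])

print(imported_result, attribute_result)
```

Only the lookup through the module sees the mock. The same rule applies to Snowpark functions: either call them through the module, or patch the name in the module that imported it.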
  
MagicMock

For more complex behaviors like explode(), the team used MagicMock:

from unittest.mock import MagicMock, patch
from snowflake.snowpark import Session
import snowflake.snowpark.functions as F

session = Session.builder.config('local_testing', True).create()

def test_explode_df():
    # The mock stands in for explode() and returns a literal column for any input
    mock_explode = MagicMock()
    mock_explode.side_effect = lambda column: F.lit('MOCKED_EXPLODE')
    df = session.create_dataframe([([1, 2, 3],), ([4, 5, 6],)], ['data'])
    with patch('snowflake.snowpark.functions.explode', mock_explode):
        # Call explode through the module so the patched attribute is used
        result = df.select(F.explode(F.col('data')))
        collected = result.collect()
        assert collected == [('MOCKED_EXPLODE',), ('MOCKED_EXPLODE',)]

test_explode_df()
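A note on side_effect: when it is a callable, the mock passes along exactly the arguments it was called with, so the callable's signature has to match how the mocked function is invoked. The sketch below shows both the matching and the mismatching case using only the standard library; the mock names are hypothetical.

```python
from unittest.mock import MagicMock

# The side_effect callable receives the same arguments as the mock call.
mock_transform = MagicMock(side_effect=lambda value: f"MOCKED({value})")
outputs = [mock_transform("a"), mock_transform("b")]

# A zero-argument side_effect fails as soon as the mock is called with an argument.
no_arg_mock = MagicMock(side_effect=lambda: "MOCKED")
try:
    no_arg_mock("a")
    raised_type_error = False
except TypeError:
    raised_type_error = True

print(outputs, raised_type_error)
```

The mock still records every call, so assertions such as `mock_transform.call_count` remain available alongside the custom return values.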

Scheduling Procedures Limitations

While implementing these solutions, the Tiger team ran into issues scheduling the procedures with serverless tasks, so they used tasks attached to a warehouse instead and created a Snowpark-optimized warehouse for them. The team noted that serverless tasks cannot invoke certain object types and functions, specifically:

  • UDFs (user-defined functions) that contain Java or Python code.
  • Stored procedures written in Scala (using Snowpark), or stored procedures that call UDFs containing Java or Python code.

Turning Data Challenges into Business Insights

The journey from the initial challenge of extracting merchant identities from inconsistent transaction data to a streamlined, efficient process demonstrates the power of advanced data solutions. By leveraging Snowpark and its Python Local Testing Framework, the Tiger team not only solved the immediate problem but also improved the overall approach to data pipeline development and testing. The combination of regex-based, narration-pattern-based, and ML-based methods enabled them to tackle the complexity of unstructured bank statement data effectively.

This project’s success extends beyond merchant identification, showcasing how the right tools and methodologies can transform raw data into meaningful insights. For data engineers facing similar challenges, this case study highlights how Snowpark and local testing frameworks can significantly improve data application development, leading to more efficient, accurate, and impactful solutions.
