Developing a NodeJS Wrapper for Snowflake SDK (Pagination included)

In recent years, Snowflake has gained popularity as a cloud-based data warehousing solution due to its ability to handle large amounts of data. Unlike traditional databases, Snowflake separates storage and compute, allowing users to scale each independently.

⬇️ In this article, I'll propose a simple wrapper to interact with the Snowflake SDK using NodeJS.

Prerequisites

Before using Snowflake from NodeJS, you need the following:

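  • Configuring Access to Snowflake: You need a Snowflake account and a user whose role can use the target warehouse, database, and schema. You also need an authentication method supported by the Node.js driver (this article uses a username and password; key-pair authentication and OAuth are alternatives) and, if your account restricts access by IP address, a network policy that allows your application's hosts.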

  • Installing Dependencies: You need to install the Snowflake NodeJS SDK (snowflake-sdk) and, if you use TypeScript, its type definitions.

⌨️ npm i snowflake-sdk && npm i -D @types/snowflake-sdk

Interact with Snowflake

The Snowflake SDK provides a set of APIs for interacting with Snowflake, including APIs for connecting to Snowflake, executing queries, and performing other database operations.

const snowflake = require('snowflake-sdk');

// Create a connection to Snowflake
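const connection = snowflake.createConnection({
  // Account identifier, e.g. '<orgname>-<account_name>'; it already encodes
  // the region, so the separate `region` option (now deprecated) is not needed
  account: '<your_account_identifier>',
  username: '<your_username>',
  password: '<your_password>'
});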

connection.connect((err, conn) => {
  if (err) {
    console.error('Error connecting to Snowflake:', err);
    return;
  }

  // Execute a query
  conn.execute({
    sqlText: 'SELECT * FROM my_table',
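    complete: (err, stmt, rows) => {
      if (err) {
        console.error('Error executing query:', err);
      } else {
        console.log('Query results:', rows);
      }

      // Destroy the connection whether or not the query succeeded
      conn.destroy((destroyErr) => {
        if (destroyErr) {
          console.error('Error closing connection:', destroyErr);
          return;
        }
        console.log('Connection closed.');
      });
    }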
  });
});

In this example, we create a connection to Snowflake using the createConnection method and execute a simple SQL query using the execute method. The results of the query are logged to the console, and the connection is closed using the destroy method.

--

Building a wrapper with pagination

Configuration

👉 First, we define an abstract class called SnowflakeServiceAbstract with two properties, both populated from environment variables in the constructor. snowflakeEnvironment holds the connection options needed to reach the Snowflake account: the account name, username, password, role, warehouse, database, and schema. snowflakeConfig holds global SDK settings (insecureConnect, ocspFailOpen, and logLevel) that are applied with snowflake.configure() before connecting.

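import * as snowflake from 'snowflake-sdk';

export abstract class SnowflakeServiceAbstract {
    protected abstract logger: Logger;

    protected snowflakeEnvironment: snowflake.ConnectionOptions;
    protected snowflakeConfig: snowflake.ConfigureOptions;

    protected constructor() {
        // Environment variables are strings: convert them to the types the SDK expects
        this.snowflakeConfig = {
            insecureConnect: process.env.SNOWFLAKE_INSECURECONNECT === 'true',
            ocspFailOpen: process.env.SNOWFLAKE_OCSP !== 'false', // the SDK defaults to fail-open
            logLevel: process.env.SNOWFLAKE_LOGLEVEL as snowflake.ConfigureOptions['logLevel'],
        };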
        this.snowflakeEnvironment = {
            authenticator: "SNOWFLAKE",
            account: process.env.SNOWFLAKE_ACCOUNT,
            password: process.env.SNOWFLAKE_PASSWORD,
            username: process.env.SNOWFLAKE_USERNAME,
            role: process.env.SNOWFLAKE_ROLE,
            warehouse: process.env.SNOWFLAKE_WAREHOUSE,
            database: process.env.SNOWFLAKE_DATABASE,
            schema: process.env.SNOWFLAKE_SCHEMA,
        }
    }
}
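Environment variables always arrive as strings (or undefined), while insecureConnect and ocspFailOpen are booleans in the SDK configuration. A minimal sketch of a hypothetical helper, parseBooleanEnv (not part of snowflake-sdk), that converts such a value and falls back to a default when the variable is unset; the accepted spellings ("true"/"false", "1"/"0") are an assumption of this sketch.

```typescript
// Hypothetical helper (not part of snowflake-sdk): turns an environment
// variable such as process.env.SNOWFLAKE_OCSP into a boolean.
export function parseBooleanEnv(
  value: string | undefined,
  defaultValue: boolean,
): boolean {
  // Unset or blank variables fall back to the caller's default
  if (value === undefined || value.trim() === '') {
    return defaultValue;
  }

  const normalized = value.trim().toLowerCase();
  // Accepted spellings are an assumption of this sketch
  if (normalized === 'true' || normalized === '1') {
    return true;
  }
  if (normalized === 'false' || normalized === '0') {
    return false;
  }

  // Fail fast on typos instead of silently picking a value
  throw new Error(`Expected a boolean-like value, got "${value}"`);
}
```

In the constructor this would read, for example, insecureConnect: parseBooleanEnv(process.env.SNOWFLAKE_INSECURECONNECT, false). Throwing on unknown values is a deliberate choice: a misspelled flag surfaces at startup instead of silently changing TLS or OCSP behaviour.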

Managing Snowflake connection

👉 Then, we'll add 3 methods to manage our Snowflake connection.

The first method, connectSnowflake, establishes a connection to the Snowflake account using the configuration options and environment variables that were set in the constructor. This method returns a promise that resolves to the connection object if successful, or rejects with an error if unsuccessful.

The second method, disconnectSnowflake, destroys a connection previously opened by connectSnowflake, so that the session is closed cleanly once we are done with it.

The third method, cancelStatementSnowflake, cancels a running statement. It is useful when a query takes too long to execute or is blocking other queries.
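Deciding that a query "takes too long" usually means racing it against a timer. A minimal sketch of a hypothetical helper, runWithTimeout, that calls an onTimeout callback (in this wrapper that could be cancelStatementSnowflake(statement)) and then rejects if the task has not settled in time; the helper name and error message are assumptions. Snowflake can also enforce a limit server-side through the STATEMENT_TIMEOUT_IN_SECONDS parameter.

```typescript
// Hypothetical helper: rejects if `task` has not settled within `ms`,
// invoking `onTimeout` first (e.g. to cancel the running statement).
export function runWithTimeout<T>(
  task: Promise<T>,
  ms: number,
  onTimeout: () => Promise<void>,
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    let settled = false;

    const timer = setTimeout(() => {
      if (settled) return;
      settled = true;
      // Run the cleanup, then report the timeout; a failing cleanup
      // must not hide the timeout error from the caller
      onTimeout()
        .catch(() => undefined)
        .then(() => reject(new Error(`Timed out after ${ms} ms`)));
    }, ms);

    task.then(
      (value) => {
        if (settled) return;
        settled = true;
        clearTimeout(timer);
        resolve(value);
      },
      (err) => {
        if (settled) return;
        settled = true;
        clearTimeout(timer);
        reject(err);
      },
    );
  });
}
```

The settled flag guarantees the returned promise settles exactly once, even if the original task finishes after the timeout has already fired.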

export abstract class SnowflakeServiceAbstract {
    // ... logger, snowflakeEnvironment, snowflakeConfig and constructor as in the previous step ...

    private connectSnowflake = async (): Promise <any> =>
        new Promise((resolve: any, reject: any) => {
            try {
                snowflake.configure(this.snowflakeConfig);
                const connection: snowflake.Connection = snowflake.createConnection(
                    this.snowflakeEnvironment,
                );

                if (connection) {
                    connection.connect(
                        (err: snowflake.SnowflakeError, conn: snowflake.Connection) => {
                            if (err) {
                                reject(err);
                            } else {
                                this.logger.log(`Connected Snowflake Id: ${conn.getId()}`);
                                resolve(conn);
                            }
                        },
                    );
                }
            } catch (err) {
                reject(err);
            }
        });

    private disconnectSnowflake = async (connection: snowflake.Connection) =>
        new Promise((resolve: any, reject: any) => {
            try {
                connection.destroy((err: snowflake.SnowflakeError, _conn: snowflake.Connection) => {
                    if (err) {
                        this.logger.error('Unable to disconnect Snowflake: ' + err.message);
                        reject(err);
                    } else {
                        this.logger.log(`Disconnected Snowflake Id: ${connection.getId()}`);
                        resolve();
                    }
                });
            } catch (err) {
                reject(err);
            }
        });

    private cancelStatementSnowflake = async (statement: snowflake.Statement) =>
        new Promise((resolve: any, reject: any) => {
            try {
                statement.cancel((err: snowflake.SnowflakeError, _stmt: snowflake.Statement) => {
                    if (err) {
                        this.logger.error('Error while cancelling statement: ' + err.message);
                        reject(err);
                    } else {
                        this.logger.log('Successfully aborted statement');
                        resolve();
                    }
                });
            } catch (err) {
                reject(err);
            }
        });
}
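connectSnowflake, disconnectSnowflake and cancelStatementSnowflake all wrap a Node-style (err, result) callback in a Promise. A sketch of a hypothetical generic helper, fromCallback, that captures this pattern once; Node's built-in util.promisify does something similar for functions whose last argument is such a callback.

```typescript
// Node-style callback: an error (or a falsy value) first, then the result
type NodeCallback<T> = (err: Error | null | undefined, result?: T) => void;

// Hypothetical helper: runs `start`, handing it a callback that settles
// the returned promise.
export function fromCallback<T>(
  start: (callback: NodeCallback<T>) => void,
): Promise<T> {
  // If `start` throws synchronously, the Promise constructor turns the
  // exception into a rejection, so no explicit try/catch is needed
  return new Promise<T>((resolve, reject) => {
    start((err, result) => {
      if (err) {
        reject(err);
      } else {
        resolve(result as T);
      }
    });
  });
}
```

With it, the body of connectSnowflake could shrink to something like fromCallback<snowflake.Connection>((cb) => connection.connect(cb)), assuming the SDK's callback type is compatible with NodeCallback. The comment above also explains why the try/catch blocks inside the original Promise executors are redundant.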

Executing queries

👉 Finally, we add the public method executeQuery, which runs a query and returns its result. It takes a SnowflakeQueryInterface object containing the query to execute, optional query parameters (binds), and pagination information (startIndex and count).

The SnowflakeResultInterface type defines the structure of the result: the total number of rows, the number of rows in the returned page, whether another page exists, and the rows themselves.

The method first connects to Snowflake using connectSnowflake, then executes the query, passing the SQL text, the bind parameters, and streamResult: true, so that rows are read from the statement as a stream instead of being returned all at once. The complete callback is invoked once the query has finished executing.

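It then paginates the result set using the startIndex and count properties: only the rows in that window are streamed from the statement, and hasNextPage tells the caller whether more rows remain. If startIndex is not provided, it defaults to 0. If count is not provided, up to 1000 rows are returned, starting at startIndex.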
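The pagination arithmetic can be checked in isolation. A minimal sketch of a hypothetical computePage function, assuming (as the wrapper's own hasNextPage check does) that streamRows() treats both start and end as inclusive row indices, so a page of n rows ends at start + n - 1. The 1000-row cap mirrors the wrapper's default page size; it is a choice of this wrapper, not an SDK limit.

```typescript
export interface PageWindow {
  start: number; // first row index (0-based, inclusive)
  end: number; // last row index (inclusive), as passed to streamRows()
  hasNextPage: boolean;
}

// Hypothetical maximum page size, matching the wrapper's default of 1000 rows
const MAX_PAGE_SIZE = 1000;

// Computes the inclusive row window for one page of a result set.
export function computePage(
  totalRows: number,
  startIndex = 0,
  count = MAX_PAGE_SIZE,
): PageWindow {
  const pageSize = Math.min(count, MAX_PAGE_SIZE);
  const end = startIndex + pageSize - 1;
  return {
    start: startIndex,
    end,
    // More rows exist after this page if the last row index is beyond `end`
    hasNextPage: end < totalRows - 1,
  };
}
```

For a 2500-row result, computePage(2500) gives { start: 0, end: 999, hasNextPage: true }, while computePage(2500, 2000, 500) ends at row 2499 with hasNextPage false.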

The error event is triggered if an error occurs while consuming the rows. In that case, the method logs an error message, cancels the statement, disconnects, and rejects with the error.

The end event is triggered when all the rows in the requested window have been consumed; the method then resolves with the page and disconnects.
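The data/error/end handling described above is the standard way to drain a Node.js readable stream into an array. A sketch of that pattern in isolation, using a hypothetical collectRows function and an object-mode Readable (built with Readable.from) as a stand-in for the stream returned by statement.streamRows().

```typescript
import { Readable } from 'node:stream';

// Drains an object-mode stream into an array: rejects on 'error',
// resolves with every row once 'end' is emitted.
export function collectRows<T>(stream: Readable): Promise<T[]> {
  return new Promise<T[]>((resolve, reject) => {
    const rows: T[] = [];
    stream.on('data', (row: T) => rows.push(row));
    stream.on('error', reject);
    stream.on('end', () => resolve(rows));
  });
}
```

Attaching the 'data' listener is what switches the stream into flowing mode, so rows start arriving as soon as collectRows is called.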

Final class and interfaces

import * as snowflake from 'snowflake-sdk';

// `Logger` is not defined in this snippet: any logger exposing log() and error()
// works here, for example NestJS's Logger from '@nestjs/common'

export interface SnowflakeQueryInterface {
  readonly query: string;
  readonly binds?: snowflake.Binds;
  readonly startIndex?: number;
  readonly count?: number;
}

export interface SnowflakeResultInterface {
  readonly total: number;
  readonly pageSize: number;
  readonly hasNextPage: boolean;
  readonly results: any[];
}

export abstract class SnowflakeServiceAbstract {
  protected abstract logger: Logger;

  protected snowflakeEnvironment: snowflake.ConnectionOptions;
  protected snowflakeConfig: snowflake.ConfigureOptions;

  protected constructor() {
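    // Environment variables are strings: convert them to the types the SDK expects
    // (JSON.parse would throw on an unset variable or on a bare value such as ERROR)
    this.snowflakeConfig = {
      insecureConnect: process.env.SNOWFLAKE_INSECURECONNECT === 'true',
      ocspFailOpen: process.env.SNOWFLAKE_OCSP !== 'false', // the SDK defaults to fail-open
      logLevel: process.env.SNOWFLAKE_LOGLEVEL as snowflake.ConfigureOptions['logLevel'],
    };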
    this.snowflakeEnvironment = {
      authenticator: 'SNOWFLAKE',
      account: process.env.SNOWFLAKE_ACCOUNT,
      password: process.env.SNOWFLAKE_PASSWORD,
      username: process.env.SNOWFLAKE_USERNAME,
      role: process.env.SNOWFLAKE_ROLE,
      warehouse: process.env.SNOWFLAKE_WAREHOUSE,
      database: process.env.SNOWFLAKE_DATABASE,
      schema: process.env.SNOWFLAKE_SCHEMA,
    };
  }

  private connectSnowflake = async (): Promise<any> =>
    new Promise((resolve: any, reject: any) => {
      try {
        snowflake.configure(this.snowflakeConfig);
        const connection: snowflake.Connection = snowflake.createConnection(
          this.snowflakeEnvironment,
        );

        if (connection) {
          connection.connect(
            (err: snowflake.SnowflakeError, conn: snowflake.Connection) => {
              if (err) {
                reject(err);
              } else {
                this.logger.log(`Connected Snowflake Id: ${conn.getId()}`);
                resolve(conn);
              }
            },
          );
        }
      } catch (err) {
        reject(err);
      }
    });

  private disconnectSnowflake = async (connection: snowflake.Connection) =>
    new Promise((resolve: any, reject: any) => {
      try {
        connection.destroy(
          (err: snowflake.SnowflakeError, _conn: snowflake.Connection) => {
            if (err) {
              this.logger.error(
                'Unable to disconnect Snowflake: ' + err.message,
              );
              reject(err);
            } else {
              this.logger.log(
                `Disconnected Snowflake Id: ${connection.getId()}`,
              );
              resolve();
            }
          },
        );
      } catch (err) {
        reject(err);
      }
    });

  private cancelStatementSnowflake = async (statement: snowflake.Statement) =>
    new Promise((resolve: any, reject: any) => {
      try {
        statement.cancel(
          (err: snowflake.SnowflakeError, _stmt: snowflake.Statement) => {
            if (err) {
              this.logger.error('Error while cancelling statement: ' + err.message);
              reject(err);
            } else {
              this.logger.log('Successfully aborted statement');
              resolve();
            }
          },
        );
      } catch (err) {
        reject(err);
      }
    });

  public executeQuery = async (
    req: SnowflakeQueryInterface,
  ): Promise<SnowflakeResultInterface> =>
    new Promise(async (resolve: any, reject: any) => {
      try {
        const connection: snowflake.Connection = await this.connectSnowflake();

        /* PAGINATION */
        const startIndex: number = req.startIndex ? req.startIndex : 0;
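        // Page size: `count` rows (default 1000), never more than 1000
        const pageSize: number = Math.min(req.count ? req.count : 1000, 1000);
        // `end` is an inclusive row index for streamRows()
        const endIndex: number = startIndex + pageSize - 1;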

        connection.execute({
          sqlText: req.query,
          binds: req.binds ? req.binds : [],
          streamResult: true,
          complete: (
            completeErr: snowflake.SnowflakeError,
            statement: snowflake.Statement,
            _rows: any,
          ) => {
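            if (completeErr) {
              this.logger.error(completeErr);
              reject(completeErr);
              // No result set to stream: close the connection and stop here
              // (disconnect failures are already logged by disconnectSnowflake)
              this.disconnectSnowflake(connection).catch(() => undefined);
              return;
            }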

            const rowsArr: any[] = [];

            this.logger.log(`Query id: ${statement.getStatementId()}`);

            const stream = statement.streamRows({
              start: startIndex,
              end: endIndex,
            });

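            stream.on('error', async (streamErr: Error) => {
              this.logger.error('Unable to consume all rows');

              // Both helpers log their own failures; swallow them here so that
              // cleanup always runs and the caller receives the original stream error
              await this.cancelStatementSnowflake(statement).catch(() => undefined);
              await this.disconnectSnowflake(connection).catch(() => undefined);

              reject(streamErr);
            });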

            stream.on('data', (row: any) => {
              rowsArr.push(row);
            });

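            stream.on('end', async () => {
              this.logger.log('All rows consumed');
              resolve({
                total: statement.getNumRows(),
                pageSize: rowsArr.length,
                hasNextPage: endIndex < statement.getNumRows() - 1,
                results: rowsArr,
              });

              // The promise is already resolved, so a disconnect failure (already
              // logged) must not surface as an unhandled rejection
              await this.disconnectSnowflake(connection).catch(() => undefined);
            });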
          },
        });
      } catch (err) {
        reject(err);
      }
    });
}
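Because each call returns hasNextPage, a caller can walk a whole result set page by page. A minimal sketch, assuming a hypothetical fetchPage function with the same shape as executeQuery (for instance (startIndex, count) => service.executeQuery({ query, startIndex, count }) on a concrete subclass); the result interface is redeclared locally so the snippet stands alone. Each call re-executes the query, so without a stable ORDER BY, or if the table changes between calls, rows may shift between pages.

```typescript
// Local copy of the article's result shape so the sketch is self-contained
interface PageResult<T> {
  readonly total: number;
  readonly pageSize: number;
  readonly hasNextPage: boolean;
  readonly results: T[];
}

// Hypothetical helper: requests consecutive pages until hasNextPage is false.
export async function fetchAllPages<T>(
  fetchPage: (startIndex: number, count: number) => Promise<PageResult<T>>,
  count = 1000,
): Promise<T[]> {
  const all: T[] = [];
  let startIndex = 0;

  for (;;) {
    const page = await fetchPage(startIndex, count);
    all.push(...page.results);
    // Stop on the last page, or defensively if a page came back empty
    if (!page.hasNextPage || page.pageSize === 0) {
      return all;
    }
    startIndex += page.pageSize;
  }
}
```

Loading every page into one array defeats the purpose of pagination for very large results; for those, processing each page inside the loop is the safer design.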

I hope this article has been helpful to you. If you have any questions or suggestions, please feel free to contact me. Thank you for reading!👋