How to write a custom data sink


We started to play around with Apache Flink® to process some of our event data.

Apache Flink® is an open-source stream processing framework. It represents the latest in streaming technology, providing high throughput, low latency, and exactly-once semantics.

There are already many impressive projects built on top of Flink; its users include Uber, Netflix, Alibaba, and more.

Flink emerged from here in Berlin; one of the main driving forces behind its development is a local company called data Artisans. The community, centred mostly around the user group mailing list, is extremely active and helpful. data Artisans have provided some great tutorials, which I highly recommend for getting started.

We use Flink to perform a series of transformations on our data, which is generated by execution of business processes.

A single execution of a business process is called a case. When a business process is executed, the tasks of that process are performed, and every time a task is performed some event data is emitted; these events are collected into the relevant case. A case has an ID and a ‘tracehash’; the tracehash is formed from all the events contained within the case.
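
To make this concrete, here is a minimal sketch of what such a Case type could look like (the field and getter names are my assumptions, chosen to match the getters used later in this post):

public class Case {

  private String id;        // the case ID
  private String traceHash; // hash formed from all events in the case

  public String getId() {
    return id;
  }

  public String getTraceHash() {
    return traceHash;
  }
}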

At certain points we want to persist the results to a database (PostgreSQL), from where we serve a REST API.

In order to persist our results to some outside system, we have to use a data sink. Data sinks are connectors that consume data streams and forward them to files, sockets, or external systems, or print them.

Flink provides a number of ‘out of the box’ connectors with various guarantees. It is also possible to define your own.

While investigating PostgreSQL sinks I came across this excellent Flink blog series. Philipp also writes a PostgreSQL sink which batches writes up to a given batch count. As our requirements emerged we needed more control over when to write to the database, so I decided to try implementing my own.

Using the JDBCOutputFormat

There is no out of the box PostgreSQL sink for Flink. This does not mean, however, that you have to start from scratch! The JDBCOutputFormat class can be used to turn any database with a JDBC database driver into a sink.

JDBCOutputFormat is (or was) part of the Flink Batch API; however, it can also be used as a sink for the DataStream API. Judging from a few discussions I found on the Flink user group, this seems to be the recommended approach.

The JDBCOutputFormat requires a prepared statement, driver and database connection.

JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
 .setDrivername("org.postgresql.Driver")
 .setDBUrl("jdbc:postgresql://localhost:1234/test?user=xxx&password=xxx")
 .setQuery(query)
 .finish();

The query is the prepared statement; in our case this is sufficient:

String query = "INSERT INTO public.cases (caseid, tracehash) VALUES (?, ?)";

Where our table looks like:

CREATE TABLE cases
(
 caseid VARCHAR(255),
 tracehash VARCHAR(255)
);

A JDBCOutputFormat can only store instances of Row. A Row is a wrapper for the parameters of the prepared statement. This means we need to transform our data stream of cases into rows. We’re going to map the case’s ID to caseid and its trace hash to tracehash by implementing a Flink MapFunction.

DataStream<Case> cases = ...

DataStream<Row> rows = cases.map((MapFunction<Case, Row>) aCase -> {
  Row row = new Row(2); // our prepared statement has 2 parameters
  row.setField(0, aCase.getId()); //first parameter is caseid
  row.setField(1, aCase.getTraceHash()); //second parameter is tracehash
  return row;
});

And finally we can specify the sink:

rows.writeUsingOutputFormat(jdbcOutput);

Now we can run our Flink job. Our job keys the stream by case ID and builds or updates a case every time the window is evaluated.

Using windows to build cases

Flink processes streams of data; these are infinite. If they are infinite how do we know when to process some data? This is where windows come in. Windows are a mechanism to allow us to group together and evaluate data from the stream.

Windows can be specified using time, for example, evaluate all data from the last 10 minutes. They can also slide, or overlap: every minute evaluate all the data of the last 10 minutes. Or they can count: every 10 events, evaluate the last 20. Flink can also be extended to provide other more complex windowing heuristics.

For our use case we are currently using a session window: this means that the window is evaluated after some gap period has elapsed with no new data being received. This is because we currently expect ‘bursts’ in activity, with gaps, much like a session on a website. I expect we will end up implementing our own windowing heuristic as we better understand Flink and refine our requirements.
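
As a sketch of what this keyed session window looks like in code (Event, getCaseId() and BuildCaseWindowFunction are illustrative names rather than our actual job, and the 30-minute gap is an arbitrary value):

DataStream<Event> events = ...

DataStream<Case> cases = events
    .keyBy((KeySelector<Event, String>) Event::getCaseId)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(30)))
    .apply(new BuildCaseWindowFunction()); // a WindowFunction that folds a session's events into a Case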

Running our job we see that every time the window is evaluated we get a new row in the database, woo! However, my console is being spammed with:

"Unknown column type for column %s. Best effort approach to set its value: %s."

This is because I did not set explicit type values for my parameters when I built the JDBCOutputFormat. I can do so using the builder and simply passing in an array of java.sql.Types.

JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
 .setDrivername("org.postgresql.Driver")
 .setDBUrl("jdbc:postgresql://localhost:1234/test?user=xxx&password=xxx")
 .setQuery(query)
 .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR }) //set the types
 .finish();

And now I don’t get spammed with warnings.

Ok, but instead of a new row I want to either create a new row if one does not exist, or update an existing row, i.e. do an upsert. I’m using PostgreSQL so I will just modify my query to include an ON CONFLICT statement.

String query = "INSERT INTO public.cases (caseid, tracehash) VALUES (?, ?) ON CONFLICT (caseid) DO UPDATE SET tracehash=?";

This means I have a new parameter, and I must specify a value for this. I need to do so in my MapFunction, which now looks like this:

DataStream<Case> cases = ...

DataStream<Row> rows = cases.map((MapFunction<Case, Row>) aCase -> {
  Row row = new Row(3); // our prepared statement has 3 parameters
  row.setField(0, aCase.getId()); //first parameter is caseid
  row.setField(1, aCase.getTraceHash()); //second parameter is tracehash
  row.setField(2, aCase.getTraceHash()); //third parameter is also tracehash
  return row;
});

I must also add a type for this parameter when I build the JDBCOutputFormat:

JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
     .setDrivername("org.postgresql.Driver")
     .setDBUrl("jdbc:postgresql://localhost:1234/test?user=xxx&password=xxx")
     .setQuery(query)
     .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }) //set the types
     .finish();

I also need to add a constraint to my table:

CREATE TABLE cases
(
  caseid VARCHAR(255),
  tracehash VARCHAR(255),
  CONSTRAINT cases_unique UNIQUE (caseid)
);

This time when I run my job I do not get any two rows with the same case ID.

With this approach, every time I evaluate a case it is mapped to a row and written to the database. That is a lot of individual writes; what if I want to batch them up so that I write to PostgreSQL less frequently?

JDBCOutputFormat has a batchInterval, which you can specify on the JDBCOutputFormatBuilder. The batch is only flushed once that many rows have accumulated, so if I specify a batch interval of 5000 and rows arrive slowly, I could wait a very long time before anything was written to the database, or potentially never write anything at all.
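
For reference, this is how the batch interval is set on the builder (a sketch reusing the connection details from above; 5000 is the hypothetical value discussed here):

JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
 .setDrivername("org.postgresql.Driver")
 .setDBUrl("jdbc:postgresql://localhost:1234/test?user=xxx&password=xxx")
 .setQuery(query)
 .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR })
 .setBatchInterval(5000) // only write once 5000 rows have accumulated
 .finish();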

Another approach would be to add both a batch interval and a timeout, however there is no easy way to extend JDBCOutputFormat to do this, so let’s write our own sink.

Writing your own Sink

In order to write a Sink you must implement SinkFunction<IN>, where IN is the input type parameter. This was Row for our previous sink; this time we can use our Case type.

We also have to maintain a database connection and so would like more control over when this is created. Extending RichSinkFunction<IN> means that we get a call back when our function is initialized; this is a good place to set up the database connection.

First, let’s just get our sink working without any batching. We’ll extend RichSinkFunction<IN> with the same ON CONFLICT prepared statement and database schema as before. We’ll hard-code the PostgreSQL driver and connection details.

public class RichCaseSink extends RichSinkFunction<Case> {

  private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) "
      + "VALUES (?, ?) "
      + "ON CONFLICT (caseid) DO UPDATE SET "
      + "  tracehash=?";

  private PreparedStatement statement;


  @Override
  public void invoke(Case aCase) throws Exception {

    statement.setString(1, aCase.getId());
    statement.setString(2, aCase.getTraceHash());
    statement.setString(3, aCase.getTraceHash());
    statement.addBatch();
    statement.executeBatch();
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    Class.forName("org.postgresql.Driver");
    Connection connection =
        DriverManager.getConnection("jdbc:postgresql://localhost:5432/casedb?user=signavio&password=signavio");

    statement = connection.prepareStatement(UPSERT_CASE);
  }

}

We need to add our new sink to our case data stream:

DataStream<Case> cases = ...
cases.addSink(new RichCaseSink());

Let’s run our job to test out if the sink works, which it does!

To add batching to our sink we’ll follow a similar approach to the JDBCOutputFormat, but with a timeout. We’ll maintain a count of cases since the last batch was saved and only save if either this count reaches a limit, or a certain time period has passed.

In our invoke function we have to increment a batch count, and only execute a batch if these conditions are met. Once a batch is executed we have to reset the count and record the time of the last batch.

@Override
public void invoke(Case aCase) throws Exception {

  statement.setString(1, aCase.getId());
  statement.setString(2, aCase.getTraceHash());
  statement.setString(3, aCase.getTraceHash());
  statement.addBatch();
  batchCount++;

  if (shouldExecuteBatch()) {
    statement.executeBatch();
    batchCount = 0;
    lastBatchTime = System.currentTimeMillis();
  }
}
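
For completeness, here is a sketch of the batching state that this relies on; the batch limit and timeout values are illustrative rather than tuned:

private static final int BATCH_LIMIT = 500;            // illustrative batch size
private static final long BATCH_TIMEOUT_MS = 60_000L;  // illustrative timeout

private int batchCount = 0;
private long lastBatchTime = System.currentTimeMillis();

private boolean shouldExecuteBatch() {
  return batchCount >= BATCH_LIMIT
      || System.currentTimeMillis() - lastBatchTime >= BATCH_TIMEOUT_MS;
}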

The number of times we write to the database is now reduced, which is great, but it doesn’t really mirror how Flink is doing things.

There are also other issues with this implementation: for example, the timeout is only checked when a new event arrives, so a new event must appear before we can write to the DB after a certain amount of time. We could use a ScheduledExecutorService to get around this; however, let’s take a closer look at Flink instead.

Making your Sink checkpoint aware

Flink also has a concept of checkpointing:

Every function and operator in Flink can be stateful. Stateful functions store data across the processing of individual elements/events, making state a critical building block for any type of more elaborate operation.

In order to make state fault tolerant, Flink needs to checkpoint the state. Checkpoints allow Flink to recover state and positions in the streams to give the application the same semantics as a failure-free execution.

We are using a RocksDBStateBackend to store our cases while we build them, so we are already stateful and want to take part in checkpointing.
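
For context, the state backend is configured on the environment, roughly like this (the checkpoint URI is illustrative):

StreamExecutionEnvironment env = ...
env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints"));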

Given this, the point at which a checkpoint completes seems like a good place to write to the database: this way we maintain consistency with what Flink has fully processed.

By letting our sink implement CheckpointListener.notifyCheckpointComplete(long checkpointId) we will be notified when a checkpoint is completed. On notification we should upsert all cases completed up to, and including, the given checkpoint.

This means that we also have to know, and keep track of, which checkpoint a pending case belongs to. CheckpointedFunction provides access to a ManagedSnapshotContext which can provide the checkpoint id.
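
Putting this together, the sink now implements both interfaces and keeps two pieces of in-memory state. A sketch of the declaration and fields (the concrete collection types are my choice):

public class RichCaseSink extends RichSinkFunction<Case>
    implements CheckpointedFunction, CheckpointListener {

  private final List<Case> pendingCases = new ArrayList<>();
  private final Map<Long, List<Case>> pendingCasesPerCheckpoint = new TreeMap<>();

  // invoke(), snapshotState() and notifyCheckpointComplete() as shown below,
  // plus the open() method from the earlier version
}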

We have to change our invoke method so that we just store the cases until we are ready to write them to the database:

@Override
public void invoke(Case aCase) throws Exception {
  pendingCases.add(aCase);
}

Next we have to figure out which snapshot a case belongs to. We do this by implementing CheckpointedFunction.snapshotState(FunctionSnapshotContext context). We will maintain a map of checkpoint IDs to lists of cases.

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
  long checkpointId = context.getCheckpointId();
  List<Case> cases = pendingCasesPerCheckpoint.get(checkpointId);
  if (cases == null) {
    cases = new ArrayList<>();
    pendingCasesPerCheckpoint.put(checkpointId, cases);
  }
  cases.addAll(pendingCases);
  pendingCases.clear();
}
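
Note that CheckpointedFunction also requires an initializeState method. In this first attempt it can stay empty, since we are not yet keeping the pending cases in Flink managed state (a point we come back to in the wrap-up):

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
  // Nothing to restore: the pending cases currently live only in memory,
  // so they would be lost if the job failed and recovered.
}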

Finally we have to write to the database when a checkpoint completes:

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {

  Iterator<Map.Entry<Long, List<Case>>> pendingCheckpointsIt =
      pendingCasesPerCheckpoint.entrySet().iterator();

  while (pendingCheckpointsIt.hasNext()) {

    Map.Entry<Long, List<Case>> entry = pendingCheckpointsIt.next();
    Long pastCheckpointId = entry.getKey();
    List<Case> pendingCases = entry.getValue();

    if (pastCheckpointId <= checkpointId) {

      for (Case pendingCase : pendingCases) {
        statement.setString(1, pendingCase.getId());
        statement.setString(2, pendingCase.getTraceHash());
        statement.setString(3, pendingCase.getTraceHash());
        statement.addBatch();
      }
      pendingCheckpointsIt.remove();
    }
  }
  statement.executeBatch();
}

We must not forget to enable checkpointing on the StreamExecutionEnvironment, where the parameter is the time interval between state checkpoints in milliseconds.

StreamExecutionEnvironment env = ...
env.enableCheckpointing(10000L);

When the job is run we can see that the database is only being written to when a checkpoint completes, i.e. every 10 seconds.

Wrapping up

So now we have our first attempt at a checkpoint aware sink; we have learnt a little bit more about Flink sinks, state and checkpointing.

This approach is almost certainly not production ready. In order to be more confident in our sink we need to understand the checkpoint mechanism in more detail: what happens when checkpoints fail and, in particular, whether we should be using Flink’s state mechanism to store our pending cases.

We have also not thought about how thread-safe our sink is. To understand that requirement, in addition to understanding the checkpoint mechanism in more detail, we probably also need to know more about how Flink schedules jobs.

We may also want to consider not using PostgreSQL as a sink at all, and instead using queryable state to serve our API. However, we now have a good starting point for our further Flink explorations.

Note: I was using a snapshot version of Flink 1.3, running in IntelliJ.