Apache Flink is a data processing framework that is now widely used for stream processing. It ships with various streaming sources such as Kafka, HDFS, and sockets. But how do you stream data from MongoDB?

Here is the answer.

Below is the architecture of MongoDB real-time analytics with Flink:

[Diagram: MongoDB streaming with Flink]
The key here is the '_id' field, through which real-time processing is achieved.
With the default ObjectId, '_id' values increase roughly in insertion order across documents, so the last '_id' read can be used to track where reading left off and to fetch only newer documents on the next pass.

Example:

package com.test.mongoDB.streaming;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.bson.Document;
import org.bson.types.ObjectId;

import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;

public class MongoSource extends RichSourceFunction<Document> {

    private String inputClientURI;
    private String mongoDatabase;
    private String mongoCollection;
    private String checkPointFile;
    private String documentId;
    // volatile because cancel() is invoked from a different thread than run()
    private volatile boolean isRunning;

    public MongoSource(Properties property) {
        this.inputClientURI = property.getProperty("mongo.input.uri");
        this.mongoDatabase = property.getProperty("inputDatabase");
        this.mongoCollection = property.getProperty("collection");
        this.checkPointFile = property.getProperty("checkPointFile");
        this.documentId = "";
        this.isRunning = true;
    }

    @Override
    public void cancel() {
        // Persist the last '_id' read so the next run can resume from there
        try (FileWriter outputWriter = new FileWriter(new File(this.checkPointFile))) {
            outputWriter.write(this.mongoCollection + ":" + this.documentId);
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.isRunning = false;
    }

    @Override
    public void run(SourceContext<Document> context) throws Exception {

        // Getting a connection to the transaction database
        List<ServerAddress> seeds = new ArrayList<>();
        for (String host : this.inputClientURI.split(",")) {
            seeds.add(new ServerAddress(host));
        }

        MongoCollection<Document> coll = new MongoClient(seeds)
                .getDatabase(this.mongoDatabase)
                .getCollection(this.mongoCollection);

        // Restore the last '_id' read from the checkpoint file, if one exists
        File checkpoint = new File(this.checkPointFile);
        if (checkpoint.exists()) {
            try (BufferedReader inputReader = new BufferedReader(new FileReader(checkpoint))) {
                String line = inputReader.readLine();
                if (line != null) {
                    this.documentId = line.split(":")[1];
                }
            }
        }

        Document doc = new Document();
        MongoCursor<Document> cursor;
        while (this.isRunning) {

            // Only fetch documents inserted after the last '_id' we have seen;
            // this assumes the default ObjectId '_id' values
            Document query = new Document();
            if (!this.documentId.equals("")) {
                query.put("_id", new Document("$gt", new ObjectId(this.documentId)));
            }

            cursor = coll.find(query).iterator();
            while (cursor.hasNext()) {
                doc = cursor.next();
                context.collect(doc);
            }

            // Remember the '_id' of the last document emitted
            if (doc.containsKey("_id")) {
                this.documentId = doc.get("_id").toString();
            }

            // Avoid hammering the database when no new documents have arrived
            Thread.sleep(1000);
        }
    }
}
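
For completeness, here is a minimal sketch of wiring this source into a Flink job. The property keys match what the MongoSource constructor reads; the host, database, collection, and checkpoint path values are placeholders for illustration:

package com.test.mongoDB.streaming;

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.bson.Document;

public class MongoStreamingJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Same property keys the MongoSource constructor reads
        Properties props = new Properties();
        props.setProperty("mongo.input.uri", "localhost:27017");
        props.setProperty("inputDatabase", "test");
        props.setProperty("collection", "transactions");
        props.setProperty("checkPointFile", "/tmp/mongo-checkpoint.txt");

        DataStream<Document> stream = env.addSource(new MongoSource(props));

        // Any downstream processing goes here; printing is just for illustration
        stream.print();

        env.execute("MongoDB streaming with Flink");
    }
}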

Note: The checkpoint file stores the state information that the source uses to track where it last stopped reading.
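
Following the format written in cancel() above, the file holds a single "collection:lastId" line. For illustration (the ObjectId value here is made up), after reading the collection transactions up to a document whose '_id' is 507f1f77bcf86cd799439011, the file would contain:

transactions:507f1f77bcf86cd799439011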

