
Thursday, May 14, 2015

Azure Mobile Service + Service Fabric = Easy Monitoring

I want to thank all my readers who appreciated my previous blogs on Service Fabric. It is indeed the kind of motivation that drives a tech-enthusiast like me. While driving home, I was thinking about my next blog when I suddenly remembered a POC that involved the Service Fabric REST API and Azure Mobile Service.

Intent of the POC

The intent of the POC was to create a quick, efficient way of monitoring a Service Fabric cluster without writing tons of code or making things complex. The ask came from a client who wanted a feasible solution for monitoring his Service Fabric cluster. The ETA was set at 7 days, but believe me, the first POC was ready by the end of the second day.

Why Azure Mobile Service?

Azure Mobile Service is an awesome tool for developers who want to write lightweight services that run continuously without incurring much effort or cost. I was already familiar with AMS, as I had written a couple of scripts interacting with the Facebook and LinkedIn APIs in the past. (If interested, you can take a look at my netme blog.) I thought it would be efficient to use the same approach for Service Fabric monitoring. As Service Fabric exposes REST APIs, it was easy to perform a call from AMS and write the result to Azure Table storage. An MVC app was created to read the Azure Table storage and show the output in a graphical chart.

Acknowledgement

I want to acknowledge Balaji, who mentored me throughout this process and helped turn a simple, blunt script into a generic utility.

Let's get our hands dirty

Before starting, the problem statement was broken down into two major chunks:
1. Make REST API calls to Service Fabric
2. Store the output in Table storage

Performing the above two operations is pretty simple, as AMS provides two Node.js packages:
a. azure
b. request

If you are new to Node.js, no issues; the following script sections will show how each of them is used.

Pre-requisite

Ensure that the HTTP gateway is enabled in the Service Fabric cluster. This is a section in the cluster manifest:

   <Section Name="HttpGateway">
        <Parameter Name="IsEnabled" Value="true"/>
    </Section>

Also, in your ServiceDefinition.csdef, under Endpoints, there should be an endpoint for the HTTP gateway:

      <InputEndpoint name="HttpGatewayListenAddress" protocol="http" port="19007" />

In my case, the endpoint port is 19007.

Make REST API calls to Service Fabric

Service Fabric provides multiple REST APIs, which are documented at https://msdn.microsoft.com/library/azure/dn707692.aspx

For this POC we will discuss a few of them.
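The endpoints used in this POC all hang off the cluster's HTTP gateway and share one pattern; a minimal sketch of the URL templates (the cluster address and port are placeholders for your own values):

```javascript
// Base address of the cluster's HTTP gateway (placeholder values).
var base = "http://serviceFabricClusterName.cloudapp.net:19007";
var apiVersion = "?api-version=1.0";

// URL templates for the resources this POC monitors.
function nodesUrl() {
    return base + "/Nodes" + apiVersion;
}
function applicationsUrl() {
    return base + "/Applications" + apiVersion;
}
function servicesUrl(appName) {
    return base + "/Applications/" + appName + "/$/GetServices" + apiVersion;
}
function partitionsUrl(appName, svcName) {
    return base + "/Applications/" + appName + "/$/GetServices/" + svcName + "/$/GetPartitions" + apiVersion;
}

console.log(servicesUrl("System"));
```

Inserting "/{id}/$/GetHealth" just before the api-version query string gives the matching health endpoint, which is exactly the string replacement the full script performs later.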

1. In order to start with the POC, first we need to create an AMS service



2. Once the mobile service is created, go to the Scheduler section and create a new job


3. Once the scheduler job is created, click into it and open its Script section, where we get an area to write our script

4. Once the job is created, you will see an empty function with the job name. Consider this the entry point of the script. As my job name is smurfjob, I got a function with the same name:

function smurfjob() {
      console.log("Starting My App Monitor");
      console.log("This will monitor Node, Application, App Services, System Services, Partition and Replica");
    var dataTypes = [
        "Node"
        , "Application"
        , "AppServices"
        , "SystemServices"
        , "Partition"
        , "Replica"
        ];
    dataTypes.forEach(getData);
}

As the log statements above mention, we are going to monitor Service Fabric Nodes, Applications, Application Services, System Services, Partitions and Replicas. Note that Array.forEach passes each element along with its index and the array itself, which is why getData below takes three parameters.

5. Our next module maps each input type to its handler, deciding where the data is read from and written to. For instance, if the input is AppServices, the services are fetched for every application already recorded in the MyAppCurrTable storage table.

function getData(dataType, index, array) {
    if (dataType == "Node") {
        getNodesData();
    } else if (dataType == "Application") {
        getApplicationData();
    } else if (dataType == "SystemServices") {
        fetchAndUpdateServicesDataForAppName("System");
    } else if (dataType == "AppServices") {
        getChildDataForParent("MyAppCurrTable", fetchAndUpdateServicesData);
    } else if (dataType == "Partition") {
        getChildDataForParent("MySvcCurrTable", fetchAndUpdatePartitionData);
    } else if (dataType == "Replica") {
        getChildDataForParent("MyPrtnCurrTable", fetchAndUpdateReplicaData);
    }
}
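The if/else chain above can also be written as a lookup table, which keeps the data-type-to-parent-table mapping in one place; a small sketch of that alternative (the handler functions themselves stay as defined in this script):

```javascript
// Each child data type is fetched per row of a parent table; null means the
// type is fetched directly, with no parent lookup (mirrors the chain above).
var parentTables = {
    "Node": null,
    "Application": null,
    "SystemServices": null,
    "AppServices": "MyAppCurrTable",
    "Partition": "MySvcCurrTable",
    "Replica": "MyPrtnCurrTable"
};

function parentTableFor(dataType) {
    return parentTables.hasOwnProperty(dataType) ? parentTables[dataType] : undefined;
}

console.log(parentTableFor("Partition")); // MySvcCurrTable
```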

6. Now let's write the modules for the different data types

Nodes

Let me now take a single module and explain what we want

function getNodesData() {
    //Import the request and azure Node.js modules
    var req = require('request');
    var azure = require('azure');

    // Url for fetching the Node data
    var urlToMonitor = "http://serviceFabricClusterName.cloudapp.net:19007/Nodes?api-version=1.0";
    var tableName = "NodeCurrentTable";

    console.log("Url to Monitor " + urlToMonitor);

    // fetch Node data
    req.get({
        uri: urlToMonitor,
        timeout: 200000,
        headers: {'content-type': 'application/x-www-form-urlencoded'},
    },
    function (error, response, body) {

        //This is where we handle the response body
        if (!error && response.statusCode == 200) {

            //Parse the json body
            var items = JSON.parse(body);

            //Now create the Azure storage table
            var tableService = azure.createTableService("azure storage account name", "azure storage account key");

            tableService.createTableIfNotExists(tableName, function (error) {
                if (!error) {
                    //Insert each parsed item into Azure Table storage
                    items.forEach(function (item) {
                        item.PartitionKey = item.Name;
                        item.RowKey = item.Name; // any unique key will do
                        tableService.insertEntity(tableName, item, function (error) {
                            if (error) {
                                console.log("insertEntity returned error: " + error);
                            }
                        });
                    });
                }
                else {
                    console.log("CreateTableIfNotExists for " + tableName + " returned error: " + error);
                }
            })
        }
        else {
            console.log("ERROR: GET " + urlToMonitor + " returned error: " + error + " and statusCode " + response.statusCode);
        }
    })
}
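The parsed body is an array of node entries whose NodeStatus and HealthState come back as numbers; here is a sketch of decoding them with the same enum maps the full script's mapEnum() uses (the sample entry is illustrative, field names vary by Service Fabric version):

```javascript
// Numeric-to-name maps for the two enums a node entry carries.
var NodeStatus = { "0": "Invalid", "1": "Up", "2": "Down", "3": "Enabling", "4": "Disabling", "5": "Disabled" };
var HealthState = { "0": "Invalid", "1": "Ok", "2": "Warning", "3": "Error", "65535": "Unknown" };

// Illustrative node entry -- not the full payload.
var node = { Name: "Node.0", NodeStatus: 1, HealthState: 1 };

function decodeNode(n) {
    return { Name: n.Name, NodeStatus: NodeStatus[n.NodeStatus], HealthState: HealthState[n.HealthState] };
}

console.log(decodeNode(node)); // { Name: 'Node.0', NodeStatus: 'Up', HealthState: 'Ok' }
```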

Hopefully the above section conveys the actual intent of the idea mentioned earlier. The following is a snapshot of the tabular structure for Application services:

PartitionKey RowKey ServiceKind Name TypeName ManifestVersion HasPersistedState HealthState ServiceStatus IsServiceGroup AppName
System_ClusterManagerService 5VbGMVNM2EZ7NFYu81gAfO Stateful fabric:/System/ClusterManagerService ClusterManagerServiceType 4.0.99.9490 TRUE Ok 1 FALSE System
System_FailoverManagerService GSc38KxoFFXFThEJbP+vQ0 Stateful fabric:/System/FailoverManagerService FMServiceType 4.0.99.9490 TRUE Ok 1 FALSE System
System_ImageStoreService YcPTmeiqsR0Vh33CpC+7nC Stateful fabric:/System/ImageStoreService FileStoreServiceType 4.0.99.9490 TRUE Ok 1 FALSE System
System_InfrastructureService uEvwax1amAIoal0LvoRL0y Stateful fabric:/System/InfrastructureService InfrastructureServiceType 4.0.99.9490 FALSE Ok 1 FALSE System
System_NamingService x+vT3v9YgNundefinedYvZJPgbTKvo Stateful fabric:/System/NamingService NamingStoreService 4.0.99.9490 TRUE Ok 1 FALSE System
System_RepairManagerService 1bj5zeS2ReeKwbPVSQ1dOE Stateful fabric:/System/RepairManagerService RepairManagerServiceType 4.0.99.9490 TRUE Ok 1 FALSE System

Now one can easily create an MVC app or use Power BI to read this data and create a simple dashboard.
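As a taste of what the dashboard side might do, here is a hypothetical helper that tallies health states per application from rows shaped like the snapshot above (the actual read would go through the azure table SDK or Power BI):

```javascript
// Rows shaped like the table entities above; only the fields we need.
var rows = [
    { AppName: "System", HealthState: "Ok" },
    { AppName: "System", HealthState: "Ok" },
    { AppName: "MyApp", HealthState: "Warning" }
];

// Count HealthState values per application -- ready input for a simple chart.
function healthSummary(entities) {
    var summary = {};
    entities.forEach(function (e) {
        summary[e.AppName] = summary[e.AppName] || {};
        summary[e.AppName][e.HealthState] = (summary[e.AppName][e.HealthState] || 0) + 1;
    });
    return summary;
}

console.log(healthSummary(rows)); // { System: { Ok: 2 }, MyApp: { Warning: 1 } }
```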

7. Once all this is done, you can save the script and enable it. AMS will act as a background task runner and will keep uploading the results as required.
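One detail worth calling out: the full script below throttles writes to the history tables using the minute at which the scheduler fires. The gate is a simple modulo check, sketched here:

```javascript
// History rows are written only when the run minute is a multiple of the
// frequency (15 in the script), i.e. roughly once per quarter hour.
function shouldWriteHistory(currentMinute, frequency) {
    return currentMinute % frequency === 0;
}

console.log(shouldWriteHistory(30, 15)); // true
console.log(shouldWriteHistory(31, 15)); // false
```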

Here is the full Azure Mobile Service script

function smurfjob() {
      console.log("Starting My App Monitor");
      console.log("This will monitor Node, Application, App Services, System Services, Partition and Replica");
    var dataTypes = [
        "Node"
        , "Application"
        , "AppServices"
        , "SystemServices"
        , "Partition"
        , "Replica"
        ];
    dataTypes.forEach(getData);
}


function getData(dataType, index, array) {
    if (dataType == "Node") {
        getNodesData();
    } else if (dataType == "Application") {
        getApplicationData();
    } else if (dataType == "SystemServices") {
        fetchAndUpdateServicesDataForAppName("System");
    } else if (dataType == "AppServices") {
        getChildDataForParent("MyAppCurrTable", fetchAndUpdateServicesData);
    } else if (dataType == "Partition") {
        getChildDataForParent("MySvcCurrTable", fetchAndUpdatePartitionData);
    } else if (dataType == "Replica") {
        getChildDataForParent("MyPrtnCurrTable", fetchAndUpdateReplicaData);
    }
}

function getNodesData() {
    var req = require('request');
    var azure = require('azure');
    var nodeUrlToMonitor = "http://servicefabricname.cloudapp.net:19007/Nodes?api-version=1.0";
    var nodeHistoryTableName = "NodeHistoryTable";
    var nodeCurrentTableName = "NodeCurrentTable";
    var nodeEventsTableName = "NodeEventsTable";
    var nodePrimaryKeyName = "Name";
    
    fetchAndUpdate("Node", nodeUrlToMonitor, nodePrimaryKeyName, nodeCurrentTableName, nodeEventsTableName, nodeHistoryTableName, 15, null);
}

function getApplicationData() {
    var req = require('request');
    var azure = require('azure');
    var appCurrentTableName = "MyAppCurrTable";
    var appEventsTableName = "MyAppAppEventsTable";
    var appHistoryTableName = "MyAppAppHistoryTable";
    var applicationURL = "http://servicefabricname.cloudapp.net:19007/Applications?api-version=1.0";
    var appPrimaryKeyName = "Id";
    
    fetchAndUpdate("Application", applicationURL, appPrimaryKeyName, appCurrentTableName, appEventsTableName, appHistoryTableName, 15, null);
}

function getChildDataForParent(parentTableName, fetchAndUpdateChildData) {
    var azure = require('azure');
    var tableService = azure.createTableService("storageaccountname", "storageaccountkey");
    var getMatchingEntities = azure.TableQuery
    .select()
    .from(parentTableName);
    tableService.queryEntities(getMatchingEntities, function (error, entities) {
        if (!error) {
            entities.forEach(fetchAndUpdateChildData);
        }
        else {
            console.log("ERROR: queryEntities in " + parentTableName + " returned error " + error);
        }
    });
}

function fetchAndUpdateServicesData(item, index, array) {
    fetchAndUpdateServicesDataForAppName(item.PartitionKey);
}

function fetchAndUpdateServicesDataForAppName(appName) {
    var svcCurrentTableName = "MySvcCurrTable";
    var svcEventsTableName = "MyAppSvcEventsTable";
    var svcHistoryTableName = "MyAppSvcHistoryTable";
    var serviceURL = "http://servicefabricname.cloudapp.net:19007/Applications/" + appName + "/$/GetServices?api-version=1.0";
    var svcPrimaryKeyName = "Id";
    var additionalData = {"AppName": appName};
    console.log("Fetching svc data for app " + appName);
    fetchAndUpdate("Service", serviceURL, svcPrimaryKeyName, svcCurrentTableName, svcEventsTableName, svcHistoryTableName, 15, additionalData);
}

function fetchAndUpdatePartitionData(item, index, array) {
    var svcName = item.PartitionKey.toString();
    var appName = item.AppName;
    fetchAndUpdatePartitionDataForServiceName(appName, svcName);
}

function fetchAndUpdatePartitionDataForServiceName(appName, svcName) {
    var prtnCurrentTableName = "MyPrtnCurrTable";
    var prtnEventsTableName = "MyAppPrtnEventsTable";
    var prtnHistoryTableName = "MyAppPrtnHistoryTable";
    var updatedSvcName = svcName.toString().replace('_', '/');
    console.log("Fetching partition data for app " + appName + " and svc " + updatedSvcName);
    var partitionURL = "http://servicefabricname.cloudapp.net:19007/Applications/" + appName + "/$/GetServices/" + updatedSvcName + "/$/GetPartitions?api-version=1.0";
    var prtnPrimaryKeyName = "PartitionId";
    var additionalData = {"AppName": appName, "ServiceName": updatedSvcName};
    fetchAndUpdate("Partition", partitionURL, prtnPrimaryKeyName, prtnCurrentTableName, prtnEventsTableName, prtnHistoryTableName, 15, additionalData);
}

function fetchAndUpdateReplicaData(item, index, array) {
    var svcName = item.ServiceName.toString().replace("_", "/");
    fetchAndUpdateReplicaDataForPartitionId(item.AppName, svcName, item.PartitionKey, item.ServiceKind);
}

function fetchAndUpdateReplicaDataForPartitionId(appName, svcName, partitionId, serviceKind) {
    var replCurrentTableName = "MyAppReplCurrTable";
    var replEventsTableName = "MyAppReplEventsTable";
    var replHistoryTableName = "MyAppReplHistoryTable";
    var updatedSvcName = svcName.toString().replace('_', '/');
    console.log("Fetching replica data for app " + appName + " and svc " + updatedSvcName + " and prtnId " + partitionId);
    var replicaURL = "http://servicefabricname.cloudapp.net:19007/Applications/" + appName + "/$/GetServices/" + svcName + "/$/GetPartitions/" + partitionId + "/$/GetReplicas?api-version=1.0";
    var replicaPrimaryKeyName = "NodeName";
    /*
    if (serviceKind == "Stateless") {
        replicaPrimaryKeyName = "InstanceId";
    }
    */
    var additionalData = {"AppName": appName, "ServiceName": svcName, "PartitionId": partitionId};
    fetchAndUpdate("Replica", replicaURL, replicaPrimaryKeyName, replCurrentTableName, replEventsTableName, replHistoryTableName, 15, additionalData);
}

function fetchAndUpdate(dataType, urlToMonitor, primaryKeyName, currentTableName, eventsTableName, historyTableName, frequency, additionalData)
{
    var date = new Date(); 
    var current_min = date.getMinutes();
    var fUpdate = current_min % frequency;

    var req = require('request');
    var azure = require('azure');

     console.log("fetchAndUpdate " + urlToMonitor);
    // fetch data
    req.get({
    uri:urlToMonitor,
    timeout : 200000,
    headers:{'content-type': 'application/x-www-form-urlencoded'},
   }, 
   function (error, response, body) {
       if (!error && response.statusCode == 200) {
           var items = JSON.parse(body);
           // console.log(items);
           var tableService = azure.createTableService("storageaccountname", "storageaccountkey");
           tableService.createTableIfNotExists(currentTableName, function (error) {
               if (!error) {
                   // for each row
                   items.forEach(updateTables.bind(null, dataType, primaryKeyName, urlToMonitor, currentTableName, eventsTableName, historyTableName, fUpdate, additionalData));
                   // items.forEach(getHealth.bind(null, dataType, primaryKeyName, urlToMonitor, additionalData));
               }
               else {
                   console.log("CreateTableIfNotExists for " + currentTableName + " returned error: " + error);
               }
            })
       }
       else {
           console.log("ERROR: GET " + urlToMonitor + " returned error: " + error + " and statusCode " + response.statusCode);
       }
   })
}

function getHealth(dataType, primaryKeyName, primaryKeyValueToUse, urlToMonitor, additionalData, item
// , index, array
) {
    // Modify url to add data and GetHealth query
    // console.log("getHealth " + dataType + " item " + item[primaryKeyName]);
    var primaryKeyValue = item[primaryKeyName];
    if (dataType == "Partition") {
        primaryKeyValue = primaryKeyValueToUse;
    }
    
    /*
    var primaryKeyValue = item[primaryKeyName];
    if (dataType == "Partition") {
        var partitionInformation = item["PartitionInformation"];
        primaryKeyValue = partitionInformation["Id"];
    }
    */
    var healthUrl = urlToMonitor.replace("?api-version=1.0", "/" + primaryKeyValue +"/$/GetHealth?api-version=1.0");
    var req = require('request');

    req.get({
    uri:healthUrl,
    timeout : 200000,
    headers:{'content-type': 'application/x-www-form-urlencoded'},
   }, 
   function (error, response, body) {
       if (!error && response.statusCode == 200) {
           var items = JSON.parse(body);
           // console.log(items);
            var entityName = primaryKeyValue;
            if (dataType == "Service") {
                entityName = primaryKeyValue.toString().replace('/', '_');
            }
            else if (dataType == "Replica") {
                entityName = additionalData["PartitionId"] + "_" + primaryKeyValue;
            }
           items["HealthEvents"].forEach(updateHealthEventsTable.bind(null, dataType, entityName));
       }
       else {
           console.log("ERROR: GET health for " + healthUrl + " returned error: " + error + " and statusCode " + response.statusCode);
       }
   })
}

function updateHealthEventsTable(dataType, entityName, item, index, array) {
    var healthEventsTable = "MyAppHealthEventsTable";
    var azure = require('azure');
    var tableService = azure.createTableService("storageaccountname", "storageaccountkey");
    tableService.createTableIfNotExists(healthEventsTable, function (error) {
       if (!error) {
           item["PartitionKey"] = entityName;
           item["RowKey"] = item["SequenceNumber"];
           item["DataType"] = dataType;
           item["HealthState"] = mapEnum("HealthState", item["HealthState"]);
           tableService.insertOrReplaceEntity(healthEventsTable, item, function(error) {
                    if (!error) {
                        console.log(item.PartitionKey + " Health event updated");
                    }
                    else {
                        console.log("ERROR while updating health event for " + item.PartitionKey + " : " + error);
                    }
           }) 
       }
       else {
           console.log("ERROR: CreateTableIfNotExists for " + healthEventsTable + " returned error: " + error);
       }
    })
}

function updateTables(dataType, primaryKeyName, urlToMonitor, currentTableName, eventsTableName, historyTableName, fUpdate, additionalData, item, index, array) {
    var azure = require('azure');
    var primaryKeyValue = item[primaryKeyName];
    if (dataType == "Service") {
        primaryKeyValue = item[primaryKeyName].toString().replace('/', '_');
    }
    else if (dataType == "Partition") {
        var partitionInformation = item["PartitionInformation"];
        primaryKeyValue = partitionInformation["Id"];
        item["PartitionInformation"] = null;
    }
    else if (dataType == "Replica") {
        primaryKeyValue = additionalData["PartitionId"] + "_" + item[primaryKeyName];
    }
    
    for (var prop in additionalData) {
        item[prop] = additionalData[prop];
    }
    
    var getMatchingEntities = azure.TableQuery
    .select()
    .from(currentTableName).where('PartitionKey eq ?', primaryKeyValue);
    var tableService = azure.createTableService("storageaccountname", "storageaccountkey");
    tableService.queryEntities(getMatchingEntities, function (error, entities) {
        if (!error) {
            if (entities.length > 0) {
                for (var prop in item) {
                    // console.log("item[" + prop + "] = " + item[prop] + " ; eProp " + entities[0][prop]);
                    if (prop != "id" && prop != "Id" && prop != "link" && prop != "updated" && prop != "etag" && prop != "PartitionKey" && prop != "RowKey" && prop != "Timestamp") {
                        // if different from currenttable, then update events table
                        if (dataType == "Application" && prop == "Parameters") {
                            item[prop] = JSON.stringify(item[prop]);
                        }
                        item[prop] = mapEnum(prop, item[prop]);
                        var iProp = (item[prop] == null) ? null : item[prop].toString();
                        var eProp = (entities[0][prop] == null) ? null : entities[0][prop].toString();
                        if (iProp != eProp) {
                            writeToEventsTable(eventsTableName, primaryKeyValue, prop, item[prop]);
                            entities[0][prop] = item[prop];
                        }
                    }
                }
                // update row of existing entity
                item["PartitionKey"] = primaryKeyValue;
                item["RowKey"] = entities[0].RowKey;
                tableService.updateEntity(currentTableName, item, function (error) {
                    if (!error) {
                        // console.log(item.PartitionKey + " Entity updated");
                    }
                    else {
                        console.log("ERROR while updating entity " + item.PartitionKey + " : " + error);
                    }
                });
            }
            else {
               item["PartitionKey"] = primaryKeyValue;
               item["RowKey"] = randomString(128);
               for (var prop in item) {
                    item[prop] = mapEnum(prop, item[prop]);
                    if (dataType == "Application" && prop == "Parameters") {
                        var jsonValue = JSON.stringify(item[prop]);
                        console.log("App Parameters = " + jsonValue);
                        item[prop] = jsonValue;
                    }
               }
               tableService.insertEntity(currentTableName, item, function (error) {
                   if (!error) {
                       console.log(item.PartitionKey + " " + dataType + " Entity inserted in current table");
                   }
                   else {
                       console.log("Error while inserting entity " + item.PartitionKey + " in current table : " + error);
                   }
               });
            }
            // Update history table if frequency is met
            if (fUpdate == 0) {
               tableService.createTableIfNotExists(historyTableName, function (error) {
                   if (!error) {
                       item["RowKey"] = randomString(128);
                       tableService.insertEntity(historyTableName, item, function (error) {
                           if (!error) {
                               // console.log(item.PartitionKey + " Entity inserted in history table");
                           }
                           else {
                               console.log("ERROR while inserting entity " + item.PartitionKey + " in history table : " + error);
                           }
                       });
                   }
               });
            }
        }
        else {
            console.log("queryEntities returned error: " + error);
        }
    });
    getHealth(dataType, primaryKeyName, primaryKeyValue, urlToMonitor, additionalData, item);
}

function mapEnum(prop, value) {
    var retVal = value;
    var NodeStatus = {
        "0": "Invalid"
        , "1": "Up"
        , "2": "Down"
        , "3": "Enabling"
        , "4": "Disabling"
        , "5": "Disabled"
    }
    var HealthState = {
        "0": "Invalid"
        , "1": "Ok"
        , "2": "Warning"
        , "3": "Error"
        , "65535": "Unknown"
    }
    var ReplicaRole = {
        "0": "Invalid"
        , "1": "None"
        , "2": "Primary"
        , "3": "IdleSecondary"
        , "4": "ActiveSecondary"
    }
    var ReplicaStatus = {
        "0": "Invalid"
        , "1": "InBuild"
        , "2": "Standby"
        , "3": "Ready"
        , "4": "Down"
        , "5": "Dropped"
    }
    var ServiceKind = {
        "0": "Invalid"
        , "1": "Stateless"
        , "2": "Stateful"
    }
    switch (prop) {
        case "NodeStatus":
            retVal = NodeStatus[value];
            break;
        case "HealthState":
            retVal = HealthState[value];
            break;
        case "ReplicaRole":
            retVal = ReplicaRole[value];
            break;
        case "ReplicaStatus":
            retVal = ReplicaStatus[value];
            break;
        case "ServiceKind":
            retVal = ServiceKind[value];
            break;
        default:
            retVal = value;
            break;
    }
    return retVal;
}

function writeToEventsTable(eventsTableName, partitionKey, propertyName, propertyValue)
{
    var azure = require('azure');
    var tableService = azure.createTableService("storageaccountname", "storageaccountkey");
    tableService.createTableIfNotExists(eventsTableName, function (error) {
        if (!error) {
            var rowKey = randomString(128);
            var task = {
                PartitionKey: partitionKey
                            , RowKey: rowKey
                            , NotifyStateName: propertyName
                            , NotifyStateValue: propertyValue
            }
            tableService.insertEntity(eventsTableName, task, function (error) {
                if (!error) {
                    // console.log('Event table updated for ' + partitionKey + ' property ' + propertyName);
                }
                else {
                    console.log('ERROR while Event Table update: ' + error + ";pk = " + partitionKey + "; rk = " + rowKey + "; pname = " + propertyName + "; pval = " + propertyValue);
                }
            });
        }
    });
}

function randomString(bits) {
    // 64-character alphabet, 6 bits per character. Note: the alphabet must have
    // exactly 64 entries; a 63-character string would make index 63 yield
    // "undefined" in generated keys. '-' is used as the 64th character rather
    // than '/', since '/' is not allowed in Azure Table PartitionKey/RowKey.
    var chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+-';
    var rand, i, ret = '';
    // in v8, Math.random() yields 32 pseudo-random bits (in spidermonkey it gives 53)
    while (bits > 0) {
        rand = Math.floor(Math.random() * 0x100000000); // 32-bit integer
        // 6 bits per character, so the top 30 bits of rand give 30/6 = 5 characters
        for (i = 26; i > 0 && bits > 0; i -= 6, bits -= 6) {
            ret += chars[0x3F & rand >>> i];
        }
    }
    return ret;
}



Hopefully this blog gives readers the confidence to create a Service Fabric monitor with little cost and effort. Any suggestions or views are always welcome.

Thursday, May 7, 2015

ServiceFabric + ASA = Crime Detection, Part2

This blog is a continuation of http://nabaruns.blogspot.com/2015/05/servicefabric-asa-crime-detection-smurf.html

In this blog I want to highlight the PowerMap analysis and the database schema we already discussed.

Thanks to one of my colleagues, who quickly picked up PowerMap; I am keeping this short by showing the analytics he produced.





The data is being simulated while I am writing this blog. It is fetching the results from the database as follows.

The latitude and longitude data let us plot points in PowerMap, giving a real-time tracking analysis of the stolen car and thus proving the POC.

 

ServiceFabric + ASA = Crime Detection, Smurf Part2

This blog captures the story of an idea that had been in my mind for a long time but did not prove feasible in the past. With Service Fabric and Azure Stream Analytics, it has now become possible to realize the potential of technology to prevent crime.

Scenario

In this story we have tried to emulate a scenario wherein multiple traffic cameras capture vehicle number plates and send them to an ASA (Azure Stream Analytics) service. Suppose a vehicle has been stolen and has just been reported; law agencies need to add this vehicle to a lookout table. Now the stolen vehicle happens to pass Camera3. Camera3, which continuously emits data to Event Hub (which in turn feeds ASA), captures the number-plate data. ASA queries the input against the lookout table and writes the matching vehicle data to the database as soon as it is scanned. A PowerMap sheet provides an analysis based on the database content and shows a pointer at the Camera3 location, giving law agencies a quick mechanism to track the vehicle.

Problem Breakdown 

Broadly, for our POC we need these two paths:
1. Simulate the traffic-cam scenario, sending continuous data to Event Hubs
2. Read the input with ASA and send the stolen-vehicle details, along with the camera details, to the output DB
 

Technical details

Input

In order to have input, we created a stateless Service Fabric Actor project. As a Service Fabric Actor has both compute and state, it is a good fit for simulating a traffic camera.
 
Broadly, we are looking at the following data structures. StolenCarModel holds the car details: SerialId (any identification; we consider this a unique id, and it will help us create the clustered index in our DB), VehicleNumber (captured from the image), CameraSerial (identification id of the camera) and CaptureTime (timestamp of capture, which will help in tracking the investigation).
 
public class StolenCarModel
{
    public int SerialId { get; set; }
    public string VehicleNumber { get; set; }
    public string CameraSerial { get; set; }
    public string CaptureTime { get; set; }
}
 
Another data structure that needs to be maintained is CameraDetails, which has the CamID, the Latitude and Longitude where the camera is positioned (these will help us plot points in PowerMap), and Area, the name of the area.
 
public class CameraDetails
{
    public string CamID { get; set; }
    public string Latitude { get; set; }
    public string Longitude { get; set; }
    public string Area { get; set; }
}
 

Service Fabric Details 

First and foremost is the Actor interface for our service:
 
public interface IStolenCarStatelessActor : IActor
{
    Task<bool> StartSendingCarDetails(List<StolenCarModel> stolenCarList);
}
 
Corresponding to the interface, the Actor class should have the implementation:
 
public async Task<bool> StartSendingCarDetails(List<StolenCarModel> stolenCarList)
{
    ActorEventSource.Current.ActorMessage(this, "Start Sending Data");
    EventHubLayer hub = new EventHubLayer();
    var sendDetails = await hub.SendEventsToEventHub(stolenCarList);
    return sendDetails;
}
 

Smurf Details 

We are using the Smurf core libraries to send data to Event Hubs. Please refer to my previous blog on the Smurf Core API at http://nabaruns.blogspot.com/2015/03/smurfan-asa-based-tool-in-making-part-1.html
 
To populate the list of StolenCarModel details, we are using this function:
 
/// <summary>
/// Get a list of sample StolenCarModel details which can be sent later to the event hub
/// </summary>
/// <returns>Returns the list of stolen car models</returns>
private static List<StolenCarModel> GetMetricDataList(string camera)
{
    Random rand = new Random();
    var metricDataList = new List<StolenCarModel>();

    Console.WriteLine();
    Console.WriteLine("#####################{0}####################", camera);
    for (var loop = 0; loop < 5; loop++)
    {
        var date = DateTime.Now;
        var serialId = rand.Next(0, 50);
        var nowTime = date.Ticks.ToString();

        Console.Write("Camera:{0} SerialId:{1} TimeStamp:{2} Vehicle:{3}", camera, serialId, nowTime, "ADC01" + serialId);
        Console.WriteLine();
        var info = new StolenCarModel() { CameraSerial = camera, SerialId = serialId, CaptureTime = nowTime, VehicleNumber = "ADC01" + serialId };
        metricDataList.Add(info);
    }
    Console.WriteLine("##############################################");
    Console.WriteLine();
    return metricDataList;
}
 
In our POC, at time t=0 we assume there are 5 events generated by each camera. We are currently simulating the camera details as shown in the marked line.
 
This gives us the following input:
 
 
 
Moving ahead, we need to simulate multiple actors as cameras.
 
var camera1 = ActorProxy.Create<IStolenCarStatelessActor>(ActorId.NewId(), "fabric:/StolenCarStatelessActorApplication");
var camera2 = ActorProxy.Create<IStolenCarStatelessActor>(ActorId.NewId(), "fabric:/StolenCarStatelessActorApplication");
...
var camera10 = ActorProxy.Create<IStolenCarStatelessActor>(ActorId.NewId(), "fabric:/StolenCarStatelessActorApplication");
int loop = 0;
do
{
    var tollboothList = GetMetricDataList("Camera1");
    var sendSuccess = camera1.StartSendingCarDetails(tollboothList);
    Console.WriteLine("Sending Camera1 Details {0}: {1}", camera1.GetActorId(), sendSuccess.Result);
    Thread.Sleep(5000);
    tollboothList = GetMetricDataList("Camera2");
    sendSuccess = camera2.StartSendingCarDetails(tollboothList);
    Console.WriteLine("Sending Camera2 details {0}: {1}", camera2.GetActorId(), sendSuccess.Result);
    Thread.Sleep(5000);
    ...
    tollboothList = GetMetricDataList("Camera10");
    sendSuccess = camera10.StartSendingCarDetails(tollboothList);
    Console.WriteLine("Sending Camera10 details {0}: {1}", camera10.GetActorId(), sendSuccess.Result);
    Thread.Sleep(5000);
    loop++;
} while (loop < 10);
 
In the above code we create 10 actor proxies to the stolen car service and send random data through each of them.
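The repetitive per-camera block could also be collapsed into a data-driven loop. A Python sketch of that shape (send_car_details and get_metric_data_list are hypothetical stand-ins for the actor call and the generator):

```python
def send_car_details(camera, events):
    # Hypothetical stand-in for cameraN.StartSendingCarDetails(...)
    return True

def get_metric_data_list(camera):
    # Stand-in for the generator shown earlier; one record is enough for the sketch.
    return [{"CameraSerial": camera}]

cameras = ["Camera" + str(i) for i in range(1, 11)]  # Camera1 .. Camera10
sent = []
for _ in range(10):                                  # outer do/while loop (loop < 10)
    for camera in cameras:
        events = get_metric_data_list(camera)
        ok = send_car_details(camera, events)
        sent.append((camera, ok))
        # time.sleep(5)  # 5-second pause between cameras, as in the C# client
```

The same refactor applies in C#: keep the proxies in a list and iterate, instead of naming camera1 through camera10 by hand.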
 
Having handled the input side, the next piece is the Azure Stream Analytics (ASA) query that populates the data into the database.
 

ASA Query

For the purposes of the POC, our ASA query is very simple:
 
SELECT SerialId, VehicleNumber, CameraSerial, CaptureTime FROM CallStream WHERE VehicleNumber='ADC0116';
 
Here we declare that the stolen vehicle is ADC0116; whenever an event for that vehicle number arrives, it is written to the database.
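Conceptually the query is just a filter over the event stream. In Python terms (the sample events below are made up for illustration):

```python
# Each dict mirrors a row arriving on the CallStream input.
call_stream = [
    {"SerialId": 16, "VehicleNumber": "ADC0116", "CameraSerial": "Camera7", "CaptureTime": "1"},
    {"SerialId": 23, "VehicleNumber": "ADC0123", "CameraSerial": "Camera2", "CaptureTime": "2"},
    {"SerialId": 16, "VehicleNumber": "ADC0116", "CameraSerial": "Camera9", "CaptureTime": "3"},
]

# SELECT ... FROM CallStream WHERE VehicleNumber = 'ADC0116'
matches = [e for e in call_stream if e["VehicleNumber"] == "ADC0116"]
```

Only the matching rows flow through to the output sink (the SQL table below); everything else is dropped.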
 

SQL Database

To keep track of the stolen vehicles, we need a database table that matches the StolenCarModel schema.

Keeping it very simple, I create the table as follows:
 
CREATE TABLE StolenCarTable (
    SerialId varchar(255),
    VehicleNumber varchar(255),
    CameraSerial varchar(255),
    CaptureTime varchar(255)
);

CREATE CLUSTERED INDEX IX_StolenCarTable_SerialId
    ON StolenCarTable (SerialId ASC);
GO

Action Time

By now I have all the pieces in place to verify my app. Here is a screenshot which verifies the stateless actor service application deployed on my localhost cluster.
 
 
After starting the client, the first point of verification is the Event Hub dashboard and logs, which should start receiving the events.
 
 
The second point of verification is the ASA dashboard and logs.
 
 
If everything has looked fine so far, you should see the data land in the database:
 
16 ADC0116 Camera7 NULL
16 ADC0116 Camera9 NULL
16 ADC0116 Camera7 NULL
16 ADC0116 Camera10 NULL
16 ADC0116 Camera3 NULL
16 ADC0116 Camera2 NULL
16 ADC0116 Camera2 NULL
16 ADC0116 Camera9 NULL
16 ADC0116 Camera2 NULL
I got this sample output in my database. It says ADC0116 was first seen by Camera7, then Camera9, then Camera7 again, and so on, eventually ending at Camera2. (Note that CaptureTime came back NULL in this run, so the sequence relies on row order.) This can now easily be highlighted in Power Map based on camera details and locations, which will be the subject of my next blog.
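Reconstructing that movement from the rows is straightforward. A Python sketch, using the rows exactly as they appeared in the sample output above (relying on row order, since CaptureTime was NULL):

```python
# (SerialId, VehicleNumber, CameraSerial) from the sample database output.
rows = [
    ("16", "ADC0116", "Camera7"), ("16", "ADC0116", "Camera9"),
    ("16", "ADC0116", "Camera7"), ("16", "ADC0116", "Camera10"),
    ("16", "ADC0116", "Camera3"), ("16", "ADC0116", "Camera2"),
    ("16", "ADC0116", "Camera2"), ("16", "ADC0116", "Camera9"),
    ("16", "ADC0116", "Camera2"),
]

# Collapse consecutive duplicates to get the movement between cameras.
path = []
for _, _, camera in rows:
    if not path or path[-1] != camera:
        path.append(camera)
```

The resulting camera sequence is what a Power Map visualization would plot once each camera serial is mapped to a location.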
 
I think by now I have a story to share with you all. The intent of this blog is to put these potent technologies to use and, in some small way, make the world a better place. I hope you like it. Please feel free to comment with your views or suggestions.

Continuation blog is at http://nabaruns.blogspot.com/2015/05/servicefabric-asa-crime-detection-part2.html

Tuesday, May 5, 2015

Why Service Fabric?

This is a brief article; it is not about what Service Fabric is. The intent is to add more pointers to it as we explore Service Fabric.

The focus here is to study why Service Fabric is needed. The following is a comparison I managed to collect by talking to some Service Fabric devs and PMs. Having said that, the intent of this post is to open up points for debate.

  1. Why use Service Fabric in place of Hadoop MapReduce?
Suppose we have 1 TB of data divided into many sections. Consider one section: if a failover occurs during execution, Hadoop MapReduce restarts processing from the beginning of that section. If we implement map-reduce on Service Fabric, the intermediate state is saved, so this is not an issue.

Traditional MapReduce has a mapper, a combiner and a reducer, and data is copied from the mapper to the combiner and then to the reducer. With Service Fabric we avoid copying the data multiple times (the copy happens directly from the mapper to the reducer).
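The failover difference can be sketched as a checkpoint-and-resume loop in Python (a toy model, not Service Fabric code): with saved state, a restart continues from the last checkpoint instead of the start of the section.

```python
def process_section(records, state):
    """Process records from the last checkpoint; `state` is assumed to survive a failover."""
    start = state.get("checkpoint", 0)
    for i in range(start, len(records)):
        state["result"] = state.get("result", 0) + records[i]
        state["checkpoint"] = i + 1  # persist progress after each record
    return state["result"]

records = list(range(10))
state = {}  # stand-in for a stateful service's persisted state

# Simulate a failover after 4 records: the resumed run picks up at record 4, not record 0.
state["checkpoint"], state["result"] = 4, sum(records[:4])
total = process_section(records, state)
```

In stateless MapReduce the equivalent restart would begin at `checkpoint = 0` and redo the first four records; the saved state is what removes that rework.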

  2. Distributed data structures
Service Fabric provides distributed data structures such as the distributed queue and distributed dictionary. These built-in data structures help one quickly develop stateful services on the fabric, and they ensure that your service never loses data in spite of cluster-wide failures.

Unlike Azure Redis Cache, Azure Table Storage or Azure Queues, the state is kept locally in the service instance. The advantages are that all reads are local and all writes incur the minimum amount of network traffic.



Saturday, May 2, 2015

Service Fabric Actor app ready in just 4 hours

This is a continuation of the previous blog, where I played with the stateful KVS service. This time I thought I would play around with Service Fabric actor services.

8:00 pm: After fooling around with friends for quite some time :) I finally sat down on my couch with my laptop, thinking about what new thing I could learn. As it was Friday I was not bothered by any official mails, so I went to http://azure.microsoft.com. After the recent Build sessions, the documentation section was filled with all the exciting things Microsoft was doing. I thought I would continue the Service Fabric story at http://azure.microsoft.com/en-us/documentation/services/service-fabric/ and get some hands-on experience with the Service Fabric actor model.

8:30 pm: After going through the initial documentation, I could not hold my nerves and created a new stateful Fabric Actor microservice. Please note: you need to download Visual Studio 2015 RC in order to use Service Fabric. After going through the initial Fabric Actor introduction, I started getting my hands dirty.

9:00 pm - 12:00 am: During this window I went through many references from the Reliable Actors documentation. My scenario was to get Facebook events corresponding to a particular keyword. For example: if I search for "Microsoft", it should show me all the Facebook events for Microsoft, both from the recent past and upcoming.

9:00 pm - 10:00 pm: Following the basic steps, I went through the doc published at http://azure.microsoft.com/en-us/documentation/articles/service-fabric-reliable-actors-get-started/

This document gives an idea of how the fabric actor registers itself with the cluster. From a user's point of view, it describes the entry point for the user's logic.

The following code snippet is from the service project, which in my case is SocialAnalyzer.cs.

We are using a stateful actor for our experiment:

public class SocialAnalyzer : Actor<SocialAnalyzerState>, ISocialAnalyzer
{
}

Here the state is maintained in the SocialAnalyzerState class.

public Task<List<FacebookMessage>> GetFacebookMessages(List<string> searchTerms)
{
    ActorEventSource.Current.ActorMessage(this, "Getting Facebook Messages for {0}", searchTerms);
    FacebookCrawler crawler = new FacebookCrawler();
    if (this.State.SearchTerms == null)
    {
        this.State.SearchTerms = new List<string>();
    }

    foreach (var term in searchTerms)
    {
        if (!this.State.SearchTerms.Contains(term))
        {
            // Record only the new term (not the whole list) and cache its results.
            this.State.SearchTerms.Add(term);
            this.State.FbMessages.AddRange(crawler.GetMessages(term));
        }
    }

    return Task.FromResult(this.State.FbMessages);
}

This is why I am using a stateful actor: I don't want to repeat a search that has already been performed. For example, if person A searches for "Microsoft", the response is stored in the actor's state; if person B then searches for "Microsoft", we can serve the results from the actor's state without calling the Facebook Graph API again.

The above code snippet fetches the Facebook message response and stores it in the actor state.
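The caching behavior can be sketched in Python (a hypothetical crawler stub stands in for the Facebook call; the dict stands in for the actor's state):

```python
call_count = {"n": 0}

def crawl(term):
    # Hypothetical stand-in for FacebookCrawler.GetMessages(term).
    call_count["n"] += 1
    return [term + "-event"]

state = {"search_terms": [], "fb_messages": []}  # stand-in for SocialAnalyzerState

def get_facebook_messages(search_terms):
    for term in search_terms:
        if term not in state["search_terms"]:
            state["search_terms"].append(term)        # record the term once
            state["fb_messages"].extend(crawl(term))  # cache its results
    return state["fb_messages"]

first = get_facebook_messages(["Microsoft"])   # person A: hits the crawler
second = get_facebook_messages(["Microsoft"])  # person B: served from state
```

The second call returns the same list without touching the crawler, which is exactly the saving the stateful actor provides.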

Here is the SocialAnalyzerState definition, as outlined in the doc:

[DataContract]
public class SocialAnalyzerState
{
    public SocialAnalyzerState()
    {
        this.FbMessages = new List<FacebookMessage>();
    }

    [DataMember]
    public int Count;

    [DataMember]
    public List<FacebookMessage> FbMessages { get; set; }

    [DataMember]
    public List<string> SearchTerms { get; set; }

    public override string ToString()
    {
        return string.Format(CultureInfo.InvariantCulture, "SocialAnalyzerState[Count = {0}]", Count);
    }
}

Believe me, this is the only thing I had to do from the Service Fabric communication point of view; the rest was my own logic.

10:00 pm - 11:00 pm: Till now I was enjoying the script-kiddie experience. My intent was to see how quickly I could create an app on the Service Fabric actor model and fulfil one of my needs. As most of the code was already done for me, I did not need to go through the docs line by line; I wanted to see something in action instead. So without much ado I started writing my client, which would help me test and debug.

In my client I wanted to show the name, location and date of each event.

public static void Main(string[] args)
{
    var searchQuery = new List<string>();
    var proxy = ActorProxy.Create<ISocialAnalyzer>(ActorId.NewId(), "fabric:/SocialAnalyzerApplication");

    // args is never null for Main; check for emptiness instead.
    if (args != null && args.Length > 0)
    {
        foreach (var item in args)
        {
            searchQuery.Add(item);
        }
    }
    else
    {
        searchQuery.Add("Microsoft");
    }

    var facebookMessages = proxy.GetFacebookMessages(searchQuery).Result;
    foreach (var message in facebookMessages)
    {
        Console.WriteLine("Name: " + message.Name);
        Console.WriteLine("Location: " + message.Location);
        Console.WriteLine("Date: " + message.Start_Time);
        Console.WriteLine();
    }
}

The intent was that the user would pass search terms as arguments to the exe and fetch the corresponding results.

11:00 pm - 12:00 am: Now for the final piece of cake, which always excites me: social networking APIs. I figured out that my Facebook Graph API knowledge was obsolete, which proves I had not kept myself up to date for a long time. The current Graph API version was 2.3, and some of the query syntax had been optimized in this version.

To store the Facebook messages in my state, I used a helper class, FacebookMessage:


public class FacebookMessage
{
    public string Name { get; set; }

    public string Start_Time { get; set; }

    public string Location { get; set; }

    public string ID { get; set; }
}

The above schema covers everything I need to fetch the details of an event. Creating an app in Facebook and getting an access token for the Graph API has already been discussed in my previous blog. Once you have an access token, the search API can be called with the following snippet, which fetches the list of Facebook messages related to events. With a few modifications to the Graph API call, we can also search for users, places and other types.

public List<FacebookMessage> GetMessages(string query)
{
    var facebookMessages = new List<FacebookMessage>();

    // TODO: move this to a configuration file.
    var accessToken = "<<accesstoken>>";

    string getFacebookMessage = "https://graph.facebook.com/v2.3/search?q='" + query + "'&type=event&access_token=" + accessToken;

    using (HttpClient facebookClient = new HttpClient())
    using (HttpResponseMessage facebookResponse = facebookClient.GetAsync(getFacebookMessage).Result)
    {
        facebookResponse.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
        facebookResponse.EnsureSuccessStatusCode();
        string responseBody = facebookResponse.Content.ReadAsStringAsync().Result;

        // Parse the response and collect the event details from the "data" array.
        JObject json = JObject.Parse(responseBody);
        JArray array = (JArray)json["data"];

        foreach (var item in array)
        {
            if (item["id"] != null)
            {
                var fbMsg = new FacebookMessage();
                fbMsg.ID = item["id"].ToString();
                fbMsg.Name = item["name"].ToString();

                fbMsg.Start_Time = item["start_time"] != null ? item["start_time"].ToString() : string.Empty;
                fbMsg.Location = item["location"] != null ? item["location"].ToString() : string.Empty;

                facebookMessages.Add(fbMsg);
            }
        }

        return facebookMessages;
    }
}
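The parsing step boils down to a null-safe walk over the response's `data` array. A Python sketch against a made-up response body (the sample events are invented for illustration, not real Graph API data):

```python
import json

response_body = json.dumps({
    "data": [
        {"id": "1", "name": "Build Meetup", "start_time": "2015-05-20", "location": "Seattle"},
        {"id": "2", "name": "Azure Camp"},      # optional fields missing
        {"name": "entries without an id are skipped"},
    ]
})

messages = []
for item in json.loads(response_body)["data"]:
    if item.get("id") is not None:
        messages.append({
            "ID": item["id"],
            "Name": item["name"],
            "Start_Time": item.get("start_time", ""),  # mirrors the C# null checks
            "Location": item.get("location", ""),
        })
```

As in the C# snippet, entries without an id are skipped and the optional start_time/location fields default to empty strings.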

At the stroke of midnight, I celebrated my four-hour journey with this output:



This might be a simple app, but it indeed gives a maker the confidence to create a highly scalable, distributed, stateful app, something that was not so simple just a few days ago.

I wonder what other stories we can unveil with this framework. I will keep exploring and will come back with more experiences next time.