ciava.at

WebSocket with StreamLayer

Blog Post created by ciava.at on Aug 5, 2014

The evolution of web applications has created a growing demand for real-time communication: chat applications, live updates, online games and so on.

Until now, polling has been the simplest approach: at fixed intervals the client sends a request and the server responds. The developer issues the request from a script and checks the server's response for the information wanted.
The resulting latency may not be acceptable, so a variant keeps each request open longer, holding the connection to the server until data is available (long polling). Yet these and similar techniques are inefficient and complex.
The WebSocket protocol solves these problems by keeping a persistent, bidirectional, full-duplex connection over TCP. The connection is established through a client-key handshake and protected by an origin-based model; in addition, the frames sent by the client are masked to prevent attacks. For further details see The WebSocket protocol and WebSocket API.
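
To make the contrast concrete, here is a minimal client-side sketch of the two styles. It is not taken from the sample solution: the function names and the injectable `fetchImpl` / `WebSocketImpl` parameters are assumptions added so that the sketch can also run outside a browser.

```javascript
// Polling: ask the server on a fixed schedule, whether or not anything changed.
// `fetchImpl` defaults to the browser's fetch and is injectable (assumption).
function startPolling(url, intervalMs, onData, fetchImpl = fetch) {
  const timer = setInterval(async () => {
    const response = await fetchImpl(url);
    onData(await response.json());
  }, intervalMs);
  return () => clearInterval(timer); // call the returned function to stop polling
}

// WebSocket: open one connection and let the server push messages when it has them.
// `WebSocketImpl` defaults to the browser's WebSocket and is injectable (assumption).
function startPush(url, onData, WebSocketImpl = WebSocket) {
  const socket = new WebSocketImpl(url);
  socket.onmessage = (event) => onData(JSON.parse(event.data));
  return () => socket.close(); // call the returned function to disconnect
}
```

With polling, the client pays for a full request every interval even when nothing is new; with the push version the only traffic after the handshake is the data itself.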

.NET Framework 4.5 provides a managed implementation of the WebSocket protocol, and modern browsers such as Chrome, Firefox, Safari, Opera and IE10 support the WebSocket specification.
On the server, WebSocket is supported natively by Windows Server 2012 and Windows 8 with IIS 8 and IIS 8 Express. On other Windows versions you can use libraries such as SignalR or Socket.IO, which have the great advantage of supporting fallback strategies (for example for browser clients that do not support WebSocket).

 

To install WebSocket support on Windows Server 2012:
- open Server Manager;
- click on Add Roles and Features;
- select Role-based or Feature-based Installation and then click on Next;
- select the server (the local server is selected by default) and then click on Next;
- expand Web Server (IIS) in Roles, then expand Web Server and then Application Development;
- select WebSocket Protocol and then click on Next;
- if no additional features are needed, click on Next;
- click on Install;
- when the installation is complete, close the wizard.

 

 

 

 

 

 

IIS is now able to handle WebSocket connections.
Let's create a generic asynchronous HTTP handler (available with .NET 4.5) and derive a specialized class from it. We then register the new HTTP handler in the application's web.config and create a JavaScript test page using the Esri JavaScript API. Here we can use the StreamLayer class, which displays features in real time not only from GEP (GeoEvent Processor) but from any WebSocket that provides features in Esri JSON format.

 

namespace StreamLayerDemo
{
    using System;
    using System.Diagnostics;
    using System.Diagnostics.CodeAnalysis;
    using System.Net.WebSockets;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Web;
    using System.Web.WebSockets;
 
    [SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1600:ElementsMustBeDocumented", Justification = "Demo code")]
    public abstract class WebSocketAsyncHandler : HttpTaskAsyncHandler
    {
        /// <summary>
        /// Gets a value indicating whether this handler can be reused for another request.
        /// Should return false in case your Managed Handler cannot be reused for another request, or true otherwise.
        /// Usually this would be false in case you have some state information preserved per request.
        /// You will need to configure this handler in the Web.config file of your
        /// web and register it with IIS before being able to use it. For more information
        /// see the following link: <see cref="http://go.microsoft.com/?linkid=8101007" />
        /// </summary>
        public override bool IsReusable
        {
            get
            {
                return false;
            }
        }
 
        private WebSocket Socket { get; set; }
 
        public override async Task ProcessRequestAsync(HttpContext httpContext)
        {
            await Task.Run(() =>
            {
                if (httpContext.IsWebSocketRequest)
                {
                    httpContext.AcceptWebSocketRequest(async delegate(AspNetWebSocketContext context)
                    {
                        this.Socket = context.WebSocket;
 
                        // Keep receiving while the socket exists and has not been closed.
                        while (this.Socket != null && this.Socket.State != WebSocketState.Closed)
                        {
                            try
                            {
                                switch (this.Socket.State)
                                {
                                    case WebSocketState.Connecting:
                                        this.OnConnecting();
                                        break;
                                    case WebSocketState.Open:
                                        this.OnOpen();
                                        break;
                                    case WebSocketState.CloseSent:
                                        this.OnClosing(false, string.Empty);
                                        break;
                                    case WebSocketState.CloseReceived:
                                        this.OnClosing(true, string.Empty);
                                        break;
                                }
                            
                            
                                ArraySegment<byte> buffer = new ArraySegment<byte>(new byte[1024]);
                                WebSocketReceiveResult receiveResult = await this.Socket.ReceiveAsync(buffer, CancellationToken.None);
 
 
                                switch (receiveResult.MessageType)
                                {
                                    case WebSocketMessageType.Text:
                                        string message = Encoding.UTF8.GetString(buffer.Array, 0, receiveResult.Count);
                                        this.OnMessageReceived(message);
                                        break;
                                    case WebSocketMessageType.Binary:
                                        this.OnMessageReceived(buffer.Array);
                                        break;
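                                    case WebSocketMessageType.Close:
                                        this.OnClosing(true, receiveResult.CloseStatusDescription);
                                        // Complete the close handshake so that the state becomes Closed and the loop ends.
                                        await this.Socket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
                                        this.OnClosed();
                                        break;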
                                }
                            }
                            catch (Exception ex)
                            {
                                this.OnError(ex);
                            }
                        }
                    });
                }
            });
        }
 
        protected virtual void OnConnecting()
        {
        }
 
        protected virtual void OnOpen()
        {
        }
 
        protected virtual void OnMessageReceived(string message)
        {
        }
 
        protected virtual void OnMessageReceived(byte[] bytes)
        {
        }
 
        protected virtual void OnClosing(bool isClientRequest, string message)
        {
        }
 
        protected virtual void OnClosed()
        {
        }
 
        protected virtual void OnError(Exception ex)
        {
        }
 
        [DebuggerStepThrough]
        protected async Task SendMessageAsync(byte[] message)
        {
            await this.SendMessageAsync(message, WebSocketMessageType.Binary);
        }
 
        [DebuggerStepThrough]
        protected async Task SendMessageAsync(string message)
        {
            await this.SendMessageAsync(Encoding.UTF8.GetBytes(message), WebSocketMessageType.Text);
        }
 
        private async Task SendMessageAsync(byte[] message, WebSocketMessageType messageType)
        {
            await this.Socket.SendAsync(
                new ArraySegment<byte>(message),
                messageType,
                true,
                CancellationToken.None);
        }
    }
}

We derive from the abstract class HttpTaskAsyncHandler and override the IsReusable property and the ProcessRequestAsync method, using async/await and the Task class because the work is asynchronous.
In this class we simply store the WebSocket instance in a property, check whether the HTTP request is a WebSocket request (IsWebSocketRequest) and, depending on the WebSocket state, call the corresponding virtual method. We also implement the code that sends messages to the client. WebSocket lets you send and receive messages (text and binary) asynchronously. This abstract class is the base from which you implement your own specialized WebSocket handler.

The virtual methods are implemented in the derived class:

 

 

 

namespace StreamLayerDemo
{
    using System;
    using System.Diagnostics.CodeAnalysis;
    using System.Threading.Tasks;
 
    [SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1600:ElementsMustBeDocumented", Justification = "Demo code")]
    public class StreamLayerWebSocketAsyncHandler : WebSocketAsyncHandler
    {
        protected override void OnOpen()
        {
            PointTicker.DefaultInstance.Update += this.PointTicker_Update;
            base.OnOpen();
        }
 
        protected override void OnClosing(bool isClientRequest, string message)
        {
            PointTicker.DefaultInstance.Update -= this.PointTicker_Update;
            base.OnClosing(isClientRequest, message);
        }
 
        protected override void OnMessageReceived(string message)
        {
            // Assignment prevents warning "Because this call is not awaited...Consider applying the 'await' operator"
            // This is intentional => fire and forget
            // Task task = this.SendMessageAsync("Your message is: " + message);
        }
 
        protected override void OnError(Exception ex)
        {
            // Assignment prevents warning "Because this call is not awaited...Consider applying the 'await' operator"
            // This is intentional => fire and forget
            var task = this.SendMessageAsync(string.Format("Something exceptional happened: {0}", ex.Message));
        }
 
        private void PointTicker_Update(object sender, PointTickerEventArgs e)
        {
            // Assignment prevents warning "Because this call is not awaited...Consider applying the 'await' operator"
            // This is intentional => fire and forget
            var task = this.SendMessageAsync(e.Feature);
 
        }
    }
}

 

 

To simulate data being sent (in this example, random points within a given extent) we use a single instance of a class wrapping a simple Timer, which sends a Point feature to the client every five seconds. The Start and Stop methods of the class are called from the application's global start and end events (Application_Start and Application_End in global.asax.cs).

 

 

namespace StreamLayerDemo
{
    using System;
    using System.Diagnostics.CodeAnalysis;
 
    [SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1600:ElementsMustBeDocumented", Justification = "Demo code")]
    public class Global : System.Web.HttpApplication
    {
        protected void Application_Start(object sender, EventArgs e)
        {
            // var hostFactory = new PointTickerHostFactory();
            // var route = new ServiceRoute("PointTicker", hostFactory, typeof(PointTickerService));
            // System.Web.Routing.RouteTable.Routes.Add(route);
            PointTicker.DefaultInstance.Start();
        }
 
        protected void Application_End(object sender, EventArgs e)
        {
            PointTicker.DefaultInstance.Stop();
        }
    }
}

 

 

The class that simulates sending data to the client:

 

 

namespace StreamLayerDemo
{
    using System;
    using System.Diagnostics.CodeAnalysis;
    using System.Timers;
 
    [SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1600:ElementsMustBeDocumented", Justification = "Demo code")]
    public class PointTicker
    {
        private const int TimerInterval = 5000;
 
        private static object lockField = new object();
 
        private static PointTicker defaultInstanceField;
 
        private PointTicker()
        {
        }
 
        public event EventHandler<PointTickerEventArgs> Update;
 
        public static PointTicker DefaultInstance
        {
            get
            {
                lock (PointTicker.lockField)
                {
                    if (PointTicker.defaultInstanceField == null)
                    {
                        PointTicker.defaultInstanceField = new PointTicker();
                        PointTicker.defaultInstanceField.Initialize();
                    }
                }
 
                return PointTicker.defaultInstanceField;
            }
        }
 
        private static Timer Timer { get; set; }
 
        public void Start()
        {
            lock (PointTicker.lockField)
            {
                if (!PointTicker.Timer.Enabled)
                {
                    PointTicker.Timer.Start();
                }
            }
        }
 
        public void Stop()
        {
            lock (PointTicker.lockField)
            {
                if (PointTicker.Timer.Enabled)
                {
                    PointTicker.Timer.Stop();
                }
            }
        }
 
        protected virtual void OnUpdate(string feature)
        {
            if (this.Update != null)
            {
                this.Update(
                    this,
                    new PointTickerEventArgs()
                    {
                        Feature = feature
                    });
            }
        }
 
        private void Initialize()
        {
            PointTicker.Timer = new Timer(PointTicker.TimerInterval);
            PointTicker.Timer.Elapsed += this.Timer_Elapsed;
        }
 
        private void Timer_Elapsed(object sender, ElapsedEventArgs e)
        {
            // NextDouble(min, max) and UnixTicks() are not part of the .NET Framework;
            // they are assumed to be extension methods defined elsewhere in the solution.
            var culture = new System.Globalization.CultureInfo("en-US");
            Random random = new Random();
            string feature = string.Format(
                "{{\"geometry\" : {{\"x\" : {0}, \"y\" : {1} }}, \"attributes\" : {{\"ObjectId\" : {2}, \"RouteID\" : 1, \"DateTimeStamp\" : {3} }}}}",
                random.NextDouble(8.40, 8.95).ToString(culture),
                random.NextDouble(45.23, 45.85).ToString(culture),
                random.Next(1, Int32.MaxValue),
                DateTime.Now.UnixTicks().ToString(culture));
 
            this.OnUpdate(feature);
        }
    }
}
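
For readers who want to feed the StreamLayer from a different back end, the message built in Timer_Elapsed can be sketched in JavaScript as follows. This is only an illustration of the Esri JSON shape used above: `randomInRange` and `makeFeature` are hypothetical helpers, and DateTimeStamp is assumed to be milliseconds since the Unix epoch, since the test page passes it to `new Date(...)`.

```javascript
// Return a random number between min (inclusive) and max (exclusive).
function randomInRange(min, max) {
  return min + Math.random() * (max - min);
}

// Build one Esri JSON point feature with the same fields as the C# sample.
function makeFeature(now = Date.now()) {
  return JSON.stringify({
    geometry: {
      x: randomInRange(8.40, 8.95),   // longitude range used in the sample
      y: randomInRange(45.23, 45.85)  // latitude range used in the sample
    },
    attributes: {
      ObjectId: 1 + Math.floor(Math.random() * 2147483646), // 1 .. Int32.MaxValue - 1
      RouteID: 1,
      DateTimeStamp: now // assumed: milliseconds since the Unix epoch
    }
  });
}

console.log(makeFeature());
```

Building the object first and serializing it with JSON.stringify avoids the escaped braces and culture-specific number formatting that the string.Format version has to handle by hand.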

Now let's register the handler in web.config so that IIS uses it when its path is requested.

 

 

<?xml version="1.0"?>
<!--
  For more information on how to configure your ASP.NET application, please visit
  http://go.microsoft.com/fwlink/?LinkId=169433
  -->
<configuration>
  <system.web>
    <compilation debug="true" targetFramework="4.5" />
    <httpRuntime targetFramework="4.5" />
  </system.web>
  <system.webServer>
    <handlers>
      <add name="StreamLayerWebSocketAsyncHandler" verb="*" path="wsStreamLayer" type="StreamLayerDemo.StreamLayerWebSocketAsyncHandler, StreamLayerDemo" resourceType="Unspecified" />
    </handlers>
  </system.webServer>
</configuration>

 

In type, specify the class together with its namespace so that IIS can find it; in path, give the name under which the WebSocket will be called (in this case I call it 'wsStreamLayer'):

ws://<yourdomain>/<site:port>/wsStreamLayer

 

To create a connection, WebSocket uses the HTTP 'Upgrade' header, which tells the server that the client wants to switch to a WebSocket connection.
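
As a rough illustration of what the server looks for, the sketch below checks a set of request headers for the upgrade request. It is not how IIS implements IsWebSocketRequest; `isWebSocketUpgrade` is a hypothetical helper, header names are assumed to be lower-cased already, and the sample values are the ones used in the WebSocket specification (RFC 6455).

```javascript
// Return true when the headers describe a WebSocket upgrade request.
// Assumption: header names have already been lower-cased by the HTTP layer.
function isWebSocketUpgrade(headers) {
  const get = (name) => String(headers[name] || "").toLowerCase();
  // "Connection" may carry several comma-separated tokens, e.g. "keep-alive, Upgrade".
  const connectionTokens = get("connection").split(",").map((token) => token.trim());
  return get("upgrade") === "websocket"
    && connectionTokens.includes("upgrade")
    && Boolean(headers["sec-websocket-key"]);
}

// Sample opening handshake headers from RFC 6455.
const request = {
  host: "server.example.com",
  upgrade: "websocket",
  connection: "Upgrade",
  "sec-websocket-key": "dGhlIHNhbXBsZSBub25jZQ==",
  origin: "http://example.com",
  "sec-websocket-version": "13"
};

console.log(isWebSocketUpgrade(request)); // prints true
```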

 

 

 

 

 

 

 

 

 

In Fiddler we can see that the request also carries Origin: the server inspects this header to understand where the messages are coming from.

Sec-WebSocket-Key forms the first part of the handshake key. It is a random value that has been base64 encoded.

Sec-WebSocket-Version lets the server respond with the protocol version that best matches the one supported by the client.

In the server's response, Sec-WebSocket-Accept is computed by appending the GUID 258EAFA5-E914-47DA-95CA-C5AB0DC85B11 to the key received from the client (as sent, without decoding it); the resulting string is hashed with SHA-1 and then base64 encoded.
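
The Sec-WebSocket-Accept computation can be sketched in a few lines. The example below assumes Node.js and its built-in crypto module; `computeAcceptKey` is a hypothetical helper name, and the key is used exactly as it appears in the request header. The sample key and the expected result are the ones given in RFC 6455.

```javascript
// Node.js sketch: derive Sec-WebSocket-Accept from Sec-WebSocket-Key.
const crypto = require("crypto");

// Fixed GUID defined by the WebSocket protocol.
const WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

function computeAcceptKey(secWebSocketKey) {
  return crypto
    .createHash("sha1")                        // SHA-1 of key + GUID
    .update(secWebSocketKey + WEBSOCKET_GUID)
    .digest("base64");                         // base64-encode the raw hash
}

console.log(computeAcceptKey("dGhlIHNhbXBsZSBub25jZQ=="));
// prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```
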
Moreover, as with HTTP, WebSocket communication can run over SSL/TLS by using the wss prefix:

 

wss://<yourdomain>/<site:port>/wsStreamLayer
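
A client page can pick the right prefix automatically from its own address. The following is a minimal sketch under that assumption; `toWebSocketUrl` is a hypothetical helper and is not part of the sample solution.

```javascript
// Derive the WebSocket URL of a handler from the URL of the page hosting the client.
// http pages map to ws, https pages map to wss.
function toWebSocketUrl(pageUrl, handlerPath) {
  const resolved = new URL(handlerPath, pageUrl).href; // resolve the path against the page
  return resolved.replace(/^http/, "ws");              // http -> ws, https -> wss
}

console.log(toWebSocketUrl("http://localhost:55555/index.html", "wsStreamLayer"));
// prints ws://localhost:55555/wsStreamLayer
console.log(toWebSocketUrl("https://example.com/app/", "/wsStreamLayer"));
// prints wss://example.com/wsStreamLayer
```

In a browser the first argument would typically be window.location.href.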

 

Now let's run a test with the StreamLayer class of the Esri JavaScript API:

 

<!doctype html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="initial-scale=1, maximum-scale=1,user-scalable=no">
    <title>StreamLayer using ArcGIS API for JavaScript</title>
    <link rel="stylesheet" href="http://js.arcgis.com/3.10/js/dojo/dijit/themes/tundra/tundra.css">
    <link rel="stylesheet" href="http://js.arcgis.com/3.10/js/esri/css/esri.css">
    <style type="text/css">
        html, body {
            height: 100%;
            width: 100%;
            margin: 0;
            padding: 0;
        }
 
        body {
            background-color: #fff;
            overflow: hidden;
            font-family: sans-serif;
        }
 
        #map {
            width: 100%;
            height: 80%;
        }
    </style>
    <script src="http://js.arcgis.com/3.10/"></script>
</head>
<body class="tundra">
    <div id="map"></div>
    <div>
        <span>Enter websocket connection: </span><input type="text" id="txtWsUrl" value="ws://localhost:55555/wsStreamLayer" style="width: 400px" /><br />
        <input type="button" id="cmdNewStream" value="Make Stream Layer" />
        <input type="button" id="cmdDisconnect" value="Disconnect Stream Layer" />
    </div>
 
    <script>
        var curTime = new Date();
        var curTimeStamp = Date.parse(curTime.toUTCString());
        var layerDefinition = {
            "geometryType": "esriGeometryPoint",
            "timeInfo": {
                "startTimeField": "DateTimeStamp",
                "endTimeField": null,
                "trackIdField": "RouteID",
                "timeReference": null,
                "timeInterval": 1,
                "timeIntervalUnits": "esriTimeUnitsMinutes",
                "exportOptions": {
                    "useTime": true,
                    "timeDataCumulative": false,
                    "timeOffset": null,
                    "timeOffsetUnits": null
                },
                "hasLiveData": true
            },
            "fields": [
              {
                  name: "ObjectId",
                  type: "esriFieldTypeOID",
                  alias: "ObjectId"
              },
              {
                  name: "DateTimeStamp",
                  type: "esriFieldTypeDate",
                  alias: "DateTimeStamp"
              },
              {
                  name: "RouteID",
                  type: "esriFieldTypeInteger",
                  alias: "RouteID"
              }
            ]
        };
 
        var map, featureCollection, streamLayer;
 
        require(["esri/map",
          "esri/TimeExtent",
          "esri/layers/StreamLayer",
          "esri/InfoTemplate",
          "esri/symbols/SimpleMarkerSymbol",
          "esri/symbols/SimpleLineSymbol",
          "esri/renderers/SimpleRenderer",
          "esri/renderers/TimeClassBreaksAger",
          "esri/renderers/TemporalRenderer",
          "esri/Color",
          "dojo/dom",
          "dojo/on",
          "dojo/domReady!"
        ], function (Map, TimeExtent, StreamLayer, InfoTemplate, SimpleMarkerSymbol, SimpleLineSymbol, SimpleRenderer, TimeClassBreaksAger, TemporalRenderer, Color, dom, on) {
            var trackedBusses = {}, cnt = 0;
 
            map = new Map("map", {
                basemap: "gray",
                center: [8.675, 45.54],
                zoom: 10
            });
 
            // event listeners for button clicks
            on(dom.byId("cmdNewStream"), "click", makeNewStreamLayer);
            on(dom.byId("cmdDisconnect"), "click", disconnectStreamLayer);
 
            function makeStreamLayer() {
                //Make FeatureCollection to define layer without using url
                featureCollection = {
                    "layerDefinition": null,
                    "featureSet": {
                        "features": [],
                        "geometryType": "esriGeometryPoint"
                    }
                };
                featureCollection.layerDefinition = layerDefinition;
 
                // Instantiate StreamLayer
                // 1. socketUrl is the url to the GeoEvent Processor web socket.
                // 2. purgeOptions.displayCount is the maximum number of features the
                //    layer will display at one time
                // 3. trackIdField is the name of the field that groups features
                var layer = new StreamLayer(featureCollection, {
                    socketUrl: dom.byId("txtWsUrl").value,
                    purgeOptions: { displayCount: 500 },
                    trackIdField: featureCollection.layerDefinition.timeInfo.trackIdField,
                    infoTemplate: new InfoTemplate("Route Id: ${RouteID}", "Timestamp: ${DateTimeStamp}")
                });
                console.log("TrackID: ", featureCollection.layerDefinition.timeInfo.trackIdField);
                console.log("TrackID: ", layer.timeInfo.trackIdField);
 
                //Make renderer and apply it to StreamLayer
                var renderer = makeRenderer();
                layer.setRenderer(renderer);
 
                //Subscribe to onMessage event of StreamLayer so can adjust map time
                layer.on("message", processMessage);
                layer.on("connect", connectevt);
                layer.on("error", errorevt);
                return layer;
            }
 
            function connectevt() {
                console.log("Connected");
            }
 
            function errorevt() {
                console.log("error");
            }
 
            // Process message that StreamLayer received.
            function processMessage(message) {
                if (featureCollection.layerDefinition.timeInfo &&
                    featureCollection.layerDefinition.timeInfo.startTimeField) {
                    var timestamp = message.attributes[featureCollection.layerDefinition.timeInfo.startTimeField];
                    if (!map.timeExtent) {
                        map.setTimeExtent(new TimeExtent(new Date(timestamp), new Date(timestamp)));
                        console.log("TIME EXTENT: ", map.timeExtent);
                    } else {
                        var tsEnd = Date.parse(map.timeExtent.endTime.toString());
                        if (timestamp > tsEnd) {
                            map.setTimeExtent(new TimeExtent(map.timeExtent.startTime, new Date(timestamp)));
                            console.log("TIME EXTENT: ", map.timeExtent);
                        }
                    }
                }
            }
 
            // Make new StreamLayer and add it to map.
            function makeNewStreamLayer() {
                disconnectStreamLayer();
                streamLayer = makeStreamLayer();
                map.addLayer(streamLayer);
            }
 
            // Disconnect StreamLayer from websocket and remove it from the map
            function disconnectStreamLayer() {
                if (streamLayer) {
                    streamLayer.suspend();
                    streamLayer.disconnect();
                    streamLayer.clear();
                    map.removeLayer(streamLayer);
                    streamLayer = null;
                    //map.timeExtent = null;
                }
            }
 
            // Make temporal renderer with latest observation renderer
            function makeRenderer() {
                var obsRenderer = new SimpleRenderer(
                  new SimpleMarkerSymbol("circle", 8,
                  new SimpleLineSymbol("solid",
                  new Color([5, 112, 176, 0]), 1),
                  new Color([5, 112, 176, 0.4])
                ));
 
                var latestObsRenderer = new SimpleRenderer(
                  new SimpleMarkerSymbol("circle", 12,
                  new SimpleLineSymbol("solid",
                  new Color([5, 112, 176, 0]), 1),
                  new Color([5, 112, 176])
                ));
 
                var temporalRenderer = new TemporalRenderer(obsRenderer, latestObsRenderer, null, null);
                return temporalRenderer;
            }
        });
    </script>
</body>
</html>
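
The time-extent bookkeeping done in processMessage can be isolated from the map to see the rule it applies: the first timestamp opens the window, and later timestamps only ever push its end forward. The sketch below is a simplified stand-in that works on plain epoch-millisecond numbers; `extendTimeWindow` is a hypothetical helper, whereas the page itself calls map.setTimeExtent.

```javascript
// Grow a { start, end } time window (epoch milliseconds) to include a new timestamp.
function extendTimeWindow(current, timestamp) {
  if (!current) {
    // First message: the window starts and ends at the same instant.
    return { start: timestamp, end: timestamp };
  }
  if (timestamp > current.end) {
    // Newer message: keep the start, move the end forward.
    return { start: current.start, end: timestamp };
  }
  return current; // older or equal timestamps leave the window unchanged
}

let timeWindow = null;
for (const timestamp of [1000, 6000, 3000, 11000]) {
  timeWindow = extendTimeWindow(timeWindow, timestamp);
}
console.log(timeWindow); // prints { start: 1000, end: 11000 }
```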
 
 
 
 

Download solution here.
