
Orphan Tracks in GeoEvent Processor

Blog Post created by bensaunders on Jul 17, 2015

Problem

We have been experimenting with the GeoEvent Processor and a particular feed of data. One thing we soon realized about the feed was that an event could expire simply by disappearing from it. In other words, there was no event with an identifier explicitly saying "I'm expired." Its mere absence was the indicator.

 

So how do we delete that event from our geodatabase?

 

Thought 1

We wanted the event to disappear from the geodatabase as soon as we learned through the GeoEvent Processor that it was gone. What looked promising was the out-of-the-box Track Gap Detector processor, which would generate a new flagging event as soon as a particular event went missing (a gap in its track). The only problem is that our events never return. As a consequence, they would keep piling up in the processor, as most excellently summarized by RJ Sunderman in his message.

 

Thought 2

We needed something like the Track Gap Detector, just without it caching all the events until, well, the end of time or our server's tolerance for such things. I tried creating a custom processor similar to the Track Gap Detector, or so I thought. The problem I ran into was with threading: every so often, the processor needed to run through its list of expected events and compare them to what had actually flowed through. It could then determine which events had gone missing, generate new flagged events for them, and remove them from its own cache. With a processor, though, I couldn't figure out how best to trigger that determination process. I tried a timer thread, but I couldn't get its lifecycle to jibe with the processor's state. If my custom processor got restarted, I would get a new timer thread alongside the older one still running. Yeah, that's no good. And because I couldn't see any other method on the processor to override that could deal with all the events at once (as opposed to the process method, which takes one GeoEvent at a time), I came up with the solution below. I could very well have been ignorant of something about the processor, but I was following the path of least resistance (if anyone can provide some insight on what could be used off of the GeoEventProcessorBase class, please don't be shy and comment!).
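For what it's worth, the duplicate-timer symptom is usually avoided by letting one object own the scheduler and making its start method idempotent. Here is a minimal sketch of that idea, assuming the host component offers some start/stop hooks to call it from. PeriodicSweeper, start, stop, and isRunning are hypothetical names, not part of the GeoEvent SDK, and which processor lifecycle methods to wire them to is still the open question above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the GeoEvent SDK: owns at most one sweep thread.
class PeriodicSweeper {
    private final Runnable sweep;
    private final long periodMillis;
    private ScheduledExecutorService executor; // null while stopped

    PeriodicSweeper(Runnable sweep, long periodMillis) {
        this.sweep = sweep;
        this.periodMillis = periodMillis;
    }

    // Safe to call repeatedly: a restart replaces the old thread instead of adding a second one.
    synchronized void start() {
        stop();
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "orphan-sweeper");
            t.setDaemon(true); // do not keep the JVM alive on shutdown
            return t;
        });
        executor.scheduleAtFixedRate(sweep, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Cancels pending sweeps; a no-op if already stopped.
    synchronized void stop() {
        if (executor != null) {
            executor.shutdownNow();
            executor = null;
        }
    }

    synchronized boolean isRunning() {
        return executor != null;
    }
}
```

Calling start() twice in a row leaves exactly one scheduler alive, which is the property my timer thread was missing.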

 

Solution

We were already using a custom adapter to create our events (GeoEvents) anyway, so that seemed like a logical place to track orphan events. The next concept to wrap our brains around: now that we can generate GeoEvents that represent orphans, how do they ultimately get deleted from the geodatabase? The Update a Feature output connector would only insert and update features in the feature class (through ArcGIS Server). It would also delete features based on a timestamp in the past. So one thought was to change the expiration date (strangely, our events still had those; they would just get cancelled and disappear before their stated expiration) and let the feature eventually get deleted. But that was contingent on the connector's deletion cycle. What would be truly awesome was for the event to get deleted when the inserts and updates take place, not by some independent process (even one that could cycle rather frequently).

 

So instead we set up a new column in our data called, wait for it... "TO_DELETE". Rather than messing with the expiration date, we set TO_DELETE to 1 for any record that is now an orphan. Anyone consuming the data just needs a definition query of TO_DELETE = 0 to get all the current records.

 

OK, so now we have records flagged in the geodatabase. How do they eventually get deleted out? In the GeoEvent Processor, we set up a Poll an ArcGIS Server For Features input connector pointed at our layer with a definition query of TO_DELETE = 1. We also set up the connector to delete the features from the service after they are retrieved. And so there's our process to clean the feature class every so often. We don't do anything else with those GeoEvents from the input connector.
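To make the flag-then-sweep flow concrete, here is a small in-memory model of it. It is only an illustration under stated assumptions: FlaggedTable and its methods are made-up names standing in for the feature class, and nothing here calls a real geodatabase or ArcGIS API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// In-memory stand-in for the feature class; not a real geodatabase or ArcGIS API.
class FlaggedTable {
    static final class Row {
        final String trackId;
        int toDelete; // 0 = current, 1 = orphan awaiting cleanup
        Row(String trackId) { this.trackId = trackId; }
    }

    private final List<Row> rows = new ArrayList<>();

    // Insert-or-update by track ID, the way the output connector writes records.
    void upsert(String trackId, int toDelete) {
        for (Row row : rows) {
            if (row.trackId.equals(trackId)) {
                row.toDelete = toDelete;
                return;
            }
        }
        Row row = new Row(trackId);
        row.toDelete = toDelete;
        rows.add(row);
    }

    // What a consumer sees with the definition query TO_DELETE = 0.
    List<String> currentIds() {
        return rows.stream().filter(r -> r.toDelete == 0)
                .map(r -> r.trackId).collect(Collectors.toList());
    }

    // What the polling input does: fetch rows with TO_DELETE = 1, then delete them.
    List<String> sweep() {
        List<String> removed = rows.stream().filter(r -> r.toDelete == 1)
                .map(r -> r.trackId).collect(Collectors.toList());
        rows.removeIf(r -> r.toDelete == 1);
        return removed;
    }
}
```

Between the moment a record is flagged and the next sweep, the orphan is still physically in the table, which is why consumers need the TO_DELETE = 0 query.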

 

Implementation

Here are the juicy code bits. For how to write custom adapters, see ESRI's Extending using GeoEvent Extension Software Development Kit as well as the many samples found in the ArcGIS GeoEvent Gallery.

 

First, what we used in our custom input adapter:

 

public class OurInboundAdapter extends InboundAdapterBase {

    private final Map<String, TrackedGeoEvent> trackedGeoEvents = new ConcurrentHashMap<String, TrackedGeoEvent>();

    public OurInboundAdapter(AdapterDefinition definition) throws ComponentException {
        super(definition);
    }

    @Override
    public void receive(ByteBuffer aByteBuffer, String aChannelId) {
        super.receive(aByteBuffer, aChannelId);
        
        // Get current time.
        long pCurrentTime = System.currentTimeMillis();

        // Create ID list.
        ArrayList<String> pIDList = new ArrayList<String>();

        String pData;
        while (aByteBuffer.hasRemaining()) {
            aByteBuffer.mark();
            String pID = null;
            try {
                // Get data.
                byte[] pByteArray = new byte[aByteBuffer.remaining()];
                aByteBuffer.get(pByteArray);
                pData = new String(pByteArray);
                
                // Process data to create new GeoEvent...
                
                // Emit GeoEvent.
                geoEventListener.receive(pGeoEvent);
                
                // Add to tracking list.
                this.checkInGeoEvent(pGeoEvent, pCurrentTime);
            } catch (Exception aException) {
                // Perform error handling...
            }
            
            // Track orphans.
            try {
                this.discoverOrphans(pCurrentTime);
            } catch (Exception aException) {
                // Perform error handling...
            }
        }
    }

    private void checkInGeoEvent(GeoEvent aGeoEvent, long aTimestamp) {
        String pCacheKey = this.buildCacheKey(aGeoEvent);
        if (pCacheKey == null) {
            // No track ID, so nothing to track (ConcurrentHashMap rejects null keys).
            return;
        }
        
        // Create new tracked GeoEvent.
        TrackedGeoEvent pTrackedGeoEvent = new TrackedGeoEvent(aGeoEvent.getTrackId(), aGeoEvent.getGeometry(), aTimestamp);
        
        // Add/replace in list.
        this.trackedGeoEvents.put(pCacheKey, pTrackedGeoEvent);
    }
    
    private String buildCacheKey(GeoEvent aGeoEvent) {
        if (aGeoEvent != null && aGeoEvent.getTrackId() != null) {
            GeoEventDefinition aGeoEventDefinition = aGeoEvent.getGeoEventDefinition();
            return aGeoEventDefinition.getOwner() + "/" + aGeoEventDefinition.getName() + "/" + aGeoEvent.getTrackId();
        }
        return null;
    }
    
    private void discoverOrphans(long aTimestamp) throws MessagingException {
        // Create iterator.
        Iterator<Map.Entry<String, TrackedGeoEvent>> pIterator = this.trackedGeoEvents.entrySet().iterator();
        
        while (pIterator.hasNext()) {
            // Get tracked GeoEvent.
            Map.Entry<String, TrackedGeoEvent> pEntry = pIterator.next();
            TrackedGeoEvent pTrackedGeoEvent = pEntry.getValue();
            
            if (pTrackedGeoEvent.getTimeStamp() != aTimestamp) {
                // Create stripped down GeoEvent.
                GeoEvent pGeoEvent = geoEventCreator.create(((AdapterDefinition) definition).getGeoEventDefinition("OurGeoEvent").getGuid());
                
                // Populate the orphan GeoEvent.
                try {
                    // Set track id.
                    pGeoEvent.setField("ourtrackid", pTrackedGeoEvent.getTrackId());
                
                    // Set geometry.
                    pGeoEvent.setGeometry(pTrackedGeoEvent.getMapGeometry());
                    
                    // Mark for deletion.
                    pGeoEvent.setField("to_delete", 1);
                } catch (FieldException aFieldException) {
                    // Perform error handling...
                }
                
                // Emit GeoEvent.
                geoEventListener.receive(pGeoEvent);

                // Remove from list.
                pIterator.remove();
            }
        }
    }
}

 

As you can probably tell (including from any errors that crept in), this is adapted from our real class. The key things are:

  • Line 35, the checkInGeoEvent call, where we start tracking every event.
  • Line 42, the discoverOrphans call, where we determine the orphan events once every run (not on every GeoEvent creation).
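Stripped of the SDK types, those two calls amount to a stamp-and-compare pattern: stamp every ID seen in a run with that run's timestamp, then treat any ID carrying an older stamp as an orphan. Here is a self-contained sketch of just that bookkeeping. OrphanTracker is an illustrative name, and it tracks plain String IDs rather than GeoEvents so it can run without the GeoEvent libraries.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// SDK-free model of the adapter's bookkeeping; the name OrphanTracker is illustrative.
class OrphanTracker {
    // Track ID -> timestamp of the last run in which that ID was seen.
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    // Stamp an ID with the current run's timestamp (add or replace).
    void checkIn(String trackId, long runTimestamp) {
        if (trackId != null) {
            lastSeen.put(trackId, runTimestamp);
        }
    }

    // Returns the IDs that were not checked in during this run, and forgets them.
    List<String> discoverOrphans(long runTimestamp) {
        List<String> orphans = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> iterator = lastSeen.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, Long> entry = iterator.next();
            if (entry.getValue() != runTimestamp) {
                orphans.add(entry.getKey());
                iterator.remove();
            }
        }
        return orphans;
    }
}
```

Because each orphan is removed from the map as soon as it is reported, the cache only ever holds the IDs from the latest run, which is what keeps it from growing forever. One caveat of stamping with the clock: two runs that land in the same millisecond would share a stamp, so an ID missing from the second run would go unnoticed until the next one.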

 

And here is the TrackedGeoEvent class:

 

import com.esri.core.geometry.MapGeometry;

public class TrackedGeoEvent {
    
    private String trackID;
    private MapGeometry mapGeometry;
    private long timeStamp;
    
    public TrackedGeoEvent(String aTrackID, MapGeometry aMapGeometry,
            long aTimeStamp) {
        this.trackID = aTrackID;
        this.mapGeometry = aMapGeometry;
        this.timeStamp = aTimeStamp;
    }

    // Named getTrackId so it matches the call made in OurInboundAdapter.
    public String getTrackId() {
        return trackID;
    }

    public void setTrackId(String aTrackID) {
        this.trackID = aTrackID;
    }

    public MapGeometry getMapGeometry() {
        return mapGeometry;
    }

    public void setMapGeometry(MapGeometry aMapGeometry) {
        this.mapGeometry = aMapGeometry;
    }

    public long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(long aTimeStamp) {
        this.timeStamp = aTimeStamp;
    }
}

 

It's just a POJO. One could probably rework things with the cache (the ConcurrentHashMap in OurInboundAdapter) and avoid using a separate class to store things, but this works...

 

Epilogue

In revisiting this, there would be two things I would look at doing:

 

TO_DELETE vs. Expiration Date Field

I would probably switch things in this situation to just using the expiration date field as our flag for deleting. In the input adapter, rather than setting a new field in our schema (TO_DELETE), we could just simply set the GeoEvent's expiration date to something in the past and let the Update a Feature output connector handle the deleting from there on out (as long as the new date meets the threshold for the connector to delete it). That way we are:

      1. not having to introduce a new field into our feature class.
      2. not having to create and run a separate Poll an ArcGIS Server For Features input connector.

Of course, this only works if an expiration date field is part of the GeoEvent. And further down the line, we would have to make sure any viewers of the data are able to create a definition expression that filters out anything with an expiration date in the past. Not everyone will be able to handle that, as dates are tricky in expressions, never mind the challenge of determining the current date and time in whatever system. But that's what views and/or query layers are for!
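The two date calculations involved in that variant might look something like the sketch below. It assumes the output connector deletes features whose date is older than some configurable age; I have not verified the connector's exact rule, so deleteThreshold is a placeholder for whatever that setting is. ExpirationFlag, backdated, and isCurrent are made-up names.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative helper; the real threshold depends on how the output connector is configured.
class ExpirationFlag {
    // An expiration one second older than the assumed deletion threshold,
    // so the next cleanup pass should pick the feature up.
    static Instant backdated(Instant now, Duration deleteThreshold) {
        return now.minus(deleteThreshold).minusSeconds(1);
    }

    // Viewer-side filter: the Java equivalent of a definition expression "expiration > now".
    static boolean isCurrent(Instant expiration, Instant now) {
        return expiration.isAfter(now);
    }
}
```

The isCurrent check is the part every viewer of the data would have to reproduce in its own expression syntax, which is where the trickiness with dates comes in.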

 

Make More Generic

Probably what I'll do in the future, if we use this methodology for a lot of different feeds, is make it more generic. In other words, see if I can extend the various input base classes just enough to incorporate the orphan determination, without making them specific to any individual data feed. That way they can be imported into the GeoEvent Processor and used like the rest of the everyday out-of-the-box input connectors. Only time and effort will tell...!
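One possible shape for that generic piece is sketched below, purely as a thought experiment: the only feed-specific part is a function that derives the cache key from an event. GenericOrphanTracker, beginRun, and endRun are hypothetical names, the event type E is left open, and I have swapped the clock timestamp for a simple run counter. How this would plug into the SDK's input base classes is exactly the part I haven't worked out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical feed-agnostic tracker; E is whatever event type the adapter emits.
class GenericOrphanTracker<E> {
    private static final class Seen<T> {
        final T event;
        final long run;
        Seen(T event, long run) { this.event = event; this.run = run; }
    }

    private final Function<E, String> keyOf; // the only feed-specific part
    private final Map<String, Seen<E>> cache = new ConcurrentHashMap<>();
    private long currentRun = 0;

    GenericOrphanTracker(Function<E, String> keyOf) {
        this.keyOf = keyOf;
    }

    // Call once at the start of each batch from the feed.
    void beginRun() {
        currentRun++;
    }

    // Remember the latest version of an event; events without a key are not tracked.
    void checkIn(E event) {
        String key = keyOf.apply(event);
        if (key != null) {
            cache.put(key, new Seen<>(event, currentRun));
        }
    }

    // Call once at the end of each batch: returns the last known version
    // of every event that was absent from this batch, and forgets it.
    List<E> endRun() {
        List<E> orphans = new ArrayList<>();
        Iterator<Seen<E>> iterator = cache.values().iterator();
        while (iterator.hasNext()) {
            Seen<E> seen = iterator.next();
            if (seen.run != currentRun) {
                orphans.add(seen.event);
                iterator.remove();
            }
        }
        return orphans;
    }
}
```

Returning the last known event (rather than just its ID) leaves it to the caller to build whatever "delete me" event its own schema needs.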

 

Of course, like everything with ESRI solutions and technology in general, there's more than one way to skin a cat (sorry, grim metaphor...). So if you've got a better way of doing this or alternative ideas, please don't be shy and make some comments! This is working for us, but that doesn't necessarily mean it's the easiest and/or best approach.
