Select to view content in your preferred language

How to check if every segments of a road have the same directions?

4004
17
Jump to solution
11-02-2017 03:35 AM
ZenekS_
Emerging Contributor

Hi,

I’m attempting to make a linear referencing. It’s a polyline M layer (with lengths along every line)  of forest roads . Every road (unique road number) consists of 1, 2 or more segments. In the attribute table I have a field with segment_id (unique in the layer) and a field with the road_number (many segments can have the same road_number). The layer is sorted ascending by segment_id and road_number.

The problem is – how to check if every segments of one road_number have the same direction. Maybe the solution is to calculate distance for end point and start point of subsequent segments in every road_number (but not for first and last segments – only the middle segments). When the distance if more than 0 – it will be an error. But how to calculate it?

Thanks for any suggestions how to solve this.

0 Kudos
17 Replies
JoshuaBixby
MVP Esteemed Contributor

I am a bit confused by Road C.  If all of the road segments point the direction of C, 7; wouldn't that make C, 10 the first segment or is Road C being traversed in reverse? 

0 Kudos
ZenekS_
Emerging Contributor

In my data the right direction for all segments of a given road points the segment with the lowest number in field [Segment_ID]. So for road C the direction for all segments indicates segment 7 (value in field [Segment_ID]). That isn't the same value like in field [OBJECTID].

A small digression:

Another thing is how to change/flip the measures values of segments identified as a "wrong direction" to be increasing with the right/changed direction. That is because, when I change the direction the M values are automatically decreasing with direction, and they should be increasing.

I know how to make it manually for each segment (RouteEditing Toolbar), but maybe can it be doing automatically.

But now the question is how to identify the segments of a road running in the wrong direction.

0 Kudos
XanderBakker
Esri Esteemed Contributor

Find below some code changed based on your comment and data provided.

  • each route starts and feature with lowest segment ID
  • will generate list of features that need to be flipped and a list of the order of the segments in the route

def main():
    import arcpy

    # input fc and name of field containing route name
    fc = r'C:\GeoNet\RoadDirection2\test.gdb\roads_m'
    fld_name = 'RouteName'
    fld_id = 'Segment_ID'
    tolerance = 10

    # sr
    sr = arcpy.Describe(fc).spatialReference

    # create dictionary of data
    dct_routes = {}
    dct_seg_id_oid = {}
    dct_oid_seg_id = {}
    flds = ('OID@', 'SHAPE@', fld_name, fld_id)
    with arcpy.da.SearchCursor(fc, flds) as curs:
        for row in curs:
            dct_data = {}
            oid = row[0]
            polyline = row[1]
            route_name = row[2]
            seg_id = row[3]
            dct_seg_id_oid[seg_id] = oid
            dct_oid_seg_id[oid] = seg_id
            if route_name in dct_routes:
                dct_data = dct_routes[route_name]
                dct_data[oid] = [polyline, seg_id]
            else:
                dct_data[oid] = [polyline, seg_id]
            dct_routes[route_name] = dct_data

    # determnine first segment of route based on segment id
    dct_start_seg_id = {}
    for route_name, dct_data in sorted(dct_routes.items()):
        seg_ids = [s[1] for p,s in dct_data.items()]
        dct_start_seg_id[route_name] = min(seg_ids)

    # trace route from staring seg_id to end
    lst_flip_oid = []
    lst_candidates_todo = []
    lst_multi_candidates = []
    dct_route_order = {}
    for route_name, seg_id in sorted(dct_start_seg_id.items()):
        route_order = [seg_id]
        print "trace:", route_name, " starting @", seg_id
        dct_data = dct_routes[route_name]
        dct_start_pnts = CreateDictionaryStartOrEndPoints(dct_data, "start")
        dct_end_pnts = CreateDictionaryStartOrEndPoints(dct_data, "end")
        # print dct_data
        # print dct_start_pnts
        # print dct_end_pnts
        oid = dct_seg_id_oid[seg_id]
        start_pnt = dct_start_pnts[oid]
        end_pnt = dct_end_pnts[oid]
        lst_processed = [oid]
        lst_todo = [o for o, lst in dct_data.items() if o != oid]
        # print lst_processed
        # print lst_todo
        while len(lst_todo) > 0:
            print "lst_processed:", lst_processed
            # get candidates for end point of line
            oid_cand_end, start_end_end, dist_from_end = GetCandidate(end_pnt, dct_start_pnts, dct_end_pnts, lst_todo, tolerance)
            print " - GetCandidate 1:", oid_cand_end, start_end_end, dist_from_end
            # get candidates for start point of line
            oid_cand_start, start_end_start, dist_from_start = GetCandidate(start_pnt, dct_start_pnts, dct_end_pnts, lst_todo, tolerance)
            print " - GetCandidate 2:", oid_cand_start, start_end_start, dist_from_start

            # check is there are candidates
            if dist_from_start == 9999 and dist_from_end == 9999:
                # no candidates
                print" - no candidates!"
                lst_candidates_todo.extend(lst_todo)
                lst_todo = []
                oid_found = None

            elif dist_from_start == dist_from_end:
                # two candidates with same score, error!
                print " - two candidates with same score, error!"
                lst_multi_candidates.append([oid_cand_end, oid_cand_start])
                lst_candidates_todo.extend(lst_todo)
                lst_todo = []
                oid_found = None

            elif dist_from_start < dist_from_end:
                # candidate is at start of line, requires flip of current oid
                print " - candidate is at start of line, requires flip of current oid"
                if start_end_start == "end":
                    # also candidate need to be flipped
                    print " - current start connects to candidate end, also candidate need to be flipped"
                    print " - adding candidate oid ", oid_cand_start, " to flip list"
                    lst_flip_oid.append(oid_cand_start)

                if not oid in lst_flip_oid:
                    lst_flip_oid.append(oid)
                    print " - adding current oid ", oid, " to flip list"
                oid_found = oid_cand_start

            else:
                # candidate is at end of line, OK
                if start_end_end== "end":
                    # current end connects to candidate end, candidate need to be flipped
                    print " - current end connects to candidate end, candidate need to be flipped"
                    lst_flip_oid.append(oid_cand_end)
                    print "   - adding candidate oid ", oid_cand_end, " to flip list"
                oid_found = oid_cand_end

            if oid_found != None:
                start_pnt = dct_start_pnts[oid_found]
                end_pnt = dct_end_pnts[oid_found]
                lst_processed.append(oid_found)
                route_order.append(dct_oid_seg_id[oid_found])
                lst_todo.remove(oid_found)
                oid = oid_found


            print "lst_todo", lst_todo
            print "lst_flip_oid", lst_flip_oid
        dct_route_order[route_name] = route_order
        print


    print "lst_flip_oid        :", sorted(lst_flip_oid)
    print "lst_candidates_todo :", sorted(lst_candidates_todo)
    print "lst_multi_candidates:", lst_multi_candidates

    for route_name, route_order in sorted(dct_route_order.items()):
        print " - Route:", route_name, "  seg id order:", route_order

def GetCandidate(end_pnt, dct_start_pnts, dct_end_pnts, lst_todo, tolerance):
    # return oid_cand_start, start_end, dist_from_start
    dist_min = 9999
    oid_min = None
    type_min = None
    for oid, pnt in dct_start_pnts.items():
        if oid in lst_todo:
            dist = GetDistance(end_pnt.X, end_pnt.Y, pnt.X, pnt.Y)
            if dist < tolerance:
                if dist < dist_min:
                    dist_min = dist
                    oid_min = oid
                    type_min = "start"

    for oid, pnt in dct_end_pnts.items():
        if oid in lst_todo:
            dist = GetDistance(end_pnt.X, end_pnt.Y, pnt.X, pnt.Y)
            if dist < tolerance:
                if dist < dist_min:
                    dist_min = dist
                    oid_min = oid
                    type_min = "end"

    return oid_min, type_min, dist_min

def GetDistance(x1, y1, x2, y2):
    import math
    return math.hypot(x2-x1, y2-y1)

def CreateDictionaryStartOrEndPoints(dct_data, start_or_end):
    dct_pnts = {}
    for oid, lst in dct_data.items():
        polyline = lst[0]
        if start_or_end == "start":
            dct_pnts[oid] = polyline.firstPoint
        else:
            dct_pnts[oid] = polyline.lastPoint
    return dct_pnts

if __name__ == '__main__':
    main()

Will yield:

trace: A  starting @ 1
lst_processed: [1]
 - GetCandidate 1: 2 end 0.0
 - GetCandidate 2: None None 9999
 - current end connects to candidate end, candidate need to be flipped
   - adding candidate oid  2  to flip list
lst_todo [3]
lst_flip_oid [2]
lst_processed: [1, 2]
 - GetCandidate 1: None None 9999
 - GetCandidate 2: 3 start 0.0
 - candidate is at start of line, requires flip of current oid
lst_todo []
lst_flip_oid [2]

trace: B  starting @ 4
lst_processed: [4]
 - GetCandidate 1: 5 start 0.0
 - GetCandidate 2: None None 9999
lst_todo [6]
lst_flip_oid [2]
lst_processed: [4, 5]
 - GetCandidate 1: 6 end 0.0
 - GetCandidate 2: None None 9999
 - current end connects to candidate end, candidate need to be flipped
   - adding candidate oid  6  to flip list
lst_todo []
lst_flip_oid [2, 6]

trace: C  starting @ 7
lst_processed: [10]
 - GetCandidate 1: None None 9999
 - GetCandidate 2: 8 start 0.0
 - candidate is at start of line, requires flip of current oid
 - adding current oid  10  to flip list
lst_todo [9, 7]
lst_flip_oid [2, 6, 10]
lst_processed: [10, 8]
 - GetCandidate 1: 9 start 0.0
 - GetCandidate 2: None None 9999
lst_todo [7]
lst_flip_oid [2, 6, 10]
lst_processed: [10, 8, 9]
 - GetCandidate 1: 7 start 0.0
 - GetCandidate 2: None None 9999
lst_todo []
lst_flip_oid [2, 6, 10]

lst_flip_oid        : [2, 6, 10]
lst_candidates_todo : []
lst_multi_candidates: []
 - Route: A   seg id order: [1, 2, 3]
 - Route: B   seg id order: [4, 5, 6]
 - Route: C   seg id order: [7, 8, 9, 10]

At the end you will notice that oid's 2, 6 and 10 needs to be flipped (lst_flip_oid). Also the segment ID order for each route is provided.

ZenekS_
Emerging Contributor

Xander, thank you very much. I have tested your last scipt on my big feature dataset with forest routes. Your script makes exactly what I meant - identifying errors on segments running in wrong direction. 

For me the most important thing is the last line of script's output starting with "1st_flip_oid: [......]". It contains all segments their direction have to be changed.  

Another request concerns if that line can be saved as table with for example 3 fields: [OBJECTID], [RouteName], [Segment_ID] ? If yes, it would be helpful for me to make a join to the original feature dateset using field [OBJECTID]. And then I could use from ArcGIS Toolbox the tool "Flip Line" and automatically flip all the segments, that need it.

The code works, but could you explain me the general concepion, you have made to identifying segments with wrong direction?

And again, thank you very much.

0 Kudos
XanderBakker
Esri Esteemed Contributor

You could try this. It will take the input fields to the output featureclass and add a field "FlipCase" that will hold "Flipped" when the polyline was flipped. However, the M values are not reversed for each point. Are the M values length values?

def main():
    import arcpy
    import os

    # input fc and name of field containing route name
    fc = r'C:\GeoNet\RoadDirection2\test.gdb\roads_m'
    fc_out = r'C:\GeoNet\RoadDirection2\test.gdb\roads_m_corr7' # output fc
    fld_name = 'RouteName'
    fld_id = 'Segment_ID'
    fld_case = 'FlipCase'  # stores info on correction feature
    tolerance = 10 # tolerance for searching connecting segments for same route

    # sr
    sr = arcpy.Describe(fc).spatialReference
    arcpy.env.OverwriteOutput = True

    # create dictionary of data
    dct_routes = {}
    dct_seg_id_oid = {}
    dct_oid_seg_id = {}
    flds = ('OID@', 'SHAPE@', fld_name, fld_id)
    with arcpy.da.SearchCursor(fc, flds) as curs:
        for row in curs:
            dct_data = {}
            oid = row[0]
            polyline = row[1]
            route_name = row[2]
            seg_id = row[3]
            dct_seg_id_oid[seg_id] = oid
            dct_oid_seg_id[oid] = seg_id
            if route_name in dct_routes:
                dct_data = dct_routes[route_name]
                dct_data[oid] = [polyline, seg_id]
            else:
                dct_data[oid] = [polyline, seg_id]
            dct_routes[route_name] = dct_data

    # supporting dct with oid and geometry
    dct_oid_geom = {r[0]: r[1] for r in arcpy.da.SearchCursor(fc, ('OID@', 'SHAPE@'))}

    # determnine first segment of route based on segment id
    dct_start_seg_id = {}
    for route_name, dct_data in sorted(dct_routes.items()):
        seg_ids = [s[1] for p,s in dct_data.items()]
        dct_start_seg_id[route_name] = min(seg_ids)

    # trace route from staring seg_id to end
    lst_flip_oid = []
    lst_candidates_todo = []
    lst_multi_candidates = []
    dct_route_order = {}
    for route_name, seg_id in sorted(dct_start_seg_id.items()):
        route_order = [seg_id]
        print "trace:", route_name, " starting @", seg_id
        dct_data = dct_routes[route_name]
        dct_start_pnts = CreateDictionaryStartOrEndPoints(dct_data, "start")
        dct_end_pnts = CreateDictionaryStartOrEndPoints(dct_data, "end")
        oid = dct_seg_id_oid[seg_id]
        start_pnt = dct_start_pnts[oid]
        end_pnt = dct_end_pnts[oid]
        lst_processed = [oid]
        lst_todo = [o for o, lst in dct_data.items() if o != oid]
        while len(lst_todo) > 0:
            print "lst_processed:", lst_processed
            # get candidates for end point of line
            oid_cand_end, start_end_end, dist_from_end = GetCandidate(end_pnt, dct_start_pnts, dct_end_pnts, lst_todo, tolerance)
            print " - GetCandidate 1:", oid_cand_end, start_end_end, dist_from_end
            # get candidates for start point of line
            oid_cand_start, start_end_start, dist_from_start = GetCandidate(start_pnt, dct_start_pnts, dct_end_pnts, lst_todo, tolerance)
            print " - GetCandidate 2:", oid_cand_start, start_end_start, dist_from_start

            # check is there are candidates
            if dist_from_start == 9999 and dist_from_end == 9999:
                # no candidates
                print" - no candidates!"
                lst_candidates_todo.extend(lst_todo)
                lst_todo = []
                oid_found = None

            elif dist_from_start == dist_from_end:
                # two candidates with same score, error!
                print " - two candidates with same score, error!"
                lst_multi_candidates.append([oid_cand_end, oid_cand_start])
                lst_candidates_todo.extend(lst_todo)
                lst_todo = []
                oid_found = None

            elif dist_from_start < dist_from_end:
                # candidate is at start of line, requires flip of current oid
                print " - candidate is at start of line, requires flip of current oid"
                if start_end_start == "end":
                    # also candidate need to be flipped
                    print " - current start connects to candidate end, also candidate need to be flipped"
                    print " - adding candidate oid ", oid_cand_start, " to flip list"
                    lst_flip_oid.append(oid_cand_start)

                if not oid in lst_flip_oid:
                    lst_flip_oid.append(oid)
                    print " - adding current oid ", oid, " to flip list"
                oid_found = oid_cand_start

            else:
                # candidate is at end of line, OK
                if start_end_end== "end":
                    # current end connects to candidate end, candidate need to be flipped
                    print " - current end connects to candidate end, candidate need to be flipped"
                    lst_flip_oid.append(oid_cand_end)
                    print "   - adding candidate oid ", oid_cand_end, " to flip list"
                oid_found = oid_cand_end

            if oid_found != None:
                start_pnt = dct_start_pnts[oid_found]
                end_pnt = dct_end_pnts[oid_found]
                lst_processed.append(oid_found)
                route_order.append(dct_oid_seg_id[oid_found])
                lst_todo.remove(oid_found)
                oid = oid_found


            print "lst_todo", lst_todo
            print "lst_flip_oid", lst_flip_oid
        dct_route_order[route_name] = route_order
        print


    print "lst_flip_oid        :", sorted(lst_flip_oid)
    print "lst_candidates_todo :", sorted(lst_candidates_todo)
    print "lst_multi_candidates:", lst_multi_candidates

    for route_name, route_order in sorted(dct_route_order.items()):
        print " - Route:", route_name, "  seg id order:", route_order


    # create output featureclass
    ws, fc_name = os.path.split(fc_out)
    arcpy.CreateFeatureclass_management(ws, fc_name, "POLYLINE", fc, "ENABLED", "DISABLED", sr)
    arcpy.AddField_management(fc_out, fld_case, "TEXT", None, None, 50)

    flds_in = [fld.name for fld in arcpy.ListFields(fc)]
    flds_out = [fld.name for fld in arcpy.ListFields(fc_out)]
    flds_in.remove('Shape_Length')
    flds_out.remove('Shape_Length')
    fld_geom = arcpy.Describe(fc).ShapeFieldName
    fld_oid = arcpy.Describe(fc).OIDFieldName

    flds_in[flds_in.index(fld_geom)] = 'SHAPE@'
    i_oid = flds_in.index(fld_oid)
    flds_in[flds_in.index(fld_oid)] = 'OID@'
    flds_out[flds_out.index(fld_geom)] = 'SHAPE@'
    flds_out[flds_out.index(fld_oid)] = 'OID@'

    fld_geom = 'SHAPE@'
    fld_oid = 'OID@'

    cnt = 0
    with arcpy.da.InsertCursor(fc_out, flds_out) as curs_out:
        with arcpy.da.SearchCursor(fc, flds_in) as curs:
            for row in curs:
                cnt += 1
                if cnt % 100 == 0:
                    print "Processing feature:", cnt
                oid = row[i_oid]
                polyline = dct_oid_geom[oid] # row[i_shp]
                if oid in lst_flip_oid:
                    case = "Flipped"
                    polyline_out = FlipPolyline(polyline)
                else:
                    case = "OK"
                    polyline_out = polyline

                row_out = []
                for fld_name in flds_out[:-1]:
                    row_out.append(row[flds_in.index(fld_name)])
                row_out.append(case)
                row_out[flds_out.index(fld_geom)] = polyline_out
                print row_out
                tpl_row = tuple(row_out)
                curs_out.insertRow(tpl_row)



def FlipPolyline(polyline):
    print "before:", polyline.length
    vertices = []
    for part in polyline:
        for pnt in part:
            vertices.append(pnt)
    polyline_out = arcpy.Polyline(arcpy.Array(vertices[::-1]),
                polyline.spatialReference, False, True)
    print "after :", polyline_out.length
    return polyline_out

def GetCandidate(end_pnt, dct_start_pnts, dct_end_pnts, lst_todo, tolerance):
    # return oid_cand_start, start_end, dist_from_start
    dist_min = 9999
    oid_min = None
    type_min = None
    for oid, pnt in dct_start_pnts.items():
        if oid in lst_todo:
            dist = GetDistance(end_pnt.X, end_pnt.Y, pnt.X, pnt.Y)
            if dist < tolerance:
                if dist < dist_min:
                    dist_min = dist
                    oid_min = oid
                    type_min = "start"

    for oid, pnt in dct_end_pnts.items():
        if oid in lst_todo:
            dist = GetDistance(end_pnt.X, end_pnt.Y, pnt.X, pnt.Y)
            if dist < tolerance:
                if dist < dist_min:
                    dist_min = dist
                    oid_min = oid
                    type_min = "end"

    return oid_min, type_min, dist_min

def GetDistance(x1, y1, x2, y2):
    import math
    return math.hypot(x2-x1, y2-y1)

def CreateDictionaryStartOrEndPoints(dct_data, start_or_end):
    dct_pnts = {}
    for oid, lst in dct_data.items():
        polyline = lst[0]
        if start_or_end == "start":
            dct_pnts[oid] = polyline.firstPoint
        else:
            dct_pnts[oid] = polyline.lastPoint
    return dct_pnts

if __name__ == '__main__':
    main()
ZenekS_
Emerging Contributor

Xander, your programming skills are awesome :-). The code makes exactly what I want, and even more - it creates a new feature dataset with corrected directions. 

But to answer your question. Yes, in this case, we can assume that M values are also the roads lengths.

So, what I now have to do is to reverse the M values of segments identified as a "Flipped" to be increasing with the right/changed direction. Once I changed the direction the M values are automatically decreasing with direction, and they should be increasing.

As I said before, I know how to make it manually for every segment (RouteEditing Toolbar) - screenshot below:

But maybe can it be doing automatically ?

0 Kudos
XanderBakker
Esri Esteemed Contributor

I'm glad it was helpful. Always nice to have a challenge.

Just wondering, if you use the tool Create Routes—Help | ArcGIS Desktop it will dissolve the segments and it can generate M values based on the length.

The code can be adapted to create M values based on the distance along the line. I can toss some code your way to show you how that can be done, but it might slow the code down. Let me know if you want the snippet and I can post it.

0 Kudos
XanderBakker
Esri Esteemed Contributor

The "fast way" of inverting the correcting the M value is:

    polyline_out = FlipPolyline(polyline, True)

def FlipPolyline(polyline, correct_m):
    sr = polyline.spatialReference
    length = polyline.length
    vertices = []
    for part in polyline:
        for pnt in part:
            if correct_m:
                pnt.M = length - pnt.M
            vertices.append(pnt)
    polyline_out = arcpy.Polyline(arcpy.Array(vertices[::-1]),
                sr, False, True)
    return polyline_out

However, this may be prone to errors. To do it "correctly" one should generate the flipped line and determine the position of the vertice on the line and set that value as M.

Something like this (not tested):

    polyline_out = FlipPolyline(polyline, True)

def FlipPolyline2(polyline, correct_m):
    sr = polyline.spatialReference
    length = polyline.length
    vertices = []
    for part in polyline:
        for pnt in part:
            vertices.append(pnt)
    polyline_out = arcpy.Polyline(arcpy.Array(vertices[::-1]),
                sr, False, True)

    if correct_m:
        vertices = []
        for part in polyline_out:
            for pnt in part:
                pntg = arcpy.PointGeometry(pnt, sr, False, True)
                measure = polyline_out.measureOnLine(pntg, False)
                pnt.M = measure
                vertices.append(pnt)
        polyline_out = arcpy.Polyline(arcpy.Array(vertices),
                sr, False, True)

    return polyline_out
‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍

It basically creates the flipped line and then measures each point on the new line and adjusts the M value and creates the polyline.

0 Kudos