How to: Convert export_v2 JSON to COCO format

Hi to all community members!

Labelbox offers powerful tools for data labeling, including the export_v2 format which provides comprehensive annotations for your images. However, if you’re aiming to integrate your labeled data with other frameworks or tools, you might find yourself needing to convert your Labelbox export_v2 format to COCO format.

The COCO (Common Objects in Context) format is widely used in the computer vision community and is supported by various deep learning frameworks. Converting your Labelbox export_v2 to COCO format can enable seamless integration with these frameworks and tools.

Here’s a step-by-step guide on importing a datarow, sending it to a project, labeling it and then converting the labels to a COCO format (If you already have a project and annotations ready to export jump ahead past step #6):

  1. We are first going to set up a project with some basic bounding box annotations using the following code:
import uuid
from PIL import Image
import requests
import base64
import labelbox as lb
import labelbox.types as lb_types
from io import BytesIO

api_key = ""  # value passed to lb.Client below
client = lb.Client(api_key)

# Python bounding box annotation
bbox_annotation = lb_types.ObjectAnnotation(
    name="bounding_box",  # must match your ontology feature's name
    value=lb_types.Rectangle(
        start=lb_types.Point(x=1690, y=977),  # x = left, y = top
        end=lb_types.Point(x=1915, y=1307),  # x = left + width, y = top + height
    ))

global_key = "2560px-Kitano_Street_Kobe01s5s4110.jpeg"

# Data row payload: the image URL plus the global key used to reference it later
test_img_url = {
    "row_data":
        "https://storage.googleapis.com/labelbox-datasets/image_sample_data/2560px-Kitano_Street_Kobe01s5s4110.jpeg",
    "global_key":
        global_key
}

dataset = client.create_dataset(name="coco-demo-converter")
task = dataset.create_data_rows([test_img_url])
task.wait_till_done()

print(f"Failed data rows: {task.failed_data_rows}")
print(f"Errors: {task.errors}")

if task.errors:
    for error in task.errors:
        if 'Duplicate global key' in error['message'] and dataset.row_count == 0:
            # If the global key already exists in the workspace the dataset will be created empty, so we can delete it.
            print(f"Deleting empty dataset: {dataset}")
            dataset.delete()
            # The dataset no longer exists; stop here so a later error does not
            # query or delete it a second time.
            break
  2. Then we make the ontology. In this case we will just include a bounding box tool:
# Ontology containing a single bounding-box tool named "bounding_box".
bbox_tool = lb.Tool(tool=lb.Tool.Type.BBOX, name="bounding_box", color="#ff0000")
object_features = [bbox_tool]

ontology_builder = lb.OntologyBuilder(tools=object_features)

# Register the ontology in the workspace as an image ontology.
ontology = client.create_ontology(
    "coco-demo-ontology",
    ontology_builder.asdict(),
    media_type=lb.MediaType.Image,
)
  3. We will then set up the project for our new image and bounding box annotations:
# Create an image project and attach the ontology to its editor.
project = client.create_project(
    name="coco-demo-project",
    media_type=lb.MediaType.Image,
)
project.setup_editor(ontology)

# Keep the project id around; later steps reference the project by id.
project_id = project.uid
  4. We will then send a batch to the project:
# Data rows may be given as a paginated collection of data row objects,
# a list of data row ids, or (as here) a list of global keys.
batch_global_keys = [global_key]

batch = project.create_batch(
    "coco-demo-batch",  # each batch in a project must have a unique name
    global_keys=batch_global_keys,
    priority=1,  # priority between 1 (highest) and 5 (lowest)
)

print(f"Batch: {batch}")
  5. Then we will put together the annotations payload to send to our project:
# All annotations that belong to the single demo image.
annotations = [bbox_annotation]

# One Label per data row; the data row is referenced by its global key.
label = [
    lb_types.Label(
        data=lb_types.ImageData(global_key=global_key),
        annotations=annotations,
    )
]
  6. We then upload our bounding box annotations to the project:
# Upload the labels for this data row to the project.
# A random UUID suffix gives every import job a distinct name.
mal_job_name = "coco-demo-mal_job" + str(uuid.uuid4())
upload_job = lb.MALPredictionImport.create_from_objects(
    client=client,
    project_id=project_id,
    name=mal_job_name,
    predictions=label,
)
upload_job.wait_until_done()

print(f"Errors: {upload_job.errors}")
print(f"Status of uploads: {upload_job.statuses}")

*** (If you already have a project with labels that you want to convert to COCO format, you can start here!)

At this point we should have a fully defined project with annotations and an example image, ready to export and convert into COCO format.

Below you will find a fully functioning script that you can use in a notebook or in your own Python file. Once you have decided where to run it, input your api_key and project_id (you’ll find that code at the bottom) and you will have your newly converted COCO annotation file!

from labelbox import Client
import urllib.request
import argparse
import copy
import json
import datetime
import requests
from PIL import Image 
import numpy as np
from io import BytesIO
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
from shapely.geometry import Polygon
import cv2

def index_ontology(ontology_normalized, export_type="index"):
    """Flatten a normalized ontology into a lookup keyed by featureSchemaId.

    Args:
        ontology_normalized (dict)  :   Normalized ontology; must contain the keys "tools" and "classifications"
        export_type (str)           :   Accepted for backward compatibility; not used by this function
    Returns:
        Dictionary where {key=featureSchemaId : value={"name", "color", "type", "kind", "parent_featureSchemaIds", "encoded_value"}}
        for each feature that layer_iterator() finds in the ontology
    """
    feature_map = {}
    # Running counter handed from the tools pass to the classifications pass.
    # It starts at 0 so an ontology with classifications but no tools still works
    # (previously that case raised a NameError).
    encoded_value = 0
    tools = ontology_normalized["tools"]
    classifications = ontology_normalized["classifications"]
    if tools:
        results = layer_iterator(feature_map=feature_map, node_layer=tools)
        feature_map = results[0]
        encoded_value = results[2]
    if classifications:
        feature_map = layer_iterator(
            feature_map=feature_map,
            node_layer=classifications,
            encoded_value=encoded_value,
            parent_featureSchemaIds=[],
            parent_featureSchemaId=False,
        )[0]
    return feature_map

def layer_iterator(feature_map, node_layer, encoded_value=0, parent_featureSchemaIds=None, parent_featureSchemaId=False):
    """ Walks one layer of a normalized ontology (a list of node dictionaries), records key
        information for every node under its featureSchemaId, and recurses into nested layers.
    Args:
        feature_map (dict)              :   Dictionary being built, where key=featureSchemaId (updated in place)
        node_layer (list)               :   List of ontology node dictionaries to loop through
        encoded_value (int)             :   Running counter; increased by one for every node read into feature_map
        parent_featureSchemaIds (list)  :   featureSchemaIds of the ancestors above the immediate parent (never modified)
        parent_featureSchemaId (str)    :   featureSchemaId of the immediate parent node, or a falsy value at the top level
    Returns:
        Tuple (feature_map, next_layer, encoded_value, parent_featureSchemaIds, parent_featureSchemaId).
        feature_map and encoded_value carry the updated values; the remaining entries are kept so the
        tuple shape matches what existing callers index into.
    """
    # Work on a private copy: appending to the caller's list (or to a shared default)
    # would also change ancestor lists that were already stored in feature_map.
    ancestors = list(parent_featureSchemaIds) if parent_featureSchemaIds else []
    if parent_featureSchemaId:
        ancestors.append(parent_featureSchemaId)
    next_layer = []  # defined up front so an empty node_layer still returns cleanly
    for node in node_layer:
        encoded_value += 1
        color = ""
        if "tool" in node:
            node_type = node["tool"]
            node_kind = "tool"
            node_name = node["name"]
            next_layer = node["classifications"]
            color = node['color']
        elif "instructions" in node:
            node_name = node["instructions"]
            node_kind = "classification"
            node_type = node["type"]
            next_layer = node["options"]
        else:
            node_name = node["label"]
            node_kind = "option"
            if "options" in node:
                next_layer = node["options"]
                node_type = "branch_option"
            else:
                next_layer = []
                node_type = "leaf_option"
        feature_map[node['featureSchemaId']] = {
            "name" : node_name,
            "color" : color,
            "type" : node_type,
            "kind" : node_kind,
            # Every node gets its own copy so entries never alias each other.
            "parent_featureSchemaIds" : list(ancestors),
            "encoded_value" : encoded_value,
        }
        if next_layer:
            # Only the counter needs to come back; feature_map is updated in place and
            # all siblings in this layer keep the same ancestor list.
            encoded_value = layer_iterator(
                feature_map=feature_map,
                node_layer=next_layer,
                encoded_value=encoded_value,
                parent_featureSchemaIds=ancestors,
                parent_featureSchemaId=node['featureSchemaId']
            )[2]
    return feature_map, next_layer, encoded_value, ancestors, ""

def coco_bbox_converter(data_row_id, annotation, category_id): # was data_row_idx
    """ Converts one bounding box annotation into a COCO annotation dictionary
    Args:
        data_row_id (str)               :     Labelbox Data Row ID, used as the COCO image_id
        annotation (dict)               :     Annotation dictionary with a 'bounding_box' entry
                                              (keys 'top', 'left', 'height', 'width') and a 'feature_id'
        category_id (str)               :     Desired category_id for the coco_annotation
    Returns:
        An annotation dictionary in the COCO format. "bbox" is ordered [x, y, width, height],
        i.e. [left, top, width, height]; the values are emitted as strings.
    """
    box = annotation['bounding_box']
    coco_annotation = {
        "image_id": data_row_id,
        # COCO order is x (left), y (top), width, height - not top/left/height/width.
        "bbox": [
            str(box['left']),
            str(box['top']),
            str(box['width']),
            str(box['height'])
        ],
        "category_id": str(category_id),
        "id": annotation['feature_id']
    }
    return coco_annotation

def coco_line_converter(data_row_id, annotation, category_id):
    """ Converts one line annotation into a COCO keypoints annotation dictionary
    Args:
        data_row_id (str)               :     Labelbox Data Row ID, used as the COCO image_id
        annotation (dict)               :     Annotation dictionary with a 'line' entry (sequence of
                                              {'x', 'y'} dictionaries) and a 'feature_id'
        category_id (str)               :     Desired category_id for the coco_annotation
    Returns:
        Tuple of (annotation dictionary in the COCO format, number of keypoints in the line)
    """
    # Every vertex becomes an "x", "y", "2" triple; the third value is always the string "2".
    flattened = [
        value
        for vertex in annotation['line']
        for value in (str(vertex['x']), str(vertex['y']), "2")
    ]
    vertex_count = len(flattened) // 3
    return {
        "image_id": str(data_row_id),
        "keypoints": flattened,
        "num_keypoints": str(vertex_count),
        "category_id" : str(category_id),
        "id": str(annotation['feature_id'])
    }, vertex_count

def coco_point_converter(data_row_id, annotation, category_id):
    """ Converts one point annotation into a COCO keypoints annotation dictionary
    Args:
        data_row_id (str)               :     Labelbox Data Row ID, used as the COCO image_id
        annotation (dict)               :     Annotation dictionary with a 'point' entry ({'x', 'y'}) and a 'feature_id'
        category_id (str)               :     Desired category_id for the coco_annotation
    Returns:
        An annotation dictionary in the COCO format with exactly one keypoint
    """
    point = annotation['point']
    # Single "x", "y", "2" triple; the third value is always the string "2".
    keypoint_triple = [str(point['x']), str(point['y']), "2"]
    return {
        "image_id": str(data_row_id),
        "keypoints": keypoint_triple,
        "num_keypoints": str(1),
        "category_id" : str(category_id),
        "id": str(annotation['feature_id'])
    }

def coco_polygon_converter(data_row_id, annotation, category_id):
    """Converts one polygon annotation into a COCO segmentation annotation dictionary
    Args:
        data_row_id (str)               :     Labelbox Data Row ID, used as the COCO image_id
        annotation (dict)               :     Annotation dictionary with a 'polygon' entry (sequence of
                                              {'x', 'y'} dictionaries) and a 'feature_id'
        category_id (str)               :     Desired category_id for the coco_annotation
    Returns:
        An annotation dictionary in the COCO format; bbox and area are derived from the polygon
    """
    vertices = [(coord['x'], coord['y']) for coord in annotation['polygon']]
    # Flat "x", "y", "x", "y", ... list of stringified coordinates.
    flattened = [str(value) for vertex in vertices for value in vertex]
    shape = Polygon(vertices)
    min_x, min_y, max_x, max_y = shape.bounds
    return {
        "image_id" : data_row_id,
        "segmentation" : flattened,
        # [x, y, width, height] of the polygon's bounding rectangle
        "bbox" : [str(min_x), str(min_y), str(max_x - min_x), str(max_y - min_y)],
        "area" : str(shape.area),
        "id": str(annotation['feature_id']),
        "iscrowd" : "0",
        "category_id" : str(category_id)
    }

def download_mask(url, headers = None, max_retries = 5):
    """ Downloads a mask URL
    Args:
        url (str)            :     URL of a mask
        headers (dict)       :     Optional HTTP headers for the request. When omitted, the `headers`
                                   attribute of a module-level `client` object is used if one exists
                                   (earlier versions of this function always read that global)
        max_retries (int)    :     Maximum number of download attempts before giving up
    Returns:
        A 2-D numPy array of said mask (the image is converted to grayscale mode 'L' if needed)
    Raises:
        RuntimeError if no attempt returned HTTP 200
    """
    import time  # local import: `time` is not among this script's top-level imports

    if headers is None:
        # Backward-compatible fallback to the global client's headers.
        headers = getattr(globals().get("client"), "headers", None)

    # to ensure api limit doesn't throw an error
    requests_per_min = 1500
    interval = 60 / requests_per_min

    last_error = None
    for _ in range(max_retries):
        time.sleep(interval)
        try:
            payload = requests.get(url, headers = headers)
        except requests.RequestException as err:
            # Network-level failure: remember it and try again.
            last_error = err
            continue
        if payload.status_code != 200:
            last_error = f"HTTP {payload.status_code}"
            continue

        pil_image = Image.open(BytesIO(payload.content))

        # Convert the image to grayscale if it's not already
        if pil_image.mode != 'L':
            pil_image = pil_image.convert('L')

        # Convert the image to a NumPy array
        return np.array(pil_image)

    # Bounded retries: the previous loop never terminated on a persistent failure.
    raise RuntimeError(f"Could not download mask from {url} after {max_retries} attempts: {last_error}")

def coco_mask_converter(data_row_id, annotation, category_id):
    """Converts one segmentation mask annotation into a COCO segmentation annotation dictionary
    Args:
        data_row_id (str)               :     Labelbox Data Row ID, used as the COCO image_id
        annotation (dict)               :     Annotation dictionary with annotation['mask']['url'] and a 'feature_id'
        category_id (str)               :     Desired category_id for the coco_annotation
    Returns:
        An annotation dictionary in the COCO format; the outline comes from the external
        contours of the downloaded mask
    """
    mask = download_mask(annotation['mask']['url'])
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    vertices = []
    flattened = []
    for contour in contours:
        flat = contour.flatten().tolist()
        # Contours with fewer than three points (six values) are skipped.
        if len(flat) < 6:
            continue
        for x, y in zip(flat[0::2], flat[1::2]):
            vertices.append([x, y])
            flattened.extend((str(x), str(y)))
    shape = Polygon(vertices)
    min_x, min_y, max_x, max_y = shape.bounds
    return {
        "image_id" : data_row_id,
        "segmentation" : flattened,
        # [x, y, width, height] of the outline's bounding rectangle
        "bbox" : [str(min_x), str(min_y), str(max_x - min_x), str(max_y - min_y)],
        "area" : str(shape.area),
        "id": str(annotation['feature_id']),
        "iscrowd" : "0",
        "category_id" : str(category_id)
    }

def coco_annotation_converter(data_row_id, annotation, ontology_index):
    """ Picks the category_id for an annotation and dispatches it to the matching COCO converter
    Args:
        data_row_id (str)               :     Labelbox Data Row ID for this label
        annotation (dict)               :     Annotation dictionary; must contain 'feature_schema_id' and one geometry entry
        ontology_index (dict)           :     A dictionary where {key=featureSchemaId : value = {"encoded_value"}} which corresponds to category_id
    Returns:
        Tuple of (COCO annotation dictionary, number of line keypoints); the count is 0 for
        everything except line annotations
    """
    # Default category: the annotation's own tool.
    category_id = ontology_index[annotation['feature_schema_id']]['encoded_value']

    # The first nested classification that qualifies overrides the tool's category.
    for classification in annotation.get('classifications') or []:
        if 'answer' not in classification:
            category_id = ontology_index[classification['answers'][0]['schemaId']]['encoded_value']
            break
        if type(classification['answer']) is dict:
            category_id = ontology_index[classification['schemaId']]['encoded_value']
            break

    # Dispatch on whichever geometry key the annotation carries.
    if "bounding_box" in annotation:
        return coco_bbox_converter(data_row_id, annotation, category_id), 0
    if "line" in annotation:
        # The line converter already returns (annotation, keypoint count).
        line_annotation, line_keypoints = coco_line_converter(data_row_id, annotation, category_id)
        return line_annotation, line_keypoints
    if "point" in annotation:
        return coco_point_converter(data_row_id, annotation, category_id), 0
    if "polygon" in annotation:
        return coco_polygon_converter(data_row_id, annotation, category_id), 0
    # Anything else is treated as a segmentation mask.
    return coco_mask_converter(data_row_id, annotation, category_id), 0

def coco_converter(project, project_id):
    """ Exports a project's labels with export_v2 and builds the COCO export dictionary
    Args:
        project (labelbox.schema.project.Project)   :   Labelbox project object
        project_id (str)                            :   ID of that project; used as the key into label["projects"]
    Returns:
        Dictionary with the keys "info", "licenses", "images", "annotations" and "categories"
    """

    export_params = {
        "label_details": True,
    }

    filters = {
        "last_activity_at": ["2000-01-01 00:00:00", "2050-01-01 00:00:00"],
        "label_created_at": ["2000-01-01 00:00:00", "2050-01-01 00:00:00"],
        "workflow_status": "Done"
    }

    export_task = project.export_v2(params=export_params, filters=filters)
    export_task.wait_till_done()
    labels_list = export_task.result

    # Info section generated from project information
    now = datetime.datetime.now()
    info = {
        'description' : project.name,
        'url' : f'https://app.labelbox.com/projects/{project.uid}/overview',
        'version' : "1.0",  'year' : now.year,
        'contributor' : project.created_by().email,
        'date_created' : now.strftime('%Y/%m/%d'),
    }

    # Licenses section is a single placeholder entry
    licenses = [ { "url" : "N/A", "id" : 1, "name" : "N/A" } ]

    print('Exporting Data Rows from Project...')
    unique_data_row_ids = {label["data_row"]["id"] for label in labels_list}
    print(f'\nExport complete. {len(unique_data_row_ids)} Data Rows Exported')

    # Images section generated from data row export
    print('\nConverting Data Rows into a COCO Dataset...\n')

    images = []
    seen_data_rows = set()  # guards against a data row appearing more than once in the export
    for label in tqdm(labels_list):
        data_row = label["data_row"]
        if data_row['id'] in seen_data_rows:
            continue
        seen_data_rows.add(data_row['id'])
        project_labels = label["projects"][project_id]["labels"]
        # A data row without any label has no creation date to report.
        date_captured = project_labels[0]["label_details"]["created_at"] if project_labels else None
        images.append({
            "license" : 1, "file_name" : data_row["global_key"],
            "height" : label["media_attributes"]['height'],
            "width" : label["media_attributes"]['width'],
            "date_captured" : date_captured,
            "id" : data_row["id"], "coco_url": data_row["row_data"]
        })
    print('\nData Rows Converted into a COCO Dataset.')

    annotations = []

    print('\nConverting Annotations into the COCO Format...\n')
    ontology_index = index_ontology(project.ontology().normalized)
    global_max_keypoints = 0
    futures = []
    with ThreadPoolExecutor() as exc:
        for label in labels_list:
            data_row_id = label["data_row"]['id']
            # Convert every label of the data row; previously the label index was reset
            # on each iteration, so only the first label was ever read.
            for project_label in label["projects"][project_id]['labels']:
                for annotation in project_label['annotations']['objects']:
                    futures.append(exc.submit(coco_annotation_converter, data_row_id, annotation, ontology_index))
        for future in tqdm(as_completed(futures), total=len(futures)):
            coco_annotation, num_keypoints = future.result()
            global_max_keypoints = max(global_max_keypoints, int(num_keypoints))
            annotations.append(coco_annotation)
    print(f'\nAnnotation Conversion Complete. Converted {len(annotations)} annotations into the COCO Format.')

    categories = []

    print('\nConverting the Ontology into the COCO Dataset Format...')
    for featureSchemaId in ontology_index:
        feature = ontology_index[featureSchemaId]
        if feature["type"] == "line":
            # One named keypoint per vertex of the longest line seen in the export.
            # NOTE(review): COCO skeleton edges are usually 1-based keypoint index pairs;
            # these start at 0 - confirm against the consumer of this file.
            keypoints = []
            skeleton = []
            for i in range(0, global_max_keypoints):
                keypoints.append(str("line_")+str(i+1))
                skeleton.append([str(i), str(i+1)])
            categories.append({
                "supercategory" : feature['name'],
                "id" : str(feature["encoded_value"]),
                "name" : feature['name'],
                "keypoints" : keypoints,
                "skeleton" : skeleton,
            })
        elif feature["type"] == "point":
            categories.append({
                "supercategory" : feature['name'],
                "id" : str(feature["encoded_value"]),
                "name" : feature['name'],
                "keypoints" : ['point'],
                "skeleton" : ["0", "0"],
            })
        elif feature['kind'] == 'tool':
            categories.append({
                "supercategory" : feature['name'],
                "id" : str(feature["encoded_value"]),
                "name" : feature['name']
            })
        elif len(feature['parent_featureSchemaIds']) == 2:
            # Features with exactly two ancestors use their top-most ancestor as supercategory.
            supercategory = ontology_index[feature['parent_featureSchemaIds'][0]]['name']
            categories.append({
                "supercategory" : supercategory,
                "id" : str(feature["encoded_value"]),
                "name" : feature['name']
            })
    print('\nOntology Conversion Complete')

    coco_dataset = {
        "info" : info,
        "licenses" : licenses,
        "images" : images,
        "annotations" : annotations,
        "categories" : categories
    }

    print('\nCOCO Conversion Complete')
    return coco_dataset

You will then need to add your api_key and project_id, then run the coco_converter() function. Feel free to add this code to the end of the above script.

api_key = ""
project_id = ""

# Keep the client in a module-level `client` variable: the mask download helper
# reads `client.headers` when it fetches segmentation masks.
client = Client(api_key)

coco_dataset = coco_converter(client.get_project(project_id), project_id)
print(json.dumps(coco_dataset, indent=4))

This will continually be updated with current methods and exports that Labelbox is using. We will also have an update to this post to include a link to a Colab notebook that will walk you through each method of the script. (UPDATE: Link to Colab notebook)

Overall, converting your Labelbox export_v2 data to COCO format opens up a world of possibilities for integrating your labeled data with various frameworks and tools.

We hope this guide proves helpful in your data labeling journey. If you have any questions or need further assistance, don’t hesitate to reach out to our support team. Happy labeling!

Best regards,
Mina Ebeid

2 Likes

Thank you for the script Mebeid!
The team and I at Picknik Robotics have also been running into this same problem where we need to convert a v2 exported data set to COCO format.

Here is an updated form of mebeid’s code that pulls the data from a model_run instead of a project, however we are still using the Project ID for a few information gathering purposes (this could be changed as one sees fit – this is just to provide another starting point).

from labelbox import Client
import urllib.request
import argparse
import copy
import json
import datetime
import requests
from PIL import Image 
import numpy as np
from io import BytesIO
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
from shapely.geometry import Polygon
import cv2

def index_ontology(ontology_normalized, export_type="index"):
    """Flatten a normalized ontology into a lookup keyed by featureSchemaId.

    Args:
        ontology_normalized (dict)  :   Normalized ontology; must contain the keys "tools" and "classifications"
        export_type (str)           :   Accepted for backward compatibility; not used by this function
    Returns:
        Dictionary where {key=featureSchemaId : value={"name", "color", "type", "kind", "parent_featureSchemaIds", "encoded_value"}}
        for each feature that layer_iterator() finds in the ontology
    """
    feature_map = {}
    # Running counter handed from the tools pass to the classifications pass.
    # It starts at 0 so an ontology with classifications but no tools still works
    # (previously that case raised a NameError).
    encoded_value = 0
    tools = ontology_normalized["tools"]
    classifications = ontology_normalized["classifications"]
    if tools:
        results = layer_iterator(feature_map=feature_map, node_layer=tools)
        feature_map = results[0]
        encoded_value = results[2]
    if classifications:
        feature_map = layer_iterator(
            feature_map=feature_map,
            node_layer=classifications,
            encoded_value=encoded_value,
            parent_featureSchemaIds=[],
            parent_featureSchemaId=False,
        )[0]
    return feature_map

def layer_iterator(feature_map, node_layer, encoded_value=0, parent_featureSchemaIds=None, parent_featureSchemaId=False):
    """ Walks one layer of a normalized ontology (a list of node dictionaries), records key
        information for every node under its featureSchemaId, and recurses into nested layers.
    Args:
        feature_map (dict)              :   Dictionary being built, where key=featureSchemaId (updated in place)
        node_layer (list)               :   List of ontology node dictionaries to loop through
        encoded_value (int)             :   Running counter; increased by one for every node read into feature_map
        parent_featureSchemaIds (list)  :   featureSchemaIds of the ancestors above the immediate parent (never modified)
        parent_featureSchemaId (str)    :   featureSchemaId of the immediate parent node, or a falsy value at the top level
    Returns:
        Tuple (feature_map, next_layer, encoded_value, parent_featureSchemaIds, parent_featureSchemaId).
        feature_map and encoded_value carry the updated values; the remaining entries are kept so the
        tuple shape matches what existing callers index into.
    """
    # Work on a private copy: appending to the caller's list (or to a shared default)
    # would also change ancestor lists that were already stored in feature_map.
    ancestors = list(parent_featureSchemaIds) if parent_featureSchemaIds else []
    if parent_featureSchemaId:
        ancestors.append(parent_featureSchemaId)
    next_layer = []  # defined up front so an empty node_layer still returns cleanly
    for node in node_layer:
        encoded_value += 1
        color = ""
        if "tool" in node:
            node_type = node["tool"]
            node_kind = "tool"
            node_name = node["name"]
            next_layer = node["classifications"]
            color = node['color']
        elif "instructions" in node:
            node_name = node["instructions"]
            node_kind = "classification"
            node_type = node["type"]
            next_layer = node["options"]
        else:
            node_name = node["label"]
            node_kind = "option"
            if "options" in node:
                next_layer = node["options"]
                node_type = "branch_option"
            else:
                next_layer = []
                node_type = "leaf_option"
        feature_map[node['featureSchemaId']] = {
            "name" : node_name,
            "color" : color,
            "type" : node_type,
            "kind" : node_kind,
            # Every node gets its own copy so entries never alias each other.
            "parent_featureSchemaIds" : list(ancestors),
            "encoded_value" : encoded_value,
        }
        if next_layer:
            # Only the counter needs to come back; feature_map is updated in place and
            # all siblings in this layer keep the same ancestor list.
            encoded_value = layer_iterator(
                feature_map=feature_map,
                node_layer=next_layer,
                encoded_value=encoded_value,
                parent_featureSchemaIds=ancestors,
                parent_featureSchemaId=node['featureSchemaId']
            )[2]
    return feature_map, next_layer, encoded_value, ancestors, ""

def coco_bbox_converter(data_row_id, annotation, category_id): # was data_row_idx
    """ Converts one bounding box annotation into a COCO annotation dictionary
    Args:
        data_row_id (str)               :     Labelbox Data Row ID, used as the COCO image_id
        annotation (dict)               :     Annotation dictionary with a 'bounding_box' entry
                                              (keys 'top', 'left', 'height', 'width') and a 'feature_id'
        category_id (str)               :     Desired category_id for the coco_annotation
    Returns:
        An annotation dictionary in the COCO format. "bbox" is ordered [x, y, width, height],
        i.e. [left, top, width, height]; the values are emitted as strings.
    """
    box = annotation['bounding_box']
    coco_annotation = {
        "image_id": data_row_id,
        # COCO order is x (left), y (top), width, height - not top/left/height/width.
        "bbox": [
            str(box['left']),
            str(box['top']),
            str(box['width']),
            str(box['height'])
        ],
        "category_id": str(category_id),
        "id": annotation['feature_id']
    }
    return coco_annotation

def coco_line_converter(data_row_id, annotation, category_id):
    """ Converts one line annotation into a COCO keypoints annotation dictionary
    Args:
        data_row_id (str)               :     Labelbox Data Row ID, used as the COCO image_id
        annotation (dict)               :     Annotation dictionary with a 'line' entry (sequence of
                                              {'x', 'y'} dictionaries) and a 'feature_id'
        category_id (str)               :     Desired category_id for the coco_annotation
    Returns:
        Tuple of (annotation dictionary in the COCO format, number of keypoints in the line)
    """
    # Every vertex becomes an "x", "y", "2" triple; the third value is always the string "2".
    flattened = [
        value
        for vertex in annotation['line']
        for value in (str(vertex['x']), str(vertex['y']), "2")
    ]
    vertex_count = len(flattened) // 3
    return {
        "image_id": str(data_row_id),
        "keypoints": flattened,
        "num_keypoints": str(vertex_count),
        "category_id" : str(category_id),
        "id": str(annotation['feature_id'])
    }, vertex_count

def coco_point_converter(data_row_id, annotation, category_id):
    """ Converts a Labelbox point annotation into a single-keypoint COCO annotation dictionary
    Args:
        data_row_id (str)               :     Labelbox Data Row ID, stringified and used as the COCO image_id
        annotation (dict)               :     Annotation dictionary holding a 'point' with 'x' / 'y' and a 'feature_id'
        category_id (str)               :     Desired category_id for the coco_annotation
    Returns:
        An annotation dictionary in the COCO format
    """
    image_id = str(data_row_id)
    point = annotation['point']
    # One "x, y, 2" triplet, all stored as strings
    keypoint = [str(point['x']), str(point['y']), "2"]
    return {
        "image_id": image_id,
        "keypoints": keypoint,
        "num_keypoints": str(1),
        "category_id" : str(category_id),
        "id": str(annotation['feature_id'])
    }

def coco_polygon_converter(data_row_id, annotation, category_id):
    """ Converts a Labelbox polygon annotation into a COCO segmentation annotation dictionary
    Args:
        data_row_id (str)               :     Labelbox Data Row ID, used as-is for the COCO image_id
        annotation (dict)               :     Annotation dictionary holding a 'polygon' list of {'x', 'y'} vertices and a 'feature_id'
        category_id (str)               :     Desired category_id for the coco_annotation
    Returns:
        An annotation dictionary in the COCO format
    """
    vertices = [[vertex['x'], vertex['y']] for vertex in annotation['polygon']]
    # Flat x, y, x, y, ... list of stringified coordinates for the "segmentation" entry
    flat_points = [str(value) for vertex in vertices for value in vertex]
    outline = Polygon(vertices)
    # shapely bounds are (minx, miny, maxx, maxy); the bbox is stored as x, y, width, height
    bounds = outline.bounds
    bbox = [
        str(bounds[0]),
        str(bounds[1]),
        str(bounds[2]-bounds[0]),
        str(bounds[3]-bounds[1]),
    ]
    return {
        "image_id" : data_row_id,
        "segmentation" : flat_points,
        "bbox" : bbox,
        "area" : str(outline.area),
        "id": str(annotation['feature_id']),
        "iscrowd" : "0",
        "category_id" : str(category_id)
    }

def download_mask(url, headers = None):
    """ Downloads a mask URL and decodes it into a grayscale array
    Args:
        url (str)        :     URL of a mask
        headers (dict)   :     Optional request headers; when omitted, the module-level `client.headers` is used
    Returns:
        A 2-D numPy array of said mask
    Raises:
        requests.HTTPError if the server answers with a 4xx / 5xx status code
    """
    if headers is None:
        # Fall back to the authenticated headers of the module-level Labelbox client
        headers = client.headers
    print(f"downloading_mask at: {url}")
    payload = requests.get(url, headers = headers)
    # Fail loudly on an error response instead of handing an error body to the image decoder
    payload.raise_for_status()
    pil_image = Image.open(BytesIO(payload.content))

    # Convert the image to grayscale if it's not already
    if pil_image.mode != 'L':
        pil_image = pil_image.convert('L')

    # Convert the image to a NumPy array
    return np.array(pil_image)

def coco_mask_converter(data_row_id, annotation, category_id):
    """ Converts a Labelbox segmentation mask annotation into a COCO segmentation annotation dictionary
    Args:
        data_row_id (str)               :     Labelbox Data Row ID, used as-is for the COCO image_id
        annotation (dict)               :     Annotation dictionary holding a 'mask' with a 'url' and a 'feature_id'
        category_id (str)               :     Desired category_id for the coco_annotation
    Returns:
        An annotation dictionary in the COCO format
    """
    mask = download_mask(annotation['mask']['url'])
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    vertices = []
    flat_points = []
    for contour in contours:
        flat = contour.flatten().tolist()
        # Contours with fewer than three points (six flattened values) are ignored
        if len(flat) < 6:
            continue
        for idx in range(0, len(flat), 2):
            x, y = flat[idx], flat[idx+1]
            vertices.append([x, y])
            flat_points.extend((str(x), str(y)))
    # The points of every kept contour are pooled into a single shapely polygon
    outline = Polygon(vertices)
    bounds = outline.bounds
    bbox = [
        str(bounds[0]),
        str(bounds[1]),
        str(bounds[2]-bounds[0]),
        str(bounds[3]-bounds[1]),
    ]
    return {
        "image_id" : data_row_id,
        "segmentation" : flat_points,
        "bbox" : bbox,
        "area" : str(outline.area),
        "id": str(annotation['feature_id']),
        "iscrowd" : "0",
        "category_id" : str(category_id)
    }

def coco_annotation_converter(data_row_id, annotation, ontology_index):
    """ Picks a category_id for an annotation and routes it to the converter matching its geometry
    Args:
        data_row_id (str)               :     Labelbox Data Row ID for this label
        annotation (dict)               :     Annotation dictionary with a 'feature_schema_id' and one geometry key ('bounding_box', 'line', 'point', 'polygon'; anything else is treated as a mask)
        ontology_index (dict)           :     A dictionary where {key=featureSchemaId : value = {"encoded_value"} which corresponds to category_id
    Returns:
        A tuple of (coco annotation dictionary, number of line keypoints - 0 for every non-line annotation)
    """
    # Default category: the encoded value of the annotation's own tool
    category_id = ontology_index[annotation['feature_schema_id']]['encoded_value']
    # The first usable nested classification, if any, overrides the tool's category
    for nested in annotation.get('classifications') or []:
        if 'answer' not in nested:
            category_id = ontology_index[nested['answers'][0]['schemaId']]['encoded_value']
            break
        if type(nested['answer']) is dict:
            category_id = ontology_index[nested['schemaId']]['encoded_value']
            break
    if "bounding_box" in annotation:
        return coco_bbox_converter(data_row_id, annotation, category_id), 0
    if "line" in annotation:
        converted, keypoint_count = coco_line_converter(data_row_id, annotation, category_id)
        return converted, keypoint_count
    if "point" in annotation:
        return coco_point_converter(data_row_id, annotation, category_id), 0
    if "polygon" in annotation:
        return coco_polygon_converter(data_row_id, annotation, category_id), 0
    return coco_mask_converter(data_row_id, annotation, category_id), 0

def coco_converter(labelbox_api_key, run_id, project_id):
    """ Given a project ID and a Model run ID, exports the model run with export_v2 and builds the COCO export dictionary

    Uses the module-level `client` (a Labelbox Client) for all API calls, so it must exist before this is called.
    Args:
        labelbox_api_key (str)          :     Labelbox API key; kept for interface compatibility, the module-level `client` is what is actually used
        run_id (str)                    :     Labelbox Model Run ID whose data rows and labels are exported
        project_id (str)                :     Labelbox Project ID, source of the ontology and of the "info" section
    Returns:
        A dictionary with the keys "info", "licenses", "images", "annotations" and "categories"
    """
    model_run = client.get_model_run(run_id)
    project = client.get_project(project_id)

    export_params = {
        "label_details": True,
        "data_row_details": True,
        "model_run_details": True,
        "metadata_fields": True
    }

    export_task = model_run.export_v2(params=export_params)
    export_task.wait_till_done()
    labels_list = export_task.result

    # Info section generated from project information
    now = datetime.datetime.now()
    info = {
        'description' : project.name,
        'url' : f'https://app.labelbox.com/projects/{project.uid}/overview',
        'version' : "1.0",  'year' : now.year,
        'contributor' : project.created_by().email,
        'date_created' : now.strftime('%Y/%m/%d'),
    }

    # Licenses section is a single placeholder entry
    licenses = [ { "url" : "N/A", "id" : 1, "name" : "N/A" } ]

    # Index the export by data row ID: the data row itself, its media attributes and its experiment run
    print('Exporting Data Rows from Project...')
    data_rows = {}
    media_attributes = {}
    experiments = {}
    for label in labels_list:
        row_id = label["data_row"]["id"]
        data_rows[row_id] = label["data_row"]
        # Keep the image size per data row so each image reports its own dimensions
        media_attributes[row_id] = label["media_attributes"]
        for experiment in label["experiments"].values():
            for experiment_run in experiment["runs"].values():
                # Only one experiment run is kept per data row (the last one seen). Append to a list if you want them all
                experiments[row_id] = experiment_run

    print(f'\nExport complete. {len(data_rows)} Data Rows Exported')

    # Images section generated from data row export
    print('\nConverting Data Rows into a COCO Dataset...\n')

    images = []
    for data_row_id, data_row in data_rows.items():
        print(f"Data row ID {data_row_id}: {data_row}")
        attributes = media_attributes[data_row_id]
        images.append({
            "license" : 1,
            "file_name" : data_row["external_id"],
            "height" : attributes['height'],
            "width" : attributes['width'],
            "date_captured" : data_row["details"]["created_at"],
            "id" : data_row["id"],
            "coco_url": data_row["row_data"]
        })
    print('\nData Rows Converted into a COCO Dataset.')

    annotations = []

    print('\nConverting Annotations into the COCO Format...\n')
    ontology_index = index_ontology(project.ontology().normalized)
    global_max_keypoints = 0
    futures = []
    with ThreadPoolExecutor() as exc:
        for data_row_id in data_rows:
            # Data rows without an experiment run or without labels contribute no annotations
            experiment_run = experiments.get(data_row_id)
            labels = experiment_run['labels'] if experiment_run else []
            if not labels:
                continue
            # Only the first label of the run is converted
            for annotation in labels[0]['annotations']['objects']:
                futures.append(exc.submit(coco_annotation_converter, data_row_id, annotation, ontology_index))
        for f in tqdm(as_completed(futures)):
            converted, num_keypoints = f.result()
            # Track the longest line so line categories can declare enough keypoints
            global_max_keypoints = max(global_max_keypoints, int(num_keypoints))
            annotations.append(converted)
    print(f'\nAnnotation Conversion Complete. Converted {len(annotations)} annotations into the COCO Format.')

    categories = []

    print('\nConverting the Ontology into the COCO Dataset Format...')
    for feature in ontology_index.values():
        category_id = str(feature["encoded_value"])
        if feature["type"] == "line":
            keypoints = []
            skeleton = []
            for i in range(0, global_max_keypoints):
                keypoints.append(str("line_")+str(i+1))
                skeleton.append([str(i), str(i+1)])
            categories.append({
                "supercategory" : feature['name'],
                "id" : category_id,
                "name" : feature['name'],
                "keypoints" : keypoints,
                "skeleton" : skeleton,
            })
        elif feature["type"] == "point":
            categories.append({
                "supercategory" : feature['name'],
                "id" : category_id,
                "name" : feature['name'],
                "keypoints" : ['point'],
                "skeleton" : ["0", "0"],
            })
        elif feature['kind'] == 'tool':
            categories.append({
                "supercategory" : feature['name'],
                "id" : category_id,
                "name" : feature['name']
            })
        elif len(feature['parent_featureSchemaIds']) == 2:
            # Features with exactly two ancestors take the name of the first ancestor as supercategory
            supercategory = ontology_index[feature['parent_featureSchemaIds'][0]]['name']
            categories.append({
                "supercategory" : supercategory,
                "id" : category_id,
                "name" : feature['name']
            })
    print('\nOntology Conversion Complete')

    coco_dataset = {
        "info" : info,
        "licenses" : licenses,
        "images" : images,
        "annotations" : annotations,
        "categories" : categories
    }

    print('\nCOCO Conversion Complete')
    return coco_dataset

if __name__ == "__main__":
    # Command-line entry point: three required string options, then the
    # resulting COCO dataset is printed to stdout as indented JSON.
    parser = argparse.ArgumentParser()
    for flag, description in (
        ("--project_id", "the project ID defined in labelbox"),
        ("--run_id", "the model run ID defined in labelbox"),
        ("--labelbox_api_key", "the API key defined in labelbox"),
    ):
        parser.add_argument(flag, type=str, required=True, help=description)
    args = parser.parse_args()
    # `client` is deliberately module-level: the converter functions read it as a global
    client = Client(api_key=args.labelbox_api_key, enable_experimental=True)
    coco_dataset = coco_converter(args.labelbox_api_key, args.run_id, args.project_id)
    print(json.dumps(coco_dataset, indent=4))

However, when we try this on V2 data the mask URLs are not valid. This is the same problem as the V1 data export which is why we were updating to V2 in the first place.
When trying to open a target mask URL, e.g. https://api.labelbox.com/masks/feature/933a8a74-b64e-0a99-bdd6-7fae9ce39854/1 (probably private, just showing the format),
We get
{"status":"error","message":"The requested mask could not be found."}
Or, alternatively, an HTTP 500 Internal Server Error.

Has anyone else encountered this, where either V1 or V2 instanceURIs / annotation label mask URLs don’t work at all? The links still work for old projects, but not for any new ones.

I currently suspect that Labelbox’s internal API is just partially broken as part of the migration, but would like to see if anyone else gets this too in case we are doing something wrong.

Thank you,
Chance

Hi Chance! Welcome to the community!

Thank you for posting about this issue. This is probably coming from the download_mask() function. I’m updating the original post with an update to that function with the following:

def download_mask(url, headers = None, max_retries = 5):
    """ Downloads a mask URL, retrying a bounded number of times
    Args:
        url (str)            :     URL of a mask
        headers (dict)       :     Optional request headers; when omitted, the module-level `client.headers` is used
        max_retries (int)    :     Maximum number of download attempts before giving up
    Returns:
        A 2-D numPy array of said mask
    Raises:
        RuntimeError if no attempt produced a decodable mask (non-200 responses, request errors or unreadable image data)
    """
    if headers is None:
        # Fall back to the authenticated headers of the module-level Labelbox client
        headers = client.headers
    # to ensure api limit doesn't throw an error
    requests_per_min = 1500
    interval = 60 / requests_per_min
    last_error = None
    for _ in range(max_retries):
        time.sleep(interval)
        try:
            payload = requests.get(url, headers = headers)
            if payload.status_code != 200:
                last_error = f"status code {payload.status_code}"
                continue
            pil_image = Image.open(BytesIO(payload.content))

            # Convert the image to grayscale if it's not already
            if pil_image.mode != 'L':
                pil_image = pil_image.convert('L')

            # Convert the image to a NumPy array
            return np.array(pil_image)
        except (requests.RequestException, OSError) as error:
            # Network failures and undecodable image payloads are retried on the next attempt
            last_error = error
    # Give up instead of looping forever on a mask that never becomes available
    raise RuntimeError(f"Could not download mask at {url} after {max_retries} attempts: {last_error}")

Hopefully that fixes the issue, otherwise could you provide a projectID?

I also noticed that you mentioned the links work for old projects. One thing to note: when exporting masks for new projects with export_v2, the provided link is only available for 30 days, after which it expires. If you need to access the mask again, you will need to rerun the export.

Best,
Mina

Yes, thank you Mina. I have updated the code following your suggestion, but the problem remains. I am sure both that the Labelbox API key is being correctly passed in to the mask URL request (I made some refactorings in the code to reflect this) and that the project was exported less than 30 days ago. Like I said, this bug seems to only affect newer projects, whereas older projects DO work.

The problem is that the request correctly reaches the URL, but the URL returns either “The requested mask could not be found.” or an internal server error (status code 500). I have updated the code so that it does not flood the Labelbox API when a non-200 status code is returned.

Updated code:

from labelbox import Client
import urllib.request
import argparse
import copy
import json
import datetime
import requests
import time
from PIL import Image 
import numpy as np
from io import BytesIO
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
from shapely.geometry import Polygon
import cv2

def index_ontology(ontology_normalized, export_type="index"):
    """ Given an ontology, returns a dictionary where {key=featureSchemaid : values = {"name", "color", "type", "kind", "parent_featureSchemaIds", "encoded_value"} for each feature in the ontology
    Args:
        ontology_normalized   :   Queried from a project using project.ontology().normalized; must hold the keys "tools" and "classifications"
        export_type           :   Currently unused; kept for interface compatibility
    Returns:
        Dictionary with key information on each node in an ontology, where {key=featureSchemaid : values = {"name", "color", "type", "kind", "parent_featureSchemaIds", "encoded_value"}
    """
    feature_map = {}
    # Running counter of encoded values; stays 0 when the ontology has no tools
    encoded_value = 0
    tools = ontology_normalized["tools"]
    classifications = ontology_normalized["classifications"]
    if tools:
        results = layer_iterator(feature_map=feature_map, node_layer=tools)
        feature_map = results[0]
        encoded_value = results[2]
    if classifications:
        # Classifications continue numbering where the tools left off
        feature_map = layer_iterator(feature_map=feature_map, node_layer=classifications, encoded_value=encoded_value, parent_featureSchemaIds=[], parent_featureSchemaId = False)[0]
    return feature_map

def layer_iterator(feature_map, node_layer, encoded_value=0, parent_featureSchemaIds=None, parent_featureSchemaId=False):
    """ Receives a normalized ontology layer (list of dictionaries) and for each dictionary (node), pulls key information where the key=featureSchemaid
        Then if a given node has another layer, recursively calls this function to loop through all the nested layers of the ontology node dictionary
    Args:
        feature_map (dict)              :   Dictionary being built where key=featureSchemaid; updated in place
        node_layer (list)               :   List of ontology node dictionaries to loop through
        encoded_value (int)             :   Each dictionary gets an encoded value, and this increases by one for each ontology node dictionary read into the feature_map
        parent_featureSchemaIds (list)  :   Ancestor featureSchemaid strings of the immediate parent (outermost first); never mutated
        parent_featureSchemaId (str)    :   The immediate parent ontology node dictionary featureSchemaid, or a falsy value at the top level
    Returns:
        A tuple (feature_map, next_layer, encoded_value, parent_featureSchemaIds, parent_featureSchemaId) with updated values for feature_map and encoded_value;
        parent_featureSchemaIds is the ancestor chain shared by the nodes of this layer and parent_featureSchemaId is always ""
    """
    # Build this layer's ancestor chain as a fresh list so it never aliases the caller's list (or a shared default)
    ancestors = list(parent_featureSchemaIds) if parent_featureSchemaIds else []
    if parent_featureSchemaId:
        ancestors.append(parent_featureSchemaId)
    next_layer = []
    for node in node_layer:
        encoded_value += 1
        color = ""
        if "tool" in node:
            node_type = node["tool"]
            node_kind = "tool"
            node_name = node["name"]
            next_layer = node.get("classifications", [])
            color = node['color']
        elif "instructions" in node:
            node_name = node["instructions"]
            node_kind = "classification"
            node_type = node["type"]
            next_layer = node.get("options", [])
        else:
            node_name = node["label"]
            node_kind = "option"
            if "options" in node:
                next_layer = node["options"]
                node_type = "branch_option"
            else:
                next_layer = []
                node_type = "leaf_option"
        feature_map[node['featureSchemaId']] = {
            "name" : node_name,
            "color" : color,
            "type" : node_type,
            "kind" : node_kind,
            # Each node stores its own copy so later edits to one entry cannot leak into another
            "parent_featureSchemaIds" : list(ancestors),
            "encoded_value" : encoded_value
        }
        if next_layer:
            # Only the running counter needs to come back from the nested layer; feature_map is updated in place
            encoded_value = layer_iterator(
                feature_map=feature_map,
                node_layer=next_layer,
                encoded_value=encoded_value,
                parent_featureSchemaIds=ancestors,
                parent_featureSchemaId=node['featureSchemaId']
            )[2]
    return feature_map, next_layer, encoded_value, ancestors, ""

def coco_bbox_converter(data_row_id, annotation, category_id): # was data_row_idx
    """ Given a bounding box annotation, will return the coco-converted bounding box annotation dictionary
    Args:
        data_row_id (str)               :     Labelbox Data Row ID for this label, used as-is for the COCO image_id
        annotation (dict)               :     Annotation dictionary holding a 'bounding_box' with 'top', 'left', 'height', 'width' and a 'feature_id'
        category_id (str)               :     Desired category_id for the coco_annotation
    Returns:
        An annotation dictionary in the COCO format
    """
    box = annotation['bounding_box']
    coco_annotation = {
        "image_id": data_row_id,
        # COCO orders a bbox as [x, y, width, height], i.e. left, top, width, height
        "bbox": [
            str(box['left']),
            str(box['top']),
            str(box['width']),
            str(box['height'])
        ],
        "category_id": str(category_id),
        "id": annotation['feature_id']
    }
    return coco_annotation

def coco_line_converter(data_row_id, annotation, category_id):
    """ Turns a Labelbox line annotation into a COCO keypoint-style annotation dictionary
    Args:
        data_row_id (str)               :     Labelbox Data Row ID, stringified and used as the COCO image_id
        annotation (dict)               :     Annotation dictionary holding a 'line' list of {'x', 'y'} vertices and a 'feature_id'
        category_id (str)               :     Desired category_id for the coco_annotation
    Returns:
        A tuple of (annotation dictionary in the COCO format, number of vertices in the line)
    """
    triplets = [
        (str(vertex['x']), str(vertex['y']), "2")
        for vertex in annotation['line']
    ]
    # Flatten the per-vertex "x, y, 2" triplets into one list of strings
    flattened = [value for triplet in triplets for value in triplet]
    total = len(triplets)
    converted = {
        "image_id": str(data_row_id),
        "keypoints": flattened,
        "num_keypoints": str(total),
        "category_id" : str(category_id),
        "id": str(annotation['feature_id'])
    }
    return converted, total

def coco_point_converter(data_row_id, annotation, category_id):
    """ Turns a Labelbox point annotation into a single-keypoint COCO annotation dictionary
    Args:
        data_row_id (str)               :     Labelbox Data Row ID, stringified and used as the COCO image_id
        annotation (dict)               :     Annotation dictionary holding a 'point' with 'x' / 'y' and a 'feature_id'
        category_id (str)               :     Desired category_id for the coco_annotation
    Returns:
        An annotation dictionary in the COCO format
    """
    converted = {"image_id": str(data_row_id)}
    location = annotation['point']
    # A single "x, y, 2" triplet, every entry stored as a string
    converted["keypoints"] = [str(location['x']), str(location['y']), "2"]
    converted["num_keypoints"] = str(1)
    converted["category_id"] = str(category_id)
    converted["id"] = str(annotation['feature_id'])
    return converted

def coco_polygon_converter(data_row_id, annotation, category_id):
    """ Turns a Labelbox polygon annotation into a COCO segmentation annotation dictionary
    Args:
        data_row_id (str)               :     Labelbox Data Row ID, used as-is for the COCO image_id
        annotation (dict)               :     Annotation dictionary holding a 'polygon' list of {'x', 'y'} vertices and a 'feature_id'
        category_id (str)               :     Desired category_id for the coco_annotation
    Returns:
        An annotation dictionary in the COCO format
    """
    coordinate_pairs = []
    segmentation = []
    for vertex in annotation['polygon']:
        x, y = vertex['x'], vertex['y']
        coordinate_pairs.append([x, y])
        # The segmentation is a flat x, y, x, y, ... list of strings
        segmentation += [str(x), str(y)]
    shape = Polygon(coordinate_pairs)
    # shapely bounds are (minx, miny, maxx, maxy); the bbox is stored as x, y, width, height
    extent = shape.bounds
    width = extent[2]-extent[0]
    height = extent[3]-extent[1]
    return {
        "image_id" : data_row_id,
        "segmentation" : segmentation,
        "bbox" : [str(extent[0]), str(extent[1]), str(width), str(height)],
        "area" : str(shape.area),
        "id": str(annotation['feature_id']),
        "iscrowd" : "0",
        "category_id" : str(category_id)
    }

def download_mask(client, url, max_retries = 5):
    """ Downloads a mask URL and decodes it into a grayscale array
    Args:
        client               :     Labelbox client; its `headers` attribute is sent with the request
        url (str)            :     URL of a mask
        max_retries (int)    :     Maximum number of attempts when the request or the image decoding raises
    Returns:
        A 2-D numPy array of said mask
    Raises:
        RuntimeError if the server answers with a non-200 status code (no retry, to avoid flooding the API),
        or if every attempt failed with a request / decoding error
    """
    print(f"Downloading mask URL at: {url}")
    # to ensure api limit doesn't throw an error
    requests_per_min = 1500
    interval = 60 / requests_per_min
    last_error = None
    for _ in range(max_retries):
        time.sleep(interval)
        try:
            payload = requests.get(url, headers = client.headers)
            if payload.status_code == 200:
                pil_image = Image.open(BytesIO(payload.content))

                # Convert the image to grayscale if it's not already
                if pil_image.mode != 'L':
                    pil_image = pil_image.convert('L')

                # Convert the image to a NumPy array
                return np.array(pil_image)
        except (requests.RequestException, OSError) as error:
            # Network failures and undecodable image payloads are retried on the next attempt
            last_error = error
            print(f"Error downloading mask at url: {url}")
            continue
        # A non-200 answer will not fix itself on retry; report it instead of returning an unset array
        print(f"Error downloading mask at url: {url}, status code {payload.status_code}")
        raise RuntimeError(f"Error downloading mask at url: {url}, status code {payload.status_code}")
    raise RuntimeError(f"Error downloading mask at url: {url} after {max_retries} attempts: {last_error}")


def coco_mask_converter(client, data_row_id, annotation, category_id):
    """ Turns a Labelbox segmentation mask annotation into a COCO segmentation annotation dictionary
    Args:
        client                          :     Labelbox client handed on to download_mask
        data_row_id (str)               :     Labelbox Data Row ID, used as-is for the COCO image_id
        annotation (dict)               :     Annotation dictionary holding a 'mask' with a 'url' and a 'feature_id'
        category_id (str)               :     Desired category_id for the coco_annotation
    Returns:
        An annotation dictionary in the COCO format
    """
    mask = download_mask(client, annotation['mask']['url'])
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    coordinate_pairs = []
    segmentation = []
    for contour in contours:
        flat = contour.flatten().tolist()
        # Contours with fewer than three points (six flattened values) are ignored
        if len(flat) < 6:
            continue
        for idx in range(0, len(flat), 2):
            x, y = flat[idx], flat[idx+1]
            coordinate_pairs.append([x, y])
            segmentation.extend((str(x), str(y)))
    # The points of every kept contour are pooled into a single shapely polygon
    shape = Polygon(coordinate_pairs)
    extent = shape.bounds
    bbox = [
        str(extent[0]),
        str(extent[1]),
        str(extent[2]-extent[0]),
        str(extent[3]-extent[1]),
    ]
    return {
        "image_id" : data_row_id,
        "segmentation" : segmentation,
        "bbox" : bbox,
        "area" : str(shape.area),
        "id": str(annotation['feature_id']),
        "iscrowd" : "0",
        "category_id" : str(category_id)
    }

def coco_annotation_converter(data_row_id, annotation, ontology_index):
    """ Chooses a category_id for an annotation and hands it to the converter matching its geometry
    Args:
        data_row_id (str)               :     Labelbox Data Row ID for this label
        annotation (dict)               :     Annotation dictionary with a 'feature_schema_id' and one geometry key ('bounding_box', 'line', 'point', 'polygon'; anything else is treated as a mask)
        ontology_index (dict)           :     A dictionary where {key=featureSchemaId : value = {"encoded_value"} which corresponds to category_id
    Returns:
        A tuple of (coco annotation dictionary, number of line keypoints - 0 for every non-line annotation)
    """
    # Default category: the encoded value of the annotation's own tool
    category_id = ontology_index[annotation['feature_schema_id']]['encoded_value']
    # The first usable nested classification, if any, overrides the tool's category
    for nested in annotation.get('classifications') or []:
        if 'answer' not in nested:
            category_id = ontology_index[nested['answers'][0]['schemaId']]['encoded_value']
            break
        if type(nested['answer']) is dict:
            category_id = ontology_index[nested['schemaId']]['encoded_value']
            break
    if "bounding_box" in annotation:
        return coco_bbox_converter(data_row_id, annotation, category_id), 0
    if "line" in annotation:
        converted, keypoint_count = coco_line_converter(data_row_id, annotation, category_id)
        return converted, keypoint_count
    if "point" in annotation:
        return coco_point_converter(data_row_id, annotation, category_id), 0
    if "polygon" in annotation:
        return coco_polygon_converter(data_row_id, annotation, category_id), 0
    # Mask download needs the module-level Labelbox `client`
    return coco_mask_converter(client, data_row_id, annotation, category_id), 0

def coco_converter(client, run_id, project_id):
    """Export a Labelbox model run and convert it into a COCO dataset dictionary.

    Args:
        client: Labelbox client; get_model_run() and get_project() are called on it.
        run_id (str): ID of the model run whose export_v2 result is converted.
        project_id (str): ID of the project that supplies the "info" section and the ontology.
    Returns:
        dict with the keys "info", "licenses", "images", "annotations" and "categories".
    """
    model_run = client.get_model_run(run_id)
    project = client.get_project(project_id)

    export_params = {
        "label_details": True,
        "data_row_details": True,
        "model_run_details": True,
        "metadata_fields": True,
    }

    # Optional filters the original author experimented with (left disabled):
    # filters = {
    #     "last_activity_at": ["2000-01-01 00:00:00", "2050-01-01 00:00:00"],
    #     "label_created_at": ["2000-01-01 00:00:00", "2050-01-01 00:00:00"],
    #     "workflow_status": "Done"
    # }
    # export_task = model_run.export_v2(params=export_params, filters=filters)
    export_task = model_run.export_v2(params=export_params)
    export_task.wait_till_done()
    labels_list = export_task.result

    # Info section generated from project information. The timestamp is taken
    # once so that 'year' and 'date_created' can never disagree.
    now = datetime.datetime.now()
    info = {
        'description': project.name,
        'url': f'https://app.labelbox.com/projects/{project.uid}/overview',
        'version': "1.0",
        'year': now.year,
        'contributor': project.created_by().email,
        'date_created': now.strftime('%Y/%m/%d'),
    }

    # Licenses section is a single placeholder entry
    licenses = [{"url": "N/A", "id": 1, "name": "N/A"}]

    print('Exporting Data Rows from Project...')
    # Per data row ID: the data row itself, its media attributes, and its experiment run.
    data_rows = {}
    media_attributes = {}
    experiments = {}
    for label in labels_list:
        row_id = label["data_row"]["id"]
        data_rows[row_id] = label["data_row"]
        # Remember the image size alongside the data row; reading it later from
        # the loop variable would give every image the size of the last row.
        media_attributes[row_id] = label["media_attributes"]
        for experiment in label["experiments"].values():
            for run in experiment["runs"].values():
                # Only one experiment run is kept per data row (the last one seen).
                # Collect into a list here if several runs per row must be exported.
                # A run holds [name, run_data_row_id, labels{}, predictions, split].
                experiments[row_id] = run

    print(f'\nExport complete. {len(data_rows)} Data Rows Exported')

    # Images section generated from data row export
    print('\nConverting Data Rows into a COCO Dataset...\n')

    images = []
    for data_row_id, data_row in data_rows.items():
        print(f"Data row ID {data_row_id}: {data_row}")
        row_media = media_attributes[data_row_id]
        images.append({
            "license": 1,
            "file_name": data_row["external_id"],
            "height": row_media['height'],
            "width": row_media['width'],
            "date_captured": data_row["details"]["created_at"],
            "id": data_row["id"],
            "coco_url": data_row["row_data"],
        })
    print('\nData Rows Converted into a COCO Dataset.')

    annotations = []

    print('\nConverting Annotations into the COCO Format...\n')
    ontology_index = index_ontology(project.ontology().normalized)
    global_max_keypoints = 0
    futures = []
    with ThreadPoolExecutor() as exc:
        for data_row_id in data_rows:
            run = experiments.get(data_row_id)
            if run is None:
                # No experiment run was exported for this data row, so there is
                # nothing to convert for it.
                continue
            # Convert every label of the run; an empty label list simply
            # contributes no annotations.
            for run_label in run['labels']:
                for annotation in run_label['annotations']['objects']:
                    futures.append(exc.submit(coco_annotation_converter, data_row_id, annotation, ontology_index))
        for f in tqdm(as_completed(futures)):
            coco_annotation, line_keypoints = f.result()
            # Track the longest line so line categories get enough keypoint slots.
            global_max_keypoints = max(global_max_keypoints, int(line_keypoints))
            annotations.append(coco_annotation)
    print(f'\nAnnotation Conversion Complete. Converted {len(annotations)} annotations into the COCO Format.')

    categories = []

    print('\nConverting the Ontology into the COCO Dataset Format...')
    # NOTE(review): category ids and skeleton entries are emitted as strings,
    # exactly as before; confirm what downstream COCO consumers expect before
    # changing the output schema.
    for feature in ontology_index.values():
        extra_fields = {}
        supercategory_source = feature
        if feature["type"] == "line":
            extra_fields = {
                "keypoints": [str("line_") + str(i + 1) for i in range(global_max_keypoints)],
                "skeleton": [[str(i), str(i + 1)] for i in range(global_max_keypoints)],
            }
        elif feature["type"] == "point":
            extra_fields = {
                "keypoints": ['point'],
                "skeleton": ["0", "0"],
            }
        elif feature['kind'] == 'tool':
            pass
        elif len(feature['parent_featureSchemaIds']) == 2:
            # Nested answer: the supercategory is named after its first parent.
            supercategory_source = ontology_index[feature['parent_featureSchemaIds'][0]]
        else:
            # Anything else in the ontology has no COCO category.
            continue
        categories.append({
            "supercategory": supercategory_source['name'],
            "id": str(feature["encoded_value"]),
            "name": feature['name'],
            **extra_fields,
        })
    print('\nOntology Conversion Complete')

    coco_dataset = {
        "info": info,
        "licenses": licenses,
        "images": images,
        "annotations": annotations,
        "categories": categories,
    }

    print('\nCOCO Conversion Complete')
    return coco_dataset

if __name__ == "__main__":
    # Command-line entry point: read the three required string options, build
    # the Labelbox client, run the conversion and print the result as JSON.
    cli_options = (
        ("--project_id", "the project ID defined in labelbox"),
        ("--run_id", "the model run ID defined in labelbox"),
        ("--labelbox_api_key", "the API key defined in labelbox"),
    )
    parser = argparse.ArgumentParser()
    for option_flag, option_help in cli_options:
        parser.add_argument(option_flag, type=str, required=True, help=option_help)
    args = parser.parse_args()

    # `client` stays a module-level name, as in the original script.
    client = Client(api_key=args.labelbox_api_key, enable_experimental=True)
    coco_dataset = coco_converter(client, args.run_id, args.project_id)
    print(json.dumps(coco_dataset, indent=4))

The Project ID is clvb9bpy401r707yb3rux7ujs, in case this is just a project configuration error of some sort, but I’m not sure what it could be (I have checked all the settings I can find). In the meantime, I suspect this is an error on Labelbox’s side.

2 Likes