How can I export my annotation to AWS Sagemaker/Comprehend?

I have a project with annotated text files in Labelbox, and the only export option I can see in the app is a JSON file. AWS Comprehend requires annotation files used for model training to be .csv files. The only Python code I can find that converts the JSON export to .csv is in the Labelbox documentation for exporting annotations from a project, and I am getting repeated errors when I run it.

I’ve tried some basic JSON-to-CSV converters, but they don’t get the column and row formatting right, and AWS rejects the result with errors saying it can’t read the file.

Has anyone found a workaround to get annotated files from Labelbox into AWS Comprehend custom entity recognition model training?

Or found what is causing the KeyError: 'instanceURI' when running the Jupyter notebook provided in the Labelbox documentation?

Hello,

I’ll look into the instanceURI KeyError. What type of data are you annotating in the project you’re trying to export?

Best,
Luke
Labelbox Support

It is annotated text data.

Ah okay, it looks like we need to make some changes to the Labelpandas library to support text annotations. I’ll work on those changes and, in the meantime, provide you with the functions you need to run export_to_table() in the notebook.

Here’s the updated code to export the project to a table. You just need to add your API key and project ID at the bottom, and it should create the DataFrame that you can export to a CSV. Let me know if you run into any issues with this.

import labelbox as lb
from labelbox import Client as labelboxClient
from labelbox import Project as labelboxProject
from labelbox import Ontology as labelboxOntology
import pandas as pd

def get_ontology_schema_to_name_path(ontology, divider:str="///", invert:bool=False, detailed:bool=False):
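    # Walks the normalized ontology (tools first, then classifications) and returns
    # a dict mapping featureSchemaId -> divider-joined name path. invert=True flips
    # the mapping; detailed=True returns, per entry, a dict of the node's name,
    # type, kind, encoded value, and name path (or schema ID when inverted).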
    
    def map_layer(feature_dict:dict={}, node_layer:list= [], parent_name_path:str="", divider:str="///", invert:bool=False, detailed:bool=False, encoded_value:int=0):
        if node_layer:
            for node in node_layer:
                encoded_value += 1
                if "tool" in node.keys():
                    node_name = node["name"]
                    next_layer = node["classifications"]
                    node_type = node["tool"]
                    node_type = "bbox" if node_type == "rectangle" else node_type
                    node_type = "mask" if node_type in ["superpixel", "raster-segmentation"] else node_type 
                    node_kind = "tool"   
                elif "instructions" in node.keys():
                    node_name = node["instructions"]
                    next_layer = node["options"]
                    node_kind = "classification"
                    node_type = node["type"]                        
                else:
                    node_type = "option"
                    node_name = node["label"]
                    next_layer = node.get("options", [])
                    node_kind = "branch_option" if next_layer else "leaf_option" 
                name_path = f"{parent_name_path}{divider}{node_name}" if parent_name_path else node_name
                dict_key = node['featureSchemaId'] if not invert else name_path
                if detailed:
                    if not invert:
                        dict_value = {"name":node_name,"type":node_type,"kind":node_kind,"encoded_value":encoded_value,"name_path":name_path}
                    else:
                        dict_value = {"name":node_name,"type":node_type,"kind":node_kind,"encoded_value":encoded_value,"schema_id":node['featureSchemaId']}
                else:
                    dict_value = name_path if not invert else node['featureSchemaId']
                feature_dict.update({dict_key : dict_value})
                if next_layer:
                    feature_dict, encoded_value = map_layer(feature_dict, next_layer, name_path, divider, invert=invert, detailed=detailed, encoded_value=encoded_value)
        return feature_dict, encoded_value
    if type(ontology) == labelboxOntology:
        ontology_normalized = ontology.normalized
    elif type(ontology) == dict:
        ontology_normalized = ontology
    else:
        raise TypeError(f"Input for ontology must be either a Labelbox ontology object or a dictionary representation of a Labelbox ontology - received input of type {type(ontology)}")
    if ontology_normalized["tools"]:
        working_dictionary, working_encoded_value = map_layer(feature_dict={}, node_layer=ontology_normalized["tools"], divider=divider, invert=invert, detailed=detailed)
    else:
        working_dictionary = {} 
        working_encoded_value = 0
    if ontology_normalized["classifications"]:
        working_dictionary, working_encoded_value = map_layer(feature_dict=working_dictionary, node_layer=ontology_normalized["classifications"], divider=divider, invert=invert, detailed=detailed, encoded_value=working_encoded_value)
    return working_dictionary

def get_metadata_schema_to_type(client:labelboxClient, lb_mdo=False, invert:bool=False):
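    # Maps each metadata field's schema ID to a coarse type label
    # ("enum", "string", "datetime", or "number"); invert=True flips the mapping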
    metadata_schema_to_type = {}
    lb_mdo = client.get_data_row_metadata_ontology() if not lb_mdo else lb_mdo
    for field in lb_mdo._get_ontology():
        metadata_type = ""
        if "enum" in field["kind"].lower():
            metadata_type = "enum"
        if "string" in field["kind"].lower():
            metadata_type = "string"
        if "datetime" in field["kind"].lower():
            metadata_type = "datetime"
        if "number" in field["kind"].lower():
            metadata_type = "number"
        if metadata_type:
            metadata_schema_to_type[field["id"]] = metadata_type
    return_value = metadata_schema_to_type if not invert else {v:k for k,v in metadata_schema_to_type.items()}
    return return_value

def get_metadata_schema_to_name_key(client:labelboxClient, lb_mdo=False, divider="///", invert:bool=False):
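    # Maps metadata schema IDs to readable name keys: enum parent fields map to the
    # field name, and enum options map to "field<divider>option"; invert=True flips
    # the mapping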
    lb_mdo = client.get_data_row_metadata_ontology() if not lb_mdo else lb_mdo
    lb_metadata_dict = lb_mdo.reserved_by_name
    lb_metadata_dict.update(lb_mdo.custom_by_name)
    metadata_schema_to_name_key = {}
    for metadata_field_name_key in lb_metadata_dict:
        if type(lb_metadata_dict[metadata_field_name_key]) == dict:
            metadata_schema_to_name_key[lb_metadata_dict[metadata_field_name_key][next(iter(lb_metadata_dict[metadata_field_name_key]))].parent] = str(metadata_field_name_key)
            for enum_option in lb_metadata_dict[metadata_field_name_key]:
                metadata_schema_to_name_key[lb_metadata_dict[metadata_field_name_key][enum_option].uid] = f"{str(metadata_field_name_key)}{str(divider)}{str(enum_option)}"
        else:
            metadata_schema_to_name_key[lb_metadata_dict[metadata_field_name_key].uid] = str(metadata_field_name_key)
    return_value = metadata_schema_to_name_key if not invert else {v:k for k,v in metadata_schema_to_name_key.items()}
    return return_value

def get_leaf_paths(export_classifications:list, schema_to_name_path:dict, divider:str="///"):
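    # Resolves exported classification answers to name paths via schema_to_name_path,
    # assembles them into a tree, and returns only the leaf-level paths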
    def build_leaf_paths(root:dict, acc="", name_paths=[], divider="///"):
        for parent in root.keys():
            name_path = f"{acc}{divider}{parent}" if acc else f"{parent}"
            child = root[parent]
            if child:
                name_paths = build_leaf_paths(root=root[parent], acc=name_path, name_paths=name_paths, divider=divider)
            else:
                name_paths.append(name_path)
        return name_paths    
    name_paths = []
    for cla in export_classifications:
        if type(cla) == dict:
            if "answers" in cla.keys():
                for answer in cla["answers"]:
                    name_paths.append(schema_to_name_path[answer["schemaId"]])
            if "answer" in cla.keys():
                if type(cla["answer"]) == str:
                    name_paths.append(schema_to_name_path[cla["schemaId"]]+divider+cla["answer"])
                else:
                    name_paths.append(schema_to_name_path[cla["answer"]["schemaId"]]) 
        else:
            for c in cla:
                if "answers" in c.keys():
                    for answer in c["answers"]:
                        name_paths.append(schema_to_name_path[answer["schemaId"]])
                if "answer" in c.keys():
                    if type(c["answer"]) == str:
                        name_paths.append(schema_to_name_path[c["schemaId"]]+divider+c["answer"])
                    else:
                        name_paths.append(schema_to_name_path[c["answer"]["schemaId"]])                 
    root = {}
    for input_path in name_paths:
        parts = input_path.split(divider)
        current_node = root
        for part in parts:
            if part not in current_node:
                current_node[part] = {}
            current_node = current_node[part]    
    return build_leaf_paths(root, divider=divider)

def pull_first_name_from_paths(name_paths:list, divider:str="///"):
    firsts = []
    for name_path in name_paths:
        firsts.append(str(name_path.split(divider)[0]))
    return list(set(firsts))

def get_child_paths(first, name_paths, divider:str="///"):
    child_paths = []
    for path in name_paths:
        if path.startswith(first):
            child_path = ""
            for name in path.split(divider)[1:]:
                child_path += str(name)+str(divider)
            child_path = child_path[:-len(divider)] 
            child_paths.append(child_path)
    return child_paths     

def flatten_label(label_dict:dict, ontology_index:dict, schema_to_name_path:dict, mask_method:str="url", divider:str="///"):
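    # Flattens one exported label into {column_name: [[value, nested_paths], ...]},
    # where column_name is "<annotation type><divider><feature name>". For text
    # entities, value is the [start, end] character offsets into the row data.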
    flat_label = {}
    annotations = label_dict["Label"]
    objects = annotations["objects"]
    classifications = annotations["classifications"]
    if objects:
        for obj in objects:
            annotation_type = ontology_index[obj["title"]]["type"]
            annotation_type = "mask" if annotation_type == "raster-segmentation" else annotation_type
            annotation_type = "bbox" if annotation_type == "rectangle" else annotation_type
            column_name = f'{annotation_type}{divider}{obj["title"]}'           
            if column_name not in flat_label.keys():
                flat_label[column_name] = []
            if "bbox" in obj.keys():
                annotation_value = [obj["bbox"]["top"], obj["bbox"]["left"], obj["bbox"]["height"], obj["bbox"]["width"]]
            elif "polygon" in obj.keys():
                annotation_value = [[coord["x"], coord["y"]] for coord in obj["polygon"]]
            elif "line" in obj.keys():
                annotation_value = [[coord["x"], coord["y"]] for coord in obj["line"]]
            elif "point" in obj.keys():
                annotation_value = [obj["point"]["x"], obj["point"]["y"]]
            elif "data" in obj.keys():
                if "location" in obj['data'].keys():
                    annotation_value = [obj["data"]["location"]["start"], obj["data"]["location"]["end"]]
            # else:
            #     if mask_method == "url":
            #         annotation_value = [obj["instanceURI"], [255,255,255]]
            #     elif mask_method == "array": 
            #         array = mask_to_bytes(input=obj["instanceURI"], method="url", color=[255,255,255], output="array")
            #         annotation_value = [array, [255,255,255]]
            #     else:
            #         png = mask_to_bytes(input=obj["instanceURI"], method="url", color=[255,255,255], output="png")
            #         annotation_value = [png, "null"]
            if "classifications" in obj.keys():
                nested_classification_name_paths = get_leaf_paths(
                    export_classifications=obj["classifications"], 
                    schema_to_name_path=schema_to_name_path,
                    divider=divider
                )
                return_paths = get_child_paths(first=obj["title"], name_paths=nested_classification_name_paths, divider=divider)
            else:
                return_paths = []
            flat_label[column_name].append([annotation_value, return_paths])
    if classifications:
        leaf_paths = get_leaf_paths(
            export_classifications=classifications, 
            schema_to_name_path=schema_to_name_path,
            divider=divider
        )
        classification_names = pull_first_name_from_paths(
            name_paths=leaf_paths, 
            divider=divider
        )
        for classification_name in classification_names:
            annotation_type = ontology_index[classification_name]["type"]
            child_paths = get_child_paths(first=classification_name, name_paths=leaf_paths, divider=divider)
            flat_label[f'{annotation_type}{divider}{classification_name}'] = [[name_path for name_path in child_paths]]
    return flat_label

def export_and_flatten_labels(
    client:labelboxClient, project, include_metadata:bool=True, include_performance:bool=True, 
    include_agreement:bool=False, verbose:bool=False, mask_method:str="png", divider="///"):
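    # Exports every label in the project and flattens each non-skipped label into
    # a flat dict: data row identifiers, one "annotation<divider><type><divider><name>"
    # column per feature, plus optional agreement, performance, and metadata columns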
    if mask_method not in ["url", "png", "array"]:
        raise ValueError("Please specify the mask_method you want to download your segmentation masks in - must be one of 'url', 'png', or 'array'")
    project = project if type(project) == labelboxProject else client.get_project(project)
    if verbose:
        print(f"Exporting labels from Labelbox for project with ID {project.uid}")
    export = project.export_labels(download=True)
    if verbose:
        print(f"Export complete: {len(export)} labels exported")   
    if include_metadata:
        data_row_ids = list(set([label['DataRow ID'] for label in export]))
        if verbose:
            print(f"Exporting metadata from Labelbox for {len(data_row_ids)} data row IDs")
        mdo = client.get_data_row_metadata_ontology()
        metadata_export = mdo.bulk_export(data_row_ids=data_row_ids)
        metadata_export_index = {x.data_row_id : x for x in metadata_export}
        metadata_schema_to_type = get_metadata_schema_to_type(client=client, lb_mdo=mdo, invert=False)
        metadata_schema_to_name_key = get_metadata_schema_to_name_key(client=client, lb_mdo=mdo, invert=False, divider=divider)
        if verbose:
            print(f"Metadata export complete")
    ontology_index = get_ontology_schema_to_name_path(project.ontology(), invert=True, divider=divider, detailed=True)
    schema_to_name_path = get_ontology_schema_to_name_path(project.ontology(), invert=False, divider=divider, detailed=False)   
    flattened_labels = [] 
    if verbose:
        print(f"Flattening labels...")
    for label in export:
        if not label['Skipped']:
            flat_label = {
                "global_key" : label["Global Key"],
                "row_data" : label["Labeled Data"],
                "data_row_id" : label["DataRow ID"],
                "label_id" : label["ID"],
                "external_id" : label["External ID"]
            }
            res = flatten_label(label_dict=label, ontology_index=ontology_index, schema_to_name_path=schema_to_name_path, mask_method=mask_method, divider=divider)            
            for key, val in res.items():
                flat_label[f"annotation{divider}{str(key)}"] = val
            if include_agreement:
                flat_label["consensus_score"] = label["Agreement"]
            if include_performance:
                flat_label["created_by"] = label["Created By"]
                flat_label["seconds_to_create"] = label["Seconds to Create"]
                flat_label["seconds_to_review"] = label["Seconds to Review"]
                flat_label["seconds_to_label"] = label["Seconds to Label"]
            if include_metadata:
                data_row_metadata = metadata_export_index[label["DataRow ID"]].fields
                for metadata in data_row_metadata:
                    metadata_type = metadata_schema_to_type[metadata.schema_id]
                    if metadata.value in metadata_schema_to_name_key.keys():
                        name_path = metadata_schema_to_name_key[metadata.value].split(divider)
                        field_name = name_path[0]
                        metadata_value = name_path[1]
                    else:
                        field_name = metadata.name
                        metadata_value = metadata.value
                    if field_name != "lb_integration_source":
                        flat_label[f'metadata{divider}{metadata_type}{divider}{field_name}'] = metadata_value
            flattened_labels.append(flat_label)
    if verbose:
        print(f"Labels flattened")            
    return flattened_labels 

def export_to_table(
    project, client,
    include_metadata:bool=False, include_performance:bool=False, include_agreement:bool=False,
    verbose:bool=False, mask_method:str="png", divider="///"):
    flattened_labels_dict = export_and_flatten_labels(
        client=client, project=project,
        include_metadata=include_metadata, include_performance=include_performance, include_agreement=include_agreement,
        mask_method=mask_method, verbose=verbose, divider=divider
    )
    table = pd.DataFrame.from_dict(flattened_labels_dict)
    if verbose:
        print("Success: DataFrame generated")
    return table

API_KEY = "<API_KEY>"
client = lb.Client(API_KEY)
project_id = '<Project ID>'
df = export_to_table(project_id, client)
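
From there, pandas can write the DataFrame straight to a CSV, for example:

df.to_csv("labelbox_export.csv", index=False)

Note that the annotation columns hold Python lists, so you may still need to reshape those values into whatever layout the downstream service expects.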

Great, thank you. I will try this code out.

The errors from before are gone now, thank you.

It’s still not a clean conversion to the format AWS asks for, but I can work on the transformations from here; it seems barely anyone has written code to move data between the two services.
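
In case it saves someone else time, here is a minimal sketch of the direction I’m taking, assuming single-line training documents and text entities exported as [start, end] character offsets by the code above. The target header (File, Line, Begin Offset, End Offset, Type) is the one AWS documents for Comprehend custom entity recognizer annotation files; the to_comprehend_annotations helper, the doc_file name, the Line handling, and the "named-entity" column filter are my own assumptions to adapt per project.

import pandas as pd

# Hypothetical helper: reshape the export_to_table() DataFrame into the
# annotations CSV layout that Comprehend custom entity recognition expects.
def to_comprehend_annotations(df: pd.DataFrame, doc_file: str, divider: str = "///") -> pd.DataFrame:
    rows = []
    # Assumes text entity columns are tagged "named-entity" in the export;
    # check the column names in your own DataFrame
    annotation_cols = [
        c for c in df.columns
        if c.startswith(f"annotation{divider}") and c.split(divider)[1] == "named-entity"
    ]
    for _, row in df.iterrows():
        for col in annotation_cols:
            cell = row[col]
            if not isinstance(cell, list):
                continue  # this data row has no annotations for this feature
            entity_type = col.split(divider)[-1]  # feature name from the column header
            for value, _nested_paths in cell:
                if not value:
                    continue
                start, end = value  # [start, end] offsets from flatten_label()
                rows.append({
                    "File": doc_file,   # assumes all rows come from one training file
                    "Line": 0,          # assumes single-line documents; adjust if not
                    "Begin Offset": start,
                    # Comprehend's End Offset is exclusive; verify whether the
                    # Labelbox end index is inclusive and add 1 if it is.
                    "End Offset": end,
                    "Type": entity_type.upper(),
                })
    return pd.DataFrame(rows, columns=["File", "Line", "Begin Offset", "End Offset", "Type"])

annotations_df = to_comprehend_annotations(df, doc_file="train_docs.txt")
annotations_df.to_csv("annotations.csv", index=False)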