# Updated code to export a Labelbox project to a table. Add your API key and
# project ID at the bottom; running the script builds a DataFrame that you can
# then export to a CSV.
import labelbox as lb
from labelbox import Client as labelboxClient
from labelbox import Project as labelboxProject
from labelbox import Ontology as labelboxOntology
import pandas as pd
def get_ontology_schema_to_name_path(ontology, divider: str = "///", invert: bool = False, detailed: bool = False):
    """Map every node in a Labelbox ontology from featureSchemaId to its name path.

    Args:
        ontology: A Labelbox Ontology object (anything exposing a ``.normalized`` dict)
            or the normalized dictionary representation of an ontology.
        divider: String used to join parent/child names into a name path.
        invert: If True, key by name path and value by schema id instead.
        detailed: If True, values are dicts with name/type/kind/encoded_value plus
            ``name_path`` (invert=False) or ``schema_id`` (invert=True).

    Returns:
        dict mapping schema ids to name paths (or the inverted / detailed variants).

    Raises:
        TypeError: If ``ontology`` is neither an ontology-like object nor a dict.
    """
    def map_layer(feature_dict: dict = None, node_layer: list = None, parent_name_path: str = "",
                  divider: str = "///", invert: bool = False, detailed: bool = False, encoded_value: int = 0):
        # Walk one layer of ontology nodes, recursing into nested classifications/options.
        # None sentinels replace the original mutable default arguments ({} / []),
        # which are shared across calls in Python.
        feature_dict = {} if feature_dict is None else feature_dict
        node_layer = [] if node_layer is None else node_layer
        for node in node_layer:
            encoded_value += 1
            if "tool" in node.keys():
                node_name = node["name"]
                next_layer = node["classifications"]
                node_type = node["tool"]
                # Normalize Labelbox tool names to the terms used downstream.
                node_type = "bbox" if node_type == "rectangle" else node_type
                node_type = "mask" if node_type in ["superpixel", "raster-segmentation"] else node_type
                node_kind = "tool"
            elif "instructions" in node.keys():
                node_name = node["instructions"]
                next_layer = node["options"]
                node_kind = "classification"
                node_type = node["type"]
            else:
                node_type = "option"
                node_name = node["label"]
                next_layer = node.get("options", [])
                node_kind = "branch_option" if next_layer else "leaf_option"
            name_path = f"{parent_name_path}{divider}{node_name}" if parent_name_path else node_name
            dict_key = node['featureSchemaId'] if not invert else name_path
            if detailed:
                if not invert:
                    dict_value = {"name": node_name, "type": node_type, "kind": node_kind,
                                  "encoded_value": encoded_value, "name_path": name_path}
                else:
                    dict_value = {"name": node_name, "type": node_type, "kind": node_kind,
                                  "encoded_value": encoded_value, "schema_id": node['featureSchemaId']}
            else:
                dict_value = name_path if not invert else node['featureSchemaId']
            feature_dict.update({dict_key: dict_value})
            if next_layer:
                feature_dict, encoded_value = map_layer(feature_dict, next_layer, name_path, divider,
                                                        invert=invert, detailed=detailed, encoded_value=encoded_value)
        return feature_dict, encoded_value
    if isinstance(ontology, dict):
        ontology_normalized = ontology
    elif hasattr(ontology, "normalized"):
        # Accept any ontology-like object exposing `.normalized` (e.g. labelbox.Ontology),
        # instead of a strict type check; also works if the SDK subclasses Ontology.
        ontology_normalized = ontology.normalized
    else:
        # Original message had a typo ("Lablbox") and printed the value, not its type.
        raise TypeError(f"Input for ontology must be either a Labelbox ontology object or a dictionary representation of a Labelbox ontology - received input of type {type(ontology)}")
    if ontology_normalized["tools"]:
        working_dictionary, working_encoded_value = map_layer(feature_dict={}, node_layer=ontology_normalized["tools"],
                                                              divider=divider, invert=invert, detailed=detailed)
    else:
        working_dictionary = {}
        working_encoded_value = 0
    if ontology_normalized["classifications"]:
        # Continue the encoded_value counter across tools and classifications.
        working_dictionary, working_encoded_value = map_layer(feature_dict=working_dictionary,
                                                              node_layer=ontology_normalized["classifications"],
                                                              divider=divider, invert=invert, detailed=detailed,
                                                              encoded_value=working_encoded_value)
    return working_dictionary
def get_metadata_schema_to_type(client:labelboxClient, lb_mdo=False, invert:bool=False):
    """Map metadata schema ids to a simple type string.

    Args:
        client: Labelbox client, used to fetch the metadata ontology when `lb_mdo` is falsy.
        lb_mdo: Optional pre-fetched metadata ontology; fetched from `client` if not given.
        invert: If True, return {type: schema_id} instead of {schema_id: type}.

    Returns:
        dict mapping schema ids to one of "enum", "string", "datetime", "number"
        (or the inverted mapping).
    """
    if not lb_mdo:
        lb_mdo = client.get_data_row_metadata_ontology()
    schema_to_type = {}
    for field in lb_mdo._get_ontology():
        kind = field["kind"].lower()
        # Later matches win, mirroring the original cascade of independent `if`s.
        matched = ""
        for candidate in ("enum", "string", "datetime", "number"):
            if candidate in kind:
                matched = candidate
        if matched:
            schema_to_type[field["id"]] = matched
    if invert:
        return {v: k for k, v in schema_to_type.items()}
    return schema_to_type
def get_metadata_schema_to_name_key(client:labelboxClient, lb_mdo=False, divider="///", invert:bool=False):
    """Map metadata schema ids to name keys; enum options become "field<divider>option".

    Args:
        client: Labelbox client, used to fetch the metadata ontology when `lb_mdo` is falsy.
        lb_mdo: Optional pre-fetched metadata ontology; fetched from `client` if not given.
        divider: Delimiter between an enum field name and its option name.
        invert: If True, return {name_key: schema_id} instead of {schema_id: name_key}.

    Returns:
        dict mapping schema ids (field uid, enum parent id, enum option uid) to name keys
        (or the inverted mapping).
    """
    lb_mdo = client.get_data_row_metadata_ontology() if not lb_mdo else lb_mdo
    # Copy before merging: the original updated `reserved_by_name` in place, mutating
    # the SDK object's dict and leaking custom fields into the reserved mapping.
    lb_metadata_dict = dict(lb_mdo.reserved_by_name)
    lb_metadata_dict.update(lb_mdo.custom_by_name)
    metadata_schema_to_name_key = {}
    for field_name, field in lb_metadata_dict.items():
        if isinstance(field, dict):
            # Enum field: `field` maps option name -> option schema. Each option's
            # `.parent` is the parent field's schema id; register the parent once.
            first_option = field[next(iter(field))]
            metadata_schema_to_name_key[first_option.parent] = str(field_name)
            for enum_option in field:
                metadata_schema_to_name_key[field[enum_option].uid] = f"{str(field_name)}{str(divider)}{str(enum_option)}"
        else:
            metadata_schema_to_name_key[field.uid] = str(field_name)
    return metadata_schema_to_name_key if not invert else {v: k for k, v in metadata_schema_to_name_key.items()}
def get_leaf_paths(export_classifications: list, schema_to_name_path: dict, divider: str = "///"):
    """Convert exported classification answers into the leaf name paths they select.

    Args:
        export_classifications: Exported classification entries; each element is either a
            classification dict or a list of classification dicts.
        schema_to_name_path: Mapping of schemaId -> name path
            (from get_ontology_schema_to_name_path with invert=False).
        divider: Delimiter used in name paths.

    Returns:
        List of name paths, one per selected leaf answer (branch selections are
        collapsed into their deepest answers).
    """
    def build_leaf_paths(root: dict, acc: str = "", name_paths: list = None, divider: str = "///"):
        # Depth-first walk of the nested name tree, emitting a path for every leaf.
        # None sentinel replaces the original mutable default list.
        name_paths = [] if name_paths is None else name_paths
        for parent, child in root.items():
            name_path = f"{acc}{divider}{parent}" if acc else f"{parent}"
            if child:
                name_paths = build_leaf_paths(root=child, acc=name_path, name_paths=name_paths, divider=divider)
            else:
                name_paths.append(name_path)
        return name_paths

    def collect_paths(classification: dict, name_paths: list):
        # Checklist answers ("answers") carry their own schemaId; radio answers
        # ("answer" dict) likewise; free-text answers ("answer" str) are appended
        # to the question's own name path.
        if "answers" in classification.keys():
            for answer in classification["answers"]:
                name_paths.append(schema_to_name_path[answer["schemaId"]])
        if "answer" in classification.keys():
            if type(classification["answer"]) == str:
                name_paths.append(schema_to_name_path[classification["schemaId"]] + divider + classification["answer"])
            else:
                name_paths.append(schema_to_name_path[classification["answer"]["schemaId"]])

    name_paths = []
    for cla in export_classifications:
        if type(cla) == dict:
            collect_paths(cla, name_paths)
        else:
            for c in cla:
                collect_paths(c, name_paths)
    # Rebuild the collected paths as a tree so only root-to-leaf paths are emitted.
    root = {}
    for input_path in name_paths:
        current_node = root
        for part in input_path.split(divider):
            current_node = current_node.setdefault(part, {})
    # Pass the caller's divider through: the original called build_leaf_paths(root)
    # with the default "///", producing wrong paths for any custom divider.
    return build_leaf_paths(root, divider=divider)
def pull_first_name_from_paths(name_paths: list, divider: str = "///"):
    """Return the unique first components of the given name paths (order unspecified)."""
    return list({path.split(divider)[0] for path in name_paths})
def get_child_paths(first, name_paths, divider: str = "///"):
    """Return the sub-paths (everything after the first component) of the paths under `first`.

    Args:
        first: The first path component to match.
        name_paths: List of divider-delimited name paths.
        divider: Delimiter used in the paths.

    Returns:
        List of child paths (empty string when a matching path has no children).
    """
    child_paths = []
    for path in name_paths:
        parts = path.split(divider)
        # Compare the whole first component: the original used `startswith`, which
        # also matched paths whose first component merely begins with `first`
        # (e.g. first="cat" wrongly matched "category///x").
        if parts[0] == first:
            child_paths.append(divider.join(parts[1:]))
    return child_paths
def flatten_label(label_dict:dict, ontology_index:dict, schema_to_name_path:dict, mask_method:str="url", divider:str="///"):
    """Flatten one exported Labelbox label into {column_name: annotation values}.

    Columns are named "<annotation_type><divider><feature name>". Object annotations
    become [value, nested_classification_paths] pairs; top-level classifications become
    a single list of selected leaf paths.

    Args:
        label_dict: One entry from project.export_labels(download=True); reads label_dict["Label"].
        ontology_index: Name-keyed detailed ontology index (get_ontology_schema_to_name_path
            with invert=True, detailed=True); used to look up each feature's type.
        schema_to_name_path: schemaId -> name path mapping for resolving classification answers.
        mask_method: Accepted but currently unused — the mask branch below is commented out.
        divider: Delimiter for column names and name paths.

    Returns:
        dict of flattened columns for this label.
    """
    flat_label = {}
    annotations = label_dict["Label"]
    objects = annotations["objects"]
    classifications = annotations["classifications"]
    if objects:
        for obj in objects:
            # Normalize the ontology's tool type to the naming used in column headers.
            annotation_type = ontology_index[obj["title"]]["type"]
            annotation_type = "mask" if annotation_type == "raster-segmentation" else annotation_type
            annotation_type = "bbox" if annotation_type == "rectangle" else annotation_type
            column_name = f'{annotation_type}{divider}{obj["title"]}'
            if column_name not in flat_label.keys():
                flat_label[column_name] = []
            # Extract the geometry/value for whichever annotation key is present.
            if "bbox" in obj.keys():
                annotation_value = [obj["bbox"]["top"], obj["bbox"]["left"], obj["bbox"]["height"], obj["bbox"]["width"]]
            elif "polygon" in obj.keys():
                annotation_value = [[coord["x"], coord["y"]] for coord in obj["polygon"]]
            elif "line" in obj.keys():
                annotation_value = [[coord["x"], coord["y"]] for coord in obj["line"]]
            elif "point" in obj.keys():
                annotation_value = [obj["point"]["x"], obj["point"]["y"]]
            elif "data" in obj.keys():
                # presumably a text/document entity annotation with character offsets — TODO confirm
                if "location" in obj['data'].keys():
                    annotation_value = [obj["data"]["location"]["start"], obj["data"]["location"]["end"]]
            # NOTE(review): the mask branch is commented out, so for mask objects (and any
            # object matching none of the keys above) `annotation_value` is unbound on the
            # first iteration (NameError) or stale from a previous object — confirm intent.
            # else:
            # if mask_method == "url":
            # annotation_value = [obj["instanceURI"], [255,255,255]]
            # elif mask_method == "array":
            # array = mask_to_bytes(input=obj["instanceURI"], method="url", color=[255,255,255], output="array")
            # annotation_value = [array, [255,255,255]]
            # else:
            # png = mask_to_bytes(input=obj["instanceURI"], method="url", color=[255,255,255], output="png")
            # annotation_value = [png, "null"]
            if "classifications" in obj.keys():
                # Resolve nested classifications to leaf paths, then strip this object's
                # own title so only the sub-paths remain.
                nested_classification_name_paths = get_leaf_paths(
                    export_classifications=obj["classifications"],
                    schema_to_name_path=schema_to_name_path,
                    divider=divider
                )
                return_paths = get_child_paths(first=obj["title"], name_paths=nested_classification_name_paths, divider=divider)
            else:
                return_paths = []
            flat_label[column_name].append([annotation_value, return_paths])
    if classifications:
        # Top-level classifications: one column per top-level question, holding the
        # selected leaf paths beneath it.
        leaf_paths = get_leaf_paths(
            export_classifications=classifications,
            schema_to_name_path=schema_to_name_path,
            divider=divider
        )
        classification_names = pull_first_name_from_paths(
            name_paths=leaf_paths,
            divider=divider
        )
        for classification_name in classification_names:
            annotation_type = ontology_index[classification_name]["type"]
            child_paths = get_child_paths(first=classification_name, name_paths=leaf_paths, divider=divider)
            flat_label[f'{annotation_type}{divider}{classification_name}'] = [[name_path for name_path in child_paths]]
    return flat_label
def export_and_flatten_labels(
    client:labelboxClient, project, include_metadata:bool=True, include_performance:bool=True,
    include_agreement:bool=False, verbose:bool=False, mask_method:str="png", divider="///"):
    """Export all labels for a project and flatten each into a one-row dict.

    Args:
        client: Authenticated Labelbox client.
        project: Labelbox Project object or a project ID string (resolved via client.get_project).
        include_metadata: If True, bulk-export data-row metadata and add metadata/... columns.
        include_performance: If True, add created_by / seconds_to_* columns from the export.
        include_agreement: If True, add the consensus_score column from label["Agreement"].
        verbose: If True, print progress messages.
        mask_method: One of "url", "png", "array" (validated here; mask handling itself is
            currently disabled inside flatten_label).
        divider: Delimiter for column names and name paths.

    Returns:
        List of flat dicts, one per non-skipped label (skipped labels are dropped).

    Raises:
        ValueError: If mask_method is not one of the accepted values.
    """
    if mask_method not in ["url", "png", "array"]:
        raise ValueError(f"Please specify the mask_method you want to download your segmentation masks in - must be either 'url' 'png' or 'array'")
    # Accept either a Project object or a project ID.
    project = project if type(project) == labelboxProject else client.get_project(project)
    if verbose:
        print(f"Exporting labels from Labelbox for project with ID {project.uid}")
    export = project.export_labels(download=True)
    if verbose:
        print(f"Export complete: {len(export)} labels exported")
    if include_metadata:
        # Bulk-export metadata once for every data row in the export, then index by row ID.
        data_row_ids = list(set([label['DataRow ID'] for label in export]))
        if verbose:
            print(f"Exporting metadata from Labelbox for {len(data_row_ids)} data row IDs")
        mdo = client.get_data_row_metadata_ontology()
        metadata_export = mdo.bulk_export(data_row_ids=data_row_ids)
        metadata_export_index = {x.data_row_id : x for x in metadata_export}
        metadata_schema_to_type = get_metadata_schema_to_type(client=client, lb_mdo=mdo, invert=False)
        metadata_schema_to_name_key = get_metadata_schema_to_name_key(client=client, lb_mdo=mdo, invert=False, divider=divider)
        if verbose:
            print(f"Metadata export complete")
    # Two views of the ontology: name-keyed detailed index for types, and
    # schemaId -> name path for resolving classification answers.
    ontology_index = get_ontology_schema_to_name_path(project.ontology(), invert=True, divider=divider, detailed=True)
    schema_to_name_path = get_ontology_schema_to_name_path(project.ontology(), invert=False, divider=divider, detailed=False)
    flattened_labels = []
    if verbose:
        print(f"Flattening labels...")
    for label in export:
        if not label['Skipped']:
            flat_label = {
                "global_key" : label["Global Key"],
                "row_data" : label["Labeled Data"],
                "data_row_id" : label["DataRow ID"],
                "label_id" : label["ID"],
                "external_id" : label["External ID"]
            }
            res = flatten_label(label_dict=label, ontology_index=ontology_index, schema_to_name_path=schema_to_name_path, mask_method=mask_method, divider=divider)
            for key, val in res.items():
                flat_label[f"annotation{divider}{str(key)}"] = val
            if include_agreement:
                flat_label["consensus_score"] = label["Agreement"]
            if include_performance:
                flat_label["created_by"] = label["Created By"]
                flat_label["seconds_to_create"] = label["Seconds to Create"]
                flat_label["seconds_to_review"] = label["Seconds to Review"]
                flat_label["seconds_to_label"] = label["Seconds to Label"]
            if include_metadata:
                data_row_metadata = metadata_export_index[label["DataRow ID"]].fields
                for metadata in data_row_metadata:
                    metadata_type = metadata_schema_to_type[metadata.schema_id]
                    # Enum values resolve through the name-key map ("field///option");
                    # other values are used as-is.
                    if metadata.value in metadata_schema_to_name_key.keys():
                        name_path = metadata_schema_to_name_key[metadata.value].split(divider)
                        field_name = name_path[0]
                        metadata_value = name_path[1]
                    else:
                        field_name = metadata.name
                        metadata_value = metadata.value
                    # Skip Labelbox's internal integration-source metadata field.
                    if field_name != "lb_integration_source":
                        flat_label[f'metadata{divider}{metadata_type}{divider}{field_name}'] = metadata_value
            flattened_labels.append(flat_label)
    if verbose:
        print(f"Labels flattened")
    return flattened_labels
def export_to_table(
    project, client,
    include_metadata:bool=False, include_performance:bool=False, include_agreement:bool=False,
    verbose:bool=False, mask_method:str="png", divider="///"):
    """Export a Labelbox project's labels and return them as a pandas DataFrame.

    Args:
        project: Labelbox Project object or project ID string.
        client: Authenticated Labelbox client.
        include_metadata: Forwarded to export_and_flatten_labels.
        include_performance: Forwarded to export_and_flatten_labels.
        include_agreement: Forwarded to export_and_flatten_labels.
        verbose: If True, print progress messages.
        mask_method: One of "url", "png", "array".
        divider: Delimiter for generated column names.

    Returns:
        pandas.DataFrame with one row per non-skipped label.
    """
    flat_labels = export_and_flatten_labels(
        client=client,
        project=project,
        include_metadata=include_metadata,
        include_performance=include_performance,
        include_agreement=include_agreement,
        mask_method=mask_method,
        verbose=verbose,
        divider=divider,
    )
    frame = pd.DataFrame.from_dict(flat_labels)
    if verbose:
        print(f"Success: DataFrame generated")
    return frame
if __name__ == "__main__":
    # Guarding the script entry point so importing this module has no side effects.
    # Fill in your API key and project ID before running.
    API_KEY = "<API_KEY>"
    client = lb.Client(API_KEY)
    project_id = '<Project ID>'
    df = export_to_table(project_id, client)
    # To save the result: df.to_csv("labelbox_export.csv", index=False)