# Source code for tasks.task
from __future__ import annotations

import json
import re
from abc import abstractmethod
from typing import Any
from typing import List
from typing import Optional

from pydantic import BaseModel

from datapipes.datapipe import DataPipe
# [docs]
class BaseTask(BaseModel):
    """
    **Description:**

        Base implementation for tasks. Every new task should inherit from this class and
        override the attributes and methods it needs. The class collects the properties and
        behaviors that are common to all tasks in the system.

    Attributes:
        name: The name of the task. It should be a unique underscore_case name to be defined in
            TaskType, e.g. ``sample_task_name``.
        chat_name: The name used when the task has to be mentioned inside the chat with the user.
            It should be Camel Case, e.g. ``SampleTaskChatName``.
        description: The description of what specifically the task is doing.
            Try to make it as specific as possible to help the Task Planner decide better.
        dependencies: Names of the TaskTypes this task depends on. For example, in a stress
            detection scenario, the stress analysis depends on the fetch-HRV-data task:
            ``[TaskType.SERPAPI, TaskType.EXTRACT_TEXT]``.
        inputs: The list of descriptions of the inputs that should be provided by the planner.
            For a task with two inputs: ``["the first input description", "the second input description"]``.
        outputs: The list of descriptions of the outputs the task returns.
            This helps the planner understand the returned results and use them as needed.
            For example, if the task returns a list of sleep hours for different sleep states,
            the description tells the planner which number relates to which state.
        datapipe: The DataPipe used by ``_post_execute`` to store results when ``output_type``
            is True. Defaults to None.
        output_type: Indicates whether the task result should be stored in the DataPipe (True) or
            returned directly to the planner (False). This is handled in ``_post_execute``;
            override it if needed.
        return_direct: Indicates whether this task should completely interrupt the planning process.
            This is needed in cases such as asking the user a question, where no further
            planning is needed until the user gives a proper answer (see the ask_user task).
    """

    # NOTE: ``name``, ``dependencies`` and ``inputs`` are plain pydantic fields.
    # They must not be re-declared as same-named ``@property`` getters: a getter that
    # returns ``self.<same name>`` calls itself without bound and shadows the field.
    name: str
    chat_name: str
    description: str
    dependencies: List[str] = []
    inputs: List[str] = []
    outputs: List[str] = []
    # The default is None, so the annotation has to admit None as well.
    datapipe: Optional[DataPipe] = None
    # False if the output should be passed directly back to the planner.
    # True if it should be stored in the datapipe.
    output_type: bool = False
    # False if the planner should continue. True if the planning should pause or stop
    # after this task, e.g. a task that asks the user to provide more information.
    return_direct: bool = False

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    @abstractmethod
    def _execute(
        self,
        inputs: List[Any],
    ) -> str:
        """
        Abstract method representing the execution of the task. Implement it based on your need.
        It is called by **execute**, which passes the parsed inputs to it.

        Args:
            inputs (List[Any]): Input data for the task.
        Return:
            str: Result of the task execution.
        """

    def _post_execute(self, result: str = "") -> str:
        """
        Called inside **execute** after **_execute**, with the result of **_execute**.

        When the **output_type** attribute is True, the result is stored inside the datapipe and
        the datapipe key is returned to the planner instead of the raw result. This is good practice
        when there is intermediate data (like sleep data over a month) that has to be passed to
        other tasks and the raw result is not immediately needed. It saves tokens and keeps the
        planner from passing wrong raw data to the tasks.

        The stored value is a JSON string with 'data' and 'description' keys. 'data' is the
        returned data after execution and 'description' is built from the **outputs** attribute
        of the task, which helps the planner interpret 'data' later on.

        Args:
            result (str): String containing the task result.
        Return:
            str: ``datapipe:<key>`` when the result was stored, otherwise ``result`` unchanged.
        """
        if not self.output_type:
            return result

        payload = json.dumps(
            {
                "data": result,
                "description": "\n".join(self.outputs),
            }
        )
        key = self.datapipe.store(payload)
        return f"datapipe:{key}"

    def _get_input_format(self):
        """
        Build the numbered (1-based) listing of the input descriptions.

        Return:
            str: One `` <n>-<description>`` entry per input; every entry ends with a newline and
            entries are additionally joined by a newline.
        """
        return "\n".join(
            f" {i+1}-{word}\n" for i, word in enumerate(self.inputs)
        )

    def execute(self, input_args: List[str]) -> str:
        """
        Called by the **Orchestrator**, which supplies the inputs produced by the planner.

        The inputs are first passed to **_parse_input**, then checked with **_validate_inputs**.
        **_execute** is called with the parsed inputs, and its result is handed to
        **_post_execute**, which either stores it inside the **DataPipe** or returns it directly
        to the planner to continue planning.

        Args:
            input_args (List[str]): Inputs provided by the planner.
        Return:
            str: The final result of the task execution.
        Raise:
            ValueError: If **_validate_inputs** rejects the parsed inputs.
        """
        # NOTE(review): _parse_input and _validate_inputs are not defined in the part of the
        # class visible here -- presumably provided elsewhere or by subclasses; confirm.
        inputs = self._parse_input(input_args)
        if not self._validate_inputs(inputs):
            inputs_format = self._get_input_format()
            raise ValueError(
                "Wrong inputs are provided. "
                f"The inputs should follow the descriptions: {inputs_format}"
            )
        result = self._execute(inputs)
        return self._post_execute(result)

    def get_dict(self) -> str:
        """
        Generate the textual description of the task (name, description, inputs, dependencies,
        outputs and whether the result is stored in the datapipe).

        Return:
            str: String representation of the task.
        """
        prompt = f"**{self.name}**: {self.description}"
        if self.inputs:
            inputs = self._get_input_format()
            prompt += f"\n The input to this tool should be a list of data representing:\n {inputs}"
        if self.dependencies:
            # Only built when there is something to list.
            dependencies = ",".join(
                f"{i+1}-{word}"
                for i, word in enumerate(self.dependencies)
            )
            prompt += f"\n This tool is dependent on the following tools. make sure these tools are called first: {dependencies}"
        if self.outputs:
            prompt += (
                "\n This tool will return the following data:\n- "
                + "\n- ".join(self.outputs)
            )
        if self.output_type:
            prompt += "\n The result will be stored in datapipe."
        return prompt

    def explain(
        self,
    ) -> str:
        """
        Provide a sample explanation for the task.

        Return:
            str: Sample explanation for the task.
        """
        return "\nSample Explanation\n"