Skip to content

API Reference

Source code in instructor/client.py
def from_openai(
    client: openai.OpenAI | openai.AsyncOpenAI,
    mode: instructor.Mode = instructor.Mode.TOOLS,
    **kwargs: Any,
) -> Instructor | AsyncInstructor:
    """Wrap an OpenAI-style client in an ``Instructor`` or ``AsyncInstructor``.

    The provider is derived from ``client.base_url`` when the client has that
    attribute (``Provider.OPENAI`` otherwise). ``mode`` is then asserted
    against the modes accepted for that provider, and the client's
    ``chat.completions.create`` is patched with ``instructor.patch``.

    Args:
        client: A sync or async OpenAI client. Any other object only triggers
            a warning.
        mode: Mode handed to ``instructor.patch`` and to the wrapper.
        **kwargs: Forwarded unchanged to the wrapper constructor.

    Returns:
        ``Instructor`` for an ``openai.OpenAI`` client, ``AsyncInstructor`` for
        an ``openai.AsyncOpenAI`` client, and ``None`` for anything else.

    Raises:
        AssertionError: If ``mode`` is not accepted for the detected provider.
    """
    provider = (
        get_provider(str(client.base_url))
        if hasattr(client, "base_url")
        else Provider.OPENAI
    )

    is_known_client = isinstance(client, (openai.OpenAI, openai.AsyncOpenAI))
    if not is_known_client:
        import warnings

        warnings.warn(
            "Client should be an instance of openai.OpenAI or openai.AsyncOpenAI. Unexpected behavior may occur with other client types.",
            stacklevel=2,
        )

    # Providers reached through the OpenAI SDK that accept a reduced mode set.
    if provider in {Provider.ANYSCALE, Provider.TOGETHER}:
        assert mode in {
            instructor.Mode.TOOLS,
            instructor.Mode.JSON,
            instructor.Mode.JSON_SCHEMA,
            instructor.Mode.MD_JSON,
        }

    # OpenAI itself (and Databricks) accept the wider mode set.
    if provider in {Provider.OPENAI, Provider.DATABRICKS}:
        assert mode in {
            instructor.Mode.TOOLS,
            instructor.Mode.JSON,
            instructor.Mode.FUNCTIONS,
            instructor.Mode.PARALLEL_TOOLS,
            instructor.Mode.MD_JSON,
            instructor.Mode.TOOLS_STRICT,
            instructor.Mode.JSON_O1,
        }

    # The sync client is checked first; the first matching type wins.
    wrapper_by_client_type = (
        (openai.OpenAI, Instructor),
        (openai.AsyncOpenAI, AsyncInstructor),
    )
    for client_type, wrapper_cls in wrapper_by_client_type:
        if isinstance(client, client_type):
            return wrapper_cls(
                client=client,
                create=instructor.patch(
                    create=client.chat.completions.create, mode=mode
                ),
                mode=mode,
                provider=provider,
                **kwargs,
            )

    # Unrecognised client types were only warned about above.
    return None

Validator

Bases: OpenAISchema

Validate if an attribute is correct and if not, return a new value with an error message

Source code in instructor/dsl/validators.py
class Validator(OpenAISchema):
    """
    Validate if an attribute is correct and if not,
    return a new value with an error message
    """

    # NOTE(review): the docstring above and the `description` strings below look
    # like they are exported as part of the schema shown to the LLM — confirm
    # against OpenAISchema before rewording any of them.

    # Verdict on the checked value; True when the field is omitted.
    is_valid: bool = Field(
        default=True,
        description="Whether the attribute is valid based on the requirements",
    )
    # Explanation for a negative verdict; None when the field is omitted.
    reason: Optional[str] = Field(
        default=None,
        description="The error message if the attribute is not valid, otherwise None",
    )
    # Suggested replacement for an invalid value; None when the field is omitted.
    fixed_value: Optional[str] = Field(
        default=None,
        description="If the attribute is not valid, suggest a new value for the attribute",
    )

llm_validator(statement, client, allow_override=False, model='gpt-3.5-turbo', temperature=0)

Create a validator that uses the LLM to validate an attribute

Usage

from typing import Annotated

from instructor import llm_validator
from pydantic import AfterValidator, BaseModel, Field, ValidationError

class User(BaseModel):
    name: Annotated[
        str,
        AfterValidator(
            llm_validator("The name must be a full name all lowercase", client=client)
        ),
    ]
    age: int = Field(description="The age of the person")

try:
    user = User(name="Jason Liu", age=20)
except ValidationError as e:
    print(e)
1 validation error for User
name
    The name is valid but not all lowercase (type=value_error.llm_validator)

Note that here, the error message is written by the LLM, and the error type is value_error.llm_validator.

Parameters:

Name Type Description Default
statement str

The statement to validate

required
model str

The LLM to use for validation (default: "gpt-3.5-turbo")

'gpt-3.5-turbo'
temperature float

The temperature to use for the LLM (default: 0)

0
client Instructor

The Instructor client used to run the validation request

required
Source code in instructor/dsl/validators.py
def llm_validator(
    statement: str,
    client: Instructor,
    allow_override: bool = False,
    model: str = "gpt-3.5-turbo",
    temperature: float = 0,
) -> Callable[[str], str]:
    """
    Create a validator that uses the LLM to validate an attribute

    ## Usage

    ```python
    from typing import Annotated

    from instructor import llm_validator
    from pydantic import AfterValidator, BaseModel, Field, ValidationError

    class User(BaseModel):
        name: Annotated[
            str,
            AfterValidator(
                llm_validator("The name must be a full name all lowercase", client=client)
            ),
        ]
        age: int = Field(description="The age of the person")

    try:
        user = User(name="Jason Liu", age=20)
    except ValidationError as e:
        print(e)
    ```

    The failure message is the `reason` written by the LLM.

    Parameters:
        statement (str): The statement to validate
        client (Instructor): The client whose `chat.completions.create` is
            called with `response_model=Validator`
        allow_override (bool): If True and the LLM judges the value invalid but
            supplies a `fixed_value`, return that value instead of failing
            (default: False)
        model (str): The LLM to use for validation (default: "gpt-3.5-turbo")
        temperature (float): The temperature to use for the LLM (default: 0)

    Returns:
        A callable that takes the value to check and returns either the value
        itself or, when overriding, the LLM's `fixed_value`.

    Raises:
        AssertionError: Raised by the returned callable, with the LLM's
            `reason` as message, when the value is invalid and no override
            applies.
    """

    def llm(v: str) -> str:
        resp = client.chat.completions.create(
            response_model=Validator,
            messages=[
                {
                    "role": "system",
                    "content": "You are a world class validation model. Capable to determine if the following value is valid for the statement, if it is not, explain why and suggest a new value.",
                },
                {
                    "role": "user",
                    "content": f"Does `{v}` follow the rules: {statement}",
                },
            ],
            model=model,
            temperature=temperature,
        )

        if resp.is_valid:
            return v

        # The override has to be considered before failing. Previously the
        # failure check ran first, which made this branch unreachable.
        if allow_override and resp.fixed_value is not None:
            return resp.fixed_value

        # Surface the reason so it can be used in the future to generate a
        # better response, via reasking mechanism. Raised explicitly instead of
        # via `assert` so the check is not stripped under `python -O`; the
        # exception type is unchanged.
        raise AssertionError(resp.reason)

    return llm

openai_moderation(client)

Validates a message using OpenAI moderation model.

Should only be used for monitoring inputs and outputs of OpenAI APIs. Other use cases are disallowed as per: https://platform.openai.com/docs/guides/moderation/overview

Example:

from instructor import openai_moderation

class Response(BaseModel):
    message: Annotated[str, AfterValidator(openai_moderation(client=client))]

Response(message="I hate you")

 ValidationError: 1 validation error for Response
 message
Value error, `I hate you.` was flagged for ['harassment'] [type=value_error, input_value='I hate you.', input_type=str]

client (OpenAI): The OpenAI client to use, must be sync (default: None)

Source code in instructor/dsl/validators.py
def openai_moderation(client: OpenAI) -> Callable[[str], str]:
    """
    Build a validator that screens a message with the OpenAI moderation model.

    Should only be used for monitoring inputs and outputs of OpenAI APIs.
    Other use cases are disallowed as per:
    https://platform.openai.com/docs/guides/moderation/overview

    Example:
    ```python
    from instructor import openai_moderation

    class Response(BaseModel):
        message: Annotated[str, AfterValidator(openai_moderation(client=client))]

    Response(message="I hate you")
    ```

    The validator then fails with a message such as:

    ```
    `I hate you` was flagged for harassment
    ```

    Parameters:
        client (OpenAI): The OpenAI client to use, must be sync

    Returns:
        A callable that returns the message unchanged when it is not flagged
        and raises `ValueError` naming the flagged categories otherwise.
    """

    def validate_message_with_openai_mod(v: str) -> str:
        moderation = client.moderations.create(input=v)
        verdict = moderation.results[0]
        category_flags = verdict.categories.model_dump()

        if not verdict.flagged:
            return v

        # Report only the categories whose flag is set.
        flagged_names = [name for name in category_flags if category_flags[name]]
        raise ValueError(f"`{v}` was flagged for {', '.join(flagged_names)}")

    return validate_message_with_openai_mod

IterableModel(subtask_class, name=None, description=None)

Dynamically create an IterableModel OpenAISchema that can be used to segment multiple tasks given a base class. This creates a class that can be used to create a toolkit for a specific task; names and descriptions are automatically generated. However, they can be overridden.

Usage

from pydantic import BaseModel, Field
from instructor import IterableModel

class User(BaseModel):
    name: str = Field(description="The name of the person")
    age: int = Field(description="The age of the person")
    role: str = Field(description="The role of the person")

MultiUser = IterableModel(User)

Result

class MultiUser(OpenAISchema, MultiTaskBase):
    tasks: List[User] = Field(
        default_factory=list,
        repr=False,
        description="Correctly segmented list of `User` tasks",
    )

    @classmethod
    def from_streaming_response(cls, completion) -> Generator[User]:
        '''
        Parse the streaming response from OpenAI and yield a `User` object
        for each task in the response
        '''
        json_chunks = cls.extract_json(completion)
        yield from cls.tasks_from_chunks(json_chunks)

Parameters:

Name Type Description Default
subtask_class Type[OpenAISchema]

The base class to use for the MultiTask

required
name Optional[str]

The task name used for the generated class; if None, the name of the subtask class is used. The generated class is named Iterable{task_name}

None
description Optional[str]

The description of the generated class; if None, the description is set to Correct segmentation of `{task_name}` tasks

None

Returns:

Name Type Description
schema OpenAISchema

A new class that can be used to segment multiple tasks

Source code in instructor/dsl/iterable.py
def IterableModel(
    subtask_class: type[BaseModel],
    name: Optional[str] = None,
    description: Optional[str] = None,
) -> type[BaseModel]:
    """
    Dynamically create an iterable schema class that holds a list of
    `subtask_class` items, so that several tasks can be segmented out of one
    response. The class name and description are generated automatically but
    can be overridden.

    ## Usage

    ```python
    from pydantic import BaseModel, Field
    from instructor import IterableModel

    class User(BaseModel):
        name: str = Field(description="The name of the person")
        age: int = Field(description="The age of the person")
        role: str = Field(description="The role of the person")

    MultiUser = IterableModel(User)
    ```

    ## Result

    The returned class is named `IterableUser`, derives from `OpenAISchema`
    and `IterableBase`, has `task_type` set to `User`, and declares a single
    field:

    ```python
    tasks: list[User] = Field(
        default_factory=list,
        repr=False,
        description="Correctly segmented list of `User` tasks",
    )
    ```

    Parameters:
        subtask_class (type[BaseModel]): The item class stored in `tasks`
        name (Optional[str]): The task name; if None the name of the subtask
            class is used. The generated class is named `Iterable{task_name}`
        description (Optional[str]): The docstring of the generated class; if
            None it is set to "Correct segmentation of `{task_name}` tasks"

    Returns:
        schema (type[BaseModel]): A new class that can be used to segment
            multiple tasks
    """
    if name is None:
        task_name = subtask_class.__name__
    else:
        task_name = name
    model_name = f"Iterable{task_name}"

    # (annotation, FieldInfo) pair describing the single `tasks` field.
    tasks_definition = (
        list[subtask_class],
        Field(
            default_factory=list,
            repr=False,
            description=f"Correctly segmented list of `{task_name}` tasks",
        ),
    )

    bases = cast(tuple[type[BaseModel], ...], (OpenAISchema, IterableBase))
    iterable_cls = cast(
        type[IterableBase],
        create_model(model_name, tasks=tasks_definition, __base__=bases),
    )

    # Remember which class each element of `tasks` is built from.
    iterable_cls.task_type = subtask_class

    if description is None:
        iterable_cls.__doc__ = f"Correct segmentation of `{task_name}` tasks"
    else:
        iterable_cls.__doc__ = description

    assert issubclass(
        iterable_cls, OpenAISchema
    ), "The new class should be a subclass of OpenAISchema"
    return iterable_cls

Partial

Bases: Generic[T_Model]

Generate a new class which has PartialBase as a base class.

Notes

This will enable partial validation of the model while streaming.

Example

Partial[SomeModel]

Source code in instructor/dsl/partial.py
class Partial(Generic[T_Model]):
    """Generate a new class which has PartialBase as a base class.

    ``Partial`` is only ever used through subscription (``Partial[SomeModel]``
    or ``Partial[SomeModel, MakeFieldsOptional]``); instantiating it or
    subclassing it raises ``TypeError``.

    Notes:
        This will enable partial validation of the model while streaming.

    Example:
        Partial[SomeModel]
    """

    def __new__(
        cls,
        *args: object,  # noqa :ARG003
        **kwargs: object,  # noqa :ARG003
    ) -> Partial[T_Model]:
        """Cannot instantiate.

        Raises:
            TypeError: Direct instantiation not allowed.
        """
        raise TypeError("Cannot instantiate abstract Partial class.")

    def __init_subclass__(
        cls,
        *args: object,
        **kwargs: object,
    ) -> NoReturn:
        """Cannot subclass.

        Raises:
           TypeError: Subclassing not allowed.
        """
        raise TypeError(f"Cannot subclass {cls.__module__}.Partial")

    def __class_getitem__(
        cls,
        wrapped_class: type[T_Model] | tuple[type[T_Model], type[MakeFieldsOptional]],
    ) -> type[T_Model]:
        """Convert model to one that inherits from PartialBase.

        We don't make the fields optional at this point, we just wrap them with `Partial` so the names of the nested models will be
        `Partial{ModelName}`. We want the output of `model_json_schema()` to
        reflect the name change, but everything else should be the same as the
        original model. During validation, we'll generate a true partial model
        to support partially defined fields.

        Args:
            wrapped_class: Either the model class itself, or a 2-tuple of the
                model class and a marker. When the tuple form is used, every
                field is passed through `_make_field_optional` instead of
                being wrapped.

        Returns:
            A model created with `create_model` whose bases are
            `(wrapped_class, PartialBase)`.
        """

        # Tuple form: the second element only acts as a "not None" switch below.
        make_fields_optional = None
        if isinstance(wrapped_class, tuple):
            wrapped_class, make_fields_optional = wrapped_class

        def _wrap_models(field: FieldInfo) -> tuple[object, FieldInfo]:
            # Work on a copy so the wrapped class's own FieldInfo is not mutated.
            tmp_field = deepcopy(field)

            annotation = field.annotation

            # Handle generics (like List, Dict, etc.)
            if get_origin(annotation) is not None:
                # Get the generic base (like List, Dict) and its arguments (like User in List[User])
                generic_base = get_origin(annotation)
                generic_args = get_args(annotation)

                # Recursively apply Partial to each of the generic arguments
                # (only the direct arguments of the generic are inspected here).
                modified_args = tuple(
                    (
                        Partial[arg]
                        if isinstance(arg, type) and issubclass(arg, BaseModel)
                        else arg
                    )
                    for arg in generic_args
                )

                # Reconstruct the generic type with modified arguments
                tmp_field.annotation = (
                    generic_base[modified_args] if generic_base else None
                )
            # If the field is a BaseModel, then recursively convert it's
            # attributes to optionals.
            elif isinstance(annotation, type) and issubclass(annotation, BaseModel):
                tmp_field.annotation = Partial[annotation]
            return tmp_field.annotation, tmp_field

        # Avoid stacking the prefix when the class is already named Partial*.
        model_name = (
            wrapped_class.__name__
            if wrapped_class.__name__.startswith("Partial")
            else f"Partial{wrapped_class.__name__}"
        )

        return create_model(
            model_name,
            __base__=(wrapped_class, PartialBase),  # type: ignore
            __module__=wrapped_class.__module__,
            **{
                field_name: (
                    _make_field_optional(field_info)
                    if make_fields_optional is not None
                    else _wrap_models(field_info)
                )
                for field_name, field_info in wrapped_class.model_fields.items()
            },  # type: ignore
        )

__class_getitem__(wrapped_class)

Convert model to one that inherits from PartialBase.

We don't make the fields optional at this point, we just wrap them with Partial so the names of the nested models will be Partial{ModelName}. We want the output of model_json_schema() to reflect the name change, but everything else should be the same as the original model. During validation, we'll generate a true partial model to support partially defined fields.

Source code in instructor/dsl/partial.py
def __class_getitem__(
    cls,
    wrapped_class: type[T_Model] | tuple[type[T_Model], type[MakeFieldsOptional]],
) -> type[T_Model]:
    """Convert model to one that inherits from PartialBase.

    We don't make the fields optional at this point, we just wrap them with `Partial` so the names of the nested models will be
    `Partial{ModelName}`. We want the output of `model_json_schema()` to
    reflect the name change, but everything else should be the same as the
    original model. During validation, we'll generate a true partial model
    to support partially defined fields.

    Args:
        wrapped_class: Either the model class itself, or a 2-tuple of the
            model class and a marker. When the tuple form is used, every
            field is passed through `_make_field_optional` instead of
            being wrapped.

    Returns:
        A model created with `create_model` whose bases are
        `(wrapped_class, PartialBase)`.
    """

    # Tuple form: the second element only acts as a "not None" switch below.
    make_fields_optional = None
    if isinstance(wrapped_class, tuple):
        wrapped_class, make_fields_optional = wrapped_class

    def _wrap_models(field: FieldInfo) -> tuple[object, FieldInfo]:
        # Work on a copy so the wrapped class's own FieldInfo is not mutated.
        tmp_field = deepcopy(field)

        annotation = field.annotation

        # Handle generics (like List, Dict, etc.)
        if get_origin(annotation) is not None:
            # Get the generic base (like List, Dict) and its arguments (like User in List[User])
            generic_base = get_origin(annotation)
            generic_args = get_args(annotation)

            # Recursively apply Partial to each of the generic arguments
            # (only the direct arguments of the generic are inspected here).
            modified_args = tuple(
                (
                    Partial[arg]
                    if isinstance(arg, type) and issubclass(arg, BaseModel)
                    else arg
                )
                for arg in generic_args
            )

            # Reconstruct the generic type with modified arguments
            tmp_field.annotation = (
                generic_base[modified_args] if generic_base else None
            )
        # If the field is a BaseModel, then recursively convert it's
        # attributes to optionals.
        elif isinstance(annotation, type) and issubclass(annotation, BaseModel):
            tmp_field.annotation = Partial[annotation]
        return tmp_field.annotation, tmp_field

    # Avoid stacking the prefix when the class is already named Partial*.
    model_name = (
        wrapped_class.__name__
        if wrapped_class.__name__.startswith("Partial")
        else f"Partial{wrapped_class.__name__}"
    )

    return create_model(
        model_name,
        __base__=(wrapped_class, PartialBase),  # type: ignore
        __module__=wrapped_class.__module__,
        **{
            field_name: (
                _make_field_optional(field_info)
                if make_fields_optional is not None
                else _wrap_models(field_info)
            )
            for field_name, field_info in wrapped_class.model_fields.items()
        },  # type: ignore
    )

__init_subclass__(*args, **kwargs)

Cannot subclass.

Raises:

Type Description
TypeError

Subclassing not allowed.

Source code in instructor/dsl/partial.py
def __init_subclass__(
    cls,
    *args: object,
    **kwargs: object,
) -> NoReturn:
    """Reject every attempt to derive a class from ``Partial``.

    Raises:
       TypeError: Always; subclassing is not allowed.
    """
    owner_module = cls.__module__
    raise TypeError(f"Cannot subclass {owner_module}.Partial")

__new__(*args, **kwargs)

Cannot instantiate.

Raises:

Type Description
TypeError

Direct instantiation not allowed.

Source code in instructor/dsl/partial.py
def __new__(
    cls,
    *args: object,  # noqa :ARG003
    **kwargs: object,  # noqa :ARG003
) -> Partial[T_Model]:
    """Reject direct instantiation; ``Partial`` is only meant to be subscripted.

    Raises:
        TypeError: Always; direct instantiation is not allowed.
    """
    reason = "Cannot instantiate abstract Partial class."
    raise TypeError(reason)

PartialBase

Bases: Generic[T_Model]

Source code in instructor/dsl/partial.py
class PartialBase(Generic[T_Model]):
    # Mixin providing streaming support: it pulls JSON text fragments out of
    # provider-specific stream chunks, accumulates them, and validates each
    # intermediate state against an all-optional copy of the model.

    @classmethod
    @cache
    def get_partial_model(cls) -> type[T_Model]:
        """Return a partial model we can use to validate partial results."""
        assert issubclass(
            cls, BaseModel
        ), f"{cls.__name__} must be a subclass of BaseModel"

        # Avoid stacking the prefix when the class is already named Partial*.
        model_name = (
            cls.__name__
            if cls.__name__.startswith("Partial")
            else f"Partial{cls.__name__}"
        )

        # Every field is redefined through `_make_field_optional`.
        return create_model(
            model_name,
            __base__=cls,
            __module__=cls.__module__,
            **{
                field_name: _make_field_optional(field_info)
                for field_name, field_info in cls.model_fields.items()
            },  # type: ignore[all]
        )

    @classmethod
    def from_streaming_response(
        cls, completion: Iterable[Any], mode: Mode, **kwargs: Any
    ) -> Generator[T_Model, None, None]:
        """Yield a progressively more complete model for each streamed chunk.

        Args:
            completion: The provider's stream of chunks.
            mode: Selects how JSON text is extracted from each chunk.
            **kwargs: Forwarded to `model_validate` of the partial model.
        """
        json_chunks = cls.extract_json(completion, mode)

        # These modes get an extra pass through `extract_json_from_stream`.
        if mode in {Mode.MD_JSON, Mode.GEMINI_TOOLS}:
            json_chunks = extract_json_from_stream(json_chunks)

        if mode == Mode.WRITER_TOOLS:
            yield from cls.writer_model_from_chunks(json_chunks, **kwargs)
        else:
            yield from cls.model_from_chunks(json_chunks, **kwargs)

    @classmethod
    async def from_streaming_response_async(
        cls, completion: AsyncGenerator[Any, None], mode: Mode, **kwargs: Any
    ) -> AsyncGenerator[T_Model, None]:
        """Async counterpart of `from_streaming_response`.

        This is a coroutine that *returns* an async generator (it uses
        `return`, not `yield`), so the call itself has to be awaited before
        the result can be iterated.
        """
        json_chunks = cls.extract_json_async(completion, mode)

        # NOTE(review): unlike the sync version, Mode.GEMINI_TOOLS is not
        # post-processed here — confirm whether that difference is intended.
        if mode == Mode.MD_JSON:
            json_chunks = extract_json_from_stream_async(json_chunks)
        elif mode == Mode.WRITER_TOOLS:
            return cls.writer_model_from_chunks_async(json_chunks, **kwargs)

        return cls.model_from_chunks_async(json_chunks, **kwargs)

    @classmethod
    def writer_model_from_chunks(
        cls, json_chunks: Iterable[Any], **kwargs: Any
    ) -> Generator[T_Model, None, None]:
        """Accumulate chunks for Mode.WRITER_TOOLS and yield a partial model per chunk."""
        potential_object = ""
        partial_model = cls.get_partial_model()
        partial_mode = (
            "on" if issubclass(cls, PartialLiteralMixin) else "trailing-strings"
        )
        for chunk in json_chunks:
            # A chunk longer than everything accumulated so far replaces the
            # buffer instead of extending it — presumably because such a chunk
            # carries the whole payload again; confirm against the Writer API.
            if len(chunk) > len(potential_object):
                potential_object = chunk
            else:
                potential_object += chunk
            # Nothing but whitespace so far is parsed as an empty object.
            obj = from_json(
                (potential_object.strip() or "{}").encode(), partial_mode=partial_mode
            )
            obj = partial_model.model_validate(obj, strict=None, **kwargs)
            yield obj

    @classmethod
    async def writer_model_from_chunks_async(
        cls, json_chunks: AsyncGenerator[str, None], **kwargs: Any
    ) -> AsyncGenerator[T_Model, None]:
        """Async counterpart of `writer_model_from_chunks`."""
        potential_object = ""
        partial_model = cls.get_partial_model()
        partial_mode = (
            "on" if issubclass(cls, PartialLiteralMixin) else "trailing-strings"
        )
        async for chunk in json_chunks:
            # Same replace-or-append rule as the sync version.
            if len(chunk) > len(potential_object):
                potential_object = chunk
            else:
                potential_object += chunk
            obj = from_json(
                (potential_object.strip() or "{}").encode(), partial_mode=partial_mode
            )
            obj = partial_model.model_validate(obj, strict=None, **kwargs)
            yield obj

    @classmethod
    def model_from_chunks(
        cls, json_chunks: Iterable[Any], **kwargs: Any
    ) -> Generator[T_Model, None, None]:
        """Append each chunk to a buffer and yield the partial model parsed from it."""
        potential_object = ""
        partial_model = cls.get_partial_model()
        partial_mode = (
            "on" if issubclass(cls, PartialLiteralMixin) else "trailing-strings"
        )
        for chunk in json_chunks:
            potential_object += chunk
            # Nothing but whitespace so far is parsed as an empty object.
            obj = from_json(
                (potential_object.strip() or "{}").encode(), partial_mode=partial_mode
            )
            obj = partial_model.model_validate(obj, strict=None, **kwargs)
            yield obj

    @classmethod
    async def model_from_chunks_async(
        cls, json_chunks: AsyncGenerator[str, None], **kwargs: Any
    ) -> AsyncGenerator[T_Model, None]:
        """Async counterpart of `model_from_chunks`."""
        potential_object = ""
        partial_model = cls.get_partial_model()
        partial_mode = (
            "on" if issubclass(cls, PartialLiteralMixin) else "trailing-strings"
        )
        async for chunk in json_chunks:
            potential_object += chunk
            obj = from_json(
                (potential_object.strip() or "{}").encode(), partial_mode=partial_mode
            )
            obj = partial_model.model_validate(obj, strict=None, **kwargs)
            yield obj

    @staticmethod
    def extract_json(
        completion: Iterable[Any], mode: Mode
    ) -> Generator[str, None, None]:
        """Yield the JSON text fragment carried by each stream chunk.

        Where the fragment lives depends on `mode`. A chunk that lacks the
        attributes being read raises `AttributeError`, which is swallowed, so
        the rest of that chunk is skipped.

        Raises:
            NotImplementedError: If a chunk has truthy `choices` and `mode`
                is none of the modes handled in that branch.
        """
        for chunk in completion:
            try:
                # NOTE(review): the first three checks are independent `if`s;
                # the `elif chunk.choices` below pairs only with the
                # GEMINI_TOOLS check. For the Anthropic/Gemini-JSON modes,
                # `chunk.choices` is therefore still evaluated after yielding —
                # confirm this relies on the AttributeError handler below.
                if mode == Mode.ANTHROPIC_JSON:
                    if json_chunk := chunk.delta.text:
                        yield json_chunk
                if mode == Mode.ANTHROPIC_TOOLS:
                    yield chunk.delta.partial_json
                if mode == Mode.GEMINI_JSON:
                    yield chunk.text
                if mode == Mode.GEMINI_TOOLS:
                    # Gemini seems to return the entire function_call and not a chunk?
                    import json

                    resp = chunk.candidates[0].content.parts[0].function_call
                    resp_dict = type(resp).to_dict(resp)  # type:ignore
                    if "args" in resp_dict:
                        yield json.dumps(resp_dict["args"])
                elif chunk.choices:
                    if mode == Mode.FUNCTIONS:
                        Mode.warn_mode_functions_deprecation()
                        if json_chunk := chunk.choices[0].delta.function_call.arguments:
                            yield json_chunk
                    elif mode in {
                        Mode.JSON,
                        Mode.MD_JSON,
                        Mode.JSON_SCHEMA,
                        Mode.CEREBRAS_JSON,
                        Mode.FIREWORKS_JSON,
                    }:
                        # Plain-content modes: the fragment is the delta text.
                        if json_chunk := chunk.choices[0].delta.content:
                            yield json_chunk
                    elif mode in {
                        Mode.TOOLS,
                        Mode.TOOLS_STRICT,
                        Mode.FIREWORKS_TOOLS,
                        Mode.WRITER_TOOLS,
                    }:
                        # Tool modes: only the first tool call's arguments are read.
                        if json_chunk := chunk.choices[0].delta.tool_calls:
                            if json_chunk[0].function.arguments:
                                yield json_chunk[0].function.arguments
                    else:
                        raise NotImplementedError(
                            f"Mode {mode} is not supported for MultiTask streaming"
                        )
            except AttributeError:
                # Chunk did not have the expected shape; move on to the next one.
                pass

    @staticmethod
    async def extract_json_async(
        completion: AsyncGenerator[Any, None], mode: Mode
    ) -> AsyncGenerator[str, None]:
        """Async counterpart of `extract_json`.

        Raises:
            NotImplementedError: If a chunk has truthy `choices` and `mode`
                is none of the modes handled in that branch.
        """
        # NOTE(review): there are no Gemini branches here, unlike the sync
        # version — confirm whether that difference is intended.
        async for chunk in completion:
            try:
                # The `elif chunk.choices` below pairs only with the
                # ANTHROPIC_TOOLS check, not with the ANTHROPIC_JSON one.
                if mode == Mode.ANTHROPIC_JSON:
                    if json_chunk := chunk.delta.text:
                        yield json_chunk
                if mode == Mode.ANTHROPIC_TOOLS:
                    yield chunk.delta.partial_json
                elif chunk.choices:
                    if mode == Mode.FUNCTIONS:
                        Mode.warn_mode_functions_deprecation()
                        if json_chunk := chunk.choices[0].delta.function_call.arguments:
                            yield json_chunk
                    elif mode in {
                        Mode.JSON,
                        Mode.MD_JSON,
                        Mode.JSON_SCHEMA,
                        Mode.CEREBRAS_JSON,
                        Mode.FIREWORKS_JSON,
                    }:
                        if json_chunk := chunk.choices[0].delta.content:
                            yield json_chunk
                    elif mode in {
                        Mode.TOOLS,
                        Mode.TOOLS_STRICT,
                        Mode.FIREWORKS_TOOLS,
                        Mode.WRITER_TOOLS,
                    }:
                        if json_chunk := chunk.choices[0].delta.tool_calls:
                            if json_chunk[0].function.arguments:
                                yield json_chunk[0].function.arguments
                    else:
                        raise NotImplementedError(
                            f"Mode {mode} is not supported for MultiTask streaming"
                        )
            except AttributeError:
                # Chunk did not have the expected shape; move on to the next one.
                pass

get_partial_model() cached classmethod

Return a partial model we can use to validate partial results.

Source code in instructor/dsl/partial.py
@classmethod
@cache
def get_partial_model(cls) -> type[T_Model]:
    """Build the all-optional variant of this model used to validate partial results.

    The result is cached, so each class builds its partial model only once.
    """
    assert issubclass(
        cls, BaseModel
    ), f"{cls.__name__} must be a subclass of BaseModel"

    # Avoid stacking the prefix when the class is already named Partial*.
    own_name = cls.__name__
    if own_name.startswith("Partial"):
        model_name = own_name
    else:
        model_name = f"Partial{own_name}"

    # Redefine every declared field through `_make_field_optional`.
    optional_fields = {}
    for field_name, field_info in cls.model_fields.items():
        optional_fields[field_name] = _make_field_optional(field_info)

    return create_model(
        model_name,
        __base__=cls,
        __module__=cls.__module__,
        **optional_fields,  # type: ignore[all]
    )

MaybeBase

Bases: BaseModel, Generic[T]

Extract a result from a model, if any, otherwise set the error and message fields.

Source code in instructor/dsl/maybe.py
class MaybeBase(BaseModel, Generic[T]):
    """
    Extract a result from a model, if any, otherwise set the error and message fields.
    """

    # NOTE(review): the docstring above may be emitted as the schema
    # description of derived models — confirm before rewording it.

    # Extracted value; no default is declared on this base.
    result: Optional[T]
    # Error flag, False unless set explicitly.
    error: bool = Field(default=False)
    # Accompanying message; no default is declared on this base.
    message: Optional[str]

    def __bool__(self) -> bool:
        # Truthiness follows the presence of a result, not the `error` flag.
        return self.result is not None

Maybe(model)

Create a Maybe model for a given Pydantic model. This allows you to return a model that includes fields for result, error, and message for situations where the data may not be present in the context.

Usage

from pydantic import BaseModel, Field
from instructor import Maybe

class User(BaseModel):
    name: str = Field(description="The name of the person")
    age: int = Field(description="The age of the person")
    role: str = Field(description="The role of the person")

MaybeUser = Maybe(User)

Result

class MaybeUser(BaseModel):
    result: Optional[User]
    error: bool = Field(default=False)
    message: Optional[str]

    def __bool__(self):
        return self.result is not None

Parameters:

Name Type Description Default
model Type[BaseModel]

The Pydantic model to wrap with Maybe.

required

Returns:

Name Type Description
MaybeModel Type[BaseModel]

A new Pydantic model that includes fields for result, error, and message.

Source code in instructor/dsl/maybe.py
def Maybe(model: type[T]) -> type[MaybeBase[T]]:
    """
    Build a `Maybe<Model>` wrapper around a Pydantic model.

    The returned model derives from `MaybeBase` and carries the fields `result`, `error`, and `message`, which is useful in situations where the requested data may not be present in the context.

    ## Usage

    ```python
    from pydantic import BaseModel, Field
    from instructor import Maybe

    class User(BaseModel):
        name: str = Field(description="The name of the person")
        age: int = Field(description="The age of the person")
        role: str = Field(description="The role of the person")

    MaybeUser = Maybe(User)
    ```

    ## Result

    ```python
    class MaybeUser(BaseModel):
        result: Optional[User]
        error: bool = Field(default=False)
        message: Optional[str]

        def __bool__(self):
            return self.result is not None
    ```

    Parameters:
        model (Type[BaseModel]): The Pydantic model to wrap with Maybe.

    Returns:
        MaybeModel (Type[BaseModel]): A new Pydantic model, named `Maybe` + the wrapped model's name, with the fields `result`, `error`, and `message`.
    """
    wrapper_name = f"Maybe{model.__name__}"

    # Each entry is an (annotation, field) pair as expected by `create_model`.
    result_definition = (
        Optional[model],
        Field(
            default=None,
            description="Correctly extracted result from the model, if any, otherwise None",
        ),
    )
    error_definition = (bool, Field(default=False))
    message_definition = (
        Optional[str],
        Field(
            default=None,
            description="Error message if no result was found, should be short and concise",
        ),
    )

    return create_model(
        wrapper_name,
        __base__=MaybeBase,
        result=result_definition,
        error=error_definition,
        message=message_definition,
    )

OpenAISchema

Bases: BaseModel

Source code in instructor/function_calls.py
class OpenAISchema(BaseModel):
    # Ignore classproperty, since Pydantic doesn't understand it like it would a normal property.
    model_config = ConfigDict(ignored_types=(classproperty,))

    @classproperty
    def openai_schema(cls) -> dict[str, Any]:
        """
        Build the OpenAI function/tool definition for this model.

        The output of ``model_json_schema()`` (minus ``title`` and
        ``description``) becomes the ``parameters`` payload. Parameter
        descriptions found in the class docstring are copied onto properties
        that have no description of their own, and ``required`` is recomputed
        as the sorted names of every property without a ``default`` entry.

        Note:
            It's important to add a docstring describing how to best use this class: it is included in the description attribute and becomes part of the prompt.

        Returns:
            model_json_schema (dict): A dictionary with the keys ``name``, ``description`` and ``parameters``
        """
        json_schema = cls.model_json_schema()
        parsed_doc = parse(cls.__doc__ or "")

        # Everything except title/description is forwarded as the parameter schema.
        parameters = dict(json_schema)
        parameters.pop("title", None)
        parameters.pop("description", None)

        properties = parameters["properties"]
        for doc_param in parsed_doc.params:
            name = doc_param.arg_name
            if name not in properties or not doc_param.description:
                continue
            # A description already present on the property wins over the docstring.
            properties[name].setdefault("description", doc_param.description)

        parameters["required"] = sorted(
            name for name, spec in properties.items() if "default" not in spec
        )

        if "description" in json_schema:
            description = json_schema["description"]
        elif parsed_doc.short_description:
            description = parsed_doc.short_description
        else:
            description = (
                f"Correctly extracted `{cls.__name__}` with all "
                f"the required parameters with correct types"
            )

        return {
            "name": json_schema["title"],
            "description": description,
            "parameters": parameters,
        }

    @classproperty
    def anthropic_schema(cls) -> dict[str, Any]:
        """
        Return the schema as a dict with ``name``, ``description`` and ``input_schema``.

        Returns:
            dict: ``name`` and ``description`` are taken from ``openai_schema``;
            ``input_schema`` is the unmodified ``model_json_schema()`` output.
        """
        # Read `openai_schema` once rather than once per key; it is presumably
        # rebuilt on every access (classproperty is defined elsewhere - confirm).
        openai_schema = cls.openai_schema
        return {
            "name": openai_schema["name"],
            "description": openai_schema["description"],
            "input_schema": cls.model_json_schema(),
        }

    @classproperty
    def gemini_schema(cls) -> Any:
        """
        Return a ``FunctionDeclaration`` built from ``openai_schema``.

        Returns:
            A ``google.generativeai.types.FunctionDeclaration`` whose name and
            description come from ``openai_schema`` and whose parameters are
            ``openai_schema["parameters"]`` passed through
            ``map_to_gemini_function_schema``.
        """
        # Imported inside the property so the dependency is only needed when used.
        import google.generativeai.types as genai_types

        # Read `openai_schema` once rather than three times; it is presumably
        # rebuilt on every access (classproperty is defined elsewhere - confirm).
        openai_schema = cls.openai_schema
        return genai_types.FunctionDeclaration(
            name=openai_schema["name"],
            description=openai_schema["description"],
            parameters=map_to_gemini_function_schema(openai_schema["parameters"]),
        )

    @classmethod
    def from_response(
        cls,
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
        mode: Mode = Mode.TOOLS,
    ) -> BaseModel:
        """Parse a completion into an instance of this model.

        Dispatches on ``mode`` to the matching ``parse_*`` classmethod.

        Parameters:
            completion (openai.ChatCompletion): The response object to parse
            validation_context (dict): The context to use for validating the response
            strict (bool): Whether to use strict json parsing
            mode (Mode): The completion mode that selects the parser

        Returns:
            cls (OpenAISchema): An instance of the class

        Raises:
            IncompleteOutputException: If the first choice has ``finish_reason == "length"`` (only checked for modes not handled by one of the provider-specific parsers)
            ValueError: If ``mode`` is not handled here
        """
        if mode == Mode.ANTHROPIC_TOOLS:
            return cls.parse_anthropic_tools(completion, validation_context, strict)

        if mode == Mode.ANTHROPIC_JSON:
            return cls.parse_anthropic_json(completion, validation_context, strict)

        # Gemini tool calls share the Vertex AI parser (which takes no `strict`).
        if mode in {Mode.VERTEXAI_TOOLS, Mode.GEMINI_TOOLS}:
            return cls.parse_vertexai_tools(completion, validation_context)

        if mode == Mode.VERTEXAI_JSON:
            return cls.parse_vertexai_json(completion, validation_context, strict)

        if mode == Mode.COHERE_TOOLS:
            return cls.parse_cohere_tools(completion, validation_context, strict)

        if mode == Mode.GEMINI_JSON:
            return cls.parse_gemini_json(completion, validation_context, strict)

        if mode == Mode.COHERE_JSON_SCHEMA:
            return cls.parse_cohere_json_schema(completion, validation_context, strict)

        if mode == Mode.WRITER_TOOLS:
            return cls.parse_writer_tools(completion, validation_context, strict)

        # Every remaining mode reads `completion.choices`; a completion that
        # was cut off by the length limit is reported before parsing.
        if completion.choices[0].finish_reason == "length":
            raise IncompleteOutputException(last_completion=completion)

        if mode == Mode.FUNCTIONS:
            Mode.warn_mode_functions_deprecation()
            return cls.parse_functions(completion, validation_context, strict)

        if mode in {
            Mode.TOOLS,
            Mode.MISTRAL_TOOLS,
            Mode.TOOLS_STRICT,
            Mode.CEREBRAS_TOOLS,
            Mode.FIREWORKS_TOOLS,
        }:
            return cls.parse_tools(completion, validation_context, strict)

        if mode in {
            Mode.JSON,
            Mode.JSON_SCHEMA,
            Mode.MD_JSON,
            Mode.JSON_O1,
            Mode.CEREBRAS_JSON,
            Mode.FIREWORKS_JSON,
        }:
            return cls.parse_json(completion, validation_context, strict)

        raise ValueError(f"Invalid patch mode: {mode}")

    @classmethod
    def parse_cohere_json_schema(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        """Validate ``completion.text`` as JSON against this model.

        Parameters:
            completion: Response object that must expose a ``text`` attribute
            validation_context (dict): The context to use for validating the response
            strict (bool): Whether to use strict json parsing

        Returns:
            An instance of the class

        Raises:
            AssertionError: If ``completion`` has no ``text`` attribute
        """
        assert hasattr(
            completion, "text"
        ), "Completion is not of type NonStreamedChatResponse"
        return cls.model_validate_json(
            completion.text, context=validation_context, strict=strict
        )

    @classmethod
    def parse_anthropic_tools(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        """Validate the single ``tool_use`` block of an Anthropic response.

        Parameters:
            completion: Response whose ``content`` items expose ``type`` and ``input``
            validation_context (dict): The context to use for validating the response
            strict (bool): Whether to use strict json parsing

        Returns:
            An instance of the class

        Raises:
            IncompleteOutputException: If ``completion`` is an anthropic ``Message`` that stopped with ``stop_reason == "max_tokens"``
        """
        from anthropic.types import Message

        stopped_at_token_limit = (
            isinstance(completion, Message) and completion.stop_reason == "max_tokens"
        )
        if stopped_at_token_limit:
            raise IncompleteOutputException(last_completion=completion)

        # Tool arguments arrive as dicts; serialise them so the JSON validation
        # call below can be used.
        # TODO update with anthropic specific types
        serialized_inputs = [
            json.dumps(block.input)
            for block in completion.content
            if block.type == "tool_use"
        ]

        # Validation fails unless there is exactly one tool call.
        single_item_list = TypeAdapter(
            Annotated[list[Any], Field(min_length=1, max_length=1)]
        )
        arguments_json = single_item_list.validate_python(serialized_inputs)[0]

        return cls.model_validate_json(
            arguments_json, context=validation_context, strict=strict
        )

    @classmethod
    def parse_anthropic_json(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        """Extract JSON from the response text and validate it against this model.

        Accepts either an object exposing ``choices`` (text is read from the
        first choice's message) or an anthropic ``Message`` (text is read from
        the first content block).

        Parameters:
            completion: The response to parse
            validation_context (dict): The context to use for validating the response
            strict (bool): Whether to use strict json parsing

        Returns:
            An instance of the class

        Raises:
            IncompleteOutputException: If the response stopped because of the length / max-token limit
            AssertionError: If ``completion`` has no ``choices`` and is not an anthropic ``Message``
        """
        from anthropic.types import Message

        if hasattr(completion, "choices"):
            # Keep `completion` bound to the full response so that the
            # exception below carries the completion, not just one choice.
            choice = completion.choices[0]
            if choice.finish_reason == "length":
                raise IncompleteOutputException(last_completion=completion)
            text = choice.message.content
        else:
            assert isinstance(completion, Message)
            if completion.stop_reason == "max_tokens":
                raise IncompleteOutputException(last_completion=completion)
            text = completion.content[0].text

        extra_text = extract_json_from_codeblock(text)

        if strict:
            return cls.model_validate_json(
                extra_text, context=validation_context, strict=True
            )

        # Allow control characters.
        parsed = json.loads(extra_text, strict=False)
        # Pydantic non-strict: https://docs.pydantic.dev/latest/concepts/strict_mode/
        return cls.model_validate(parsed, context=validation_context, strict=False)

    @classmethod
    def parse_gemini_json(
        cls: type[BaseModel],
        completion: Any,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        """Extract JSON from ``completion.text`` and validate it against this model.

        Parameters:
            completion: Response object exposing ``text``
            validation_context (dict): The context to use for validating the response
            strict (bool): Whether to use strict json parsing

        Returns:
            An instance of the class

        Raises:
            ValueError: If reading ``completion.text`` raises ``ValueError``
        """
        try:
            text = completion.text
        except ValueError:
            logger.debug(
                f"Error response: {completion.result.candidates[0].finish_reason}\n\n{completion.result.candidates[0].safety_ratings}"
            )
            # Fail here explicitly instead of falling through with `text`
            # unbound and relying on an UnboundLocalError further down.
            raise ValueError("Unable to extract JSON from completion text") from None

        extra_text = extract_json_from_codeblock(text)

        if strict:
            return cls.model_validate_json(
                extra_text, context=validation_context, strict=True
            )

        # Allow control characters.
        parsed = json.loads(extra_text, strict=False)
        # Pydantic non-strict: https://docs.pydantic.dev/latest/concepts/strict_mode/
        return cls.model_validate(parsed, context=validation_context, strict=False)

    @classmethod
    def parse_vertexai_tools(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
    ) -> BaseModel:
        """Validate the function-call arguments of the first candidate's first part.

        Parameters:
            completion: Response exposing ``candidates[0].content.parts[0].function_call.args``
            validation_context (dict): The context to use for validating the response

        Returns:
            An instance of the class
        """
        function_args = completion.candidates[0].content.parts[0].function_call.args  # type: ignore
        # Copy the arguments into a plain dict, key by key.
        payload = {name: function_args[name] for name in function_args}  # type: ignore
        # strict=False is required: the protobuf -> dict conversion often turns
        # ints into floats, which strict validation would reject.
        return cls.model_validate(payload, context=validation_context, strict=False)

    @classmethod
    def parse_vertexai_json(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        """Validate ``completion.text`` as JSON against this model.

        Parameters:
            completion: Response object exposing ``text``
            validation_context (dict): The context to use for validating the response
            strict (bool): Whether to use strict json parsing

        Returns:
            An instance of the class
        """
        raw_json = completion.text
        return cls.model_validate_json(
            raw_json,
            context=validation_context,
            strict=strict,
        )

    @classmethod
    def parse_cohere_tools(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        """Extract JSON from ``completion.text`` and validate it against this model.

        Parameters:
            completion: Response object exposing ``text``
            validation_context (dict): The context to use for validating the response
            strict (bool): Whether to use strict json parsing

        Returns:
            An instance of the class
        """
        # TODO update with cohere specific types
        json_payload = extract_json_from_codeblock(
            cast(str, completion.text)  # type: ignore
        )
        return cls.model_validate_json(
            json_payload,
            context=validation_context,
            strict=strict,
        )

    @classmethod
    def parse_writer_tools(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        """Validate the single tool call of the first choice against this model.

        Parameters:
            completion: Response exposing ``choices[0].message.tool_calls``
            validation_context (dict): The context to use for validating the response
            strict (bool): Whether to use strict json parsing

        Returns:
            An instance of the class

        Raises:
            AssertionError: If there is not exactly one tool call, or its function name differs from ``cls.openai_schema["name"]``
        """
        message = completion.choices[0].message
        # `tool_calls` may be None when no tool was called; treat that as an
        # empty list so the assertion below reports it instead of `len(None)`
        # raising a TypeError (same guard as `parse_tools`).
        tool_calls = message.tool_calls or []
        assert (
            len(tool_calls) == 1
        ), "Instructor does not support multiple tool calls, use List[Model] instead"
        tool_call = tool_calls[0]
        assert (
            tool_call.function.name == cls.openai_schema["name"]
        ), "Tool name does not match"
        return cls.model_validate_json(
            tool_call.function.arguments,
            context=validation_context,
            strict=strict,
        )

    @classmethod
    def parse_functions(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        """Validate the ``function_call`` arguments of the first choice.

        Parameters:
            completion: Response exposing ``choices[0].message.function_call``
            validation_context (dict): The context to use for validating the response
            strict (bool): Whether to use strict json parsing

        Returns:
            An instance of the class

        Raises:
            AssertionError: If the called function's name differs from ``cls.openai_schema["name"]``
        """
        function_call = completion.choices[0].message.function_call
        expected_name = cls.openai_schema["name"]  # type: ignore[index]
        assert (
            function_call.name == expected_name  # type: ignore[union-attr]
        ), "Function name does not match"
        return cls.model_validate_json(
            function_call.arguments,  # type: ignore[union-attr]
            context=validation_context,
            strict=strict,
        )

    @classmethod
    def parse_tools(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        """Validate the single tool call of the first choice against this model.

        Parameters:
            completion: Response exposing ``choices[0].message``
            validation_context (dict): The context to use for validating the response
            strict (bool): Whether to use strict json parsing

        Returns:
            An instance of the class

        Raises:
            AssertionError: If the message carries a refusal, does not contain exactly one tool call, or the tool name differs from ``cls.openai_schema["name"]``
        """
        message = completion.choices[0].message

        # The `refusal` field seems to be missing when instructor is used with
        # some other tools (e.g. litellm), so read it defensively.
        refusal = getattr(message, "refusal", None)
        assert refusal is None, f"Unable to generate a response due to {refusal}"

        tool_calls = message.tool_calls or []
        assert (
            len(tool_calls) == 1
        ), "Instructor does not support multiple tool calls, use List[Model] instead"
        tool_call = tool_calls[0]
        assert (
            tool_call.function.name == cls.openai_schema["name"]  # type: ignore[index]
        ), "Tool name does not match"
        return cls.model_validate_json(
            tool_call.function.arguments,  # type: ignore
            context=validation_context,
            strict=strict,
        )

    @classmethod
    def parse_json(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        """Extract JSON from the first choice's message content and validate it.

        Parameters:
            completion: Response exposing ``choices[0].message.content``
            validation_context (dict): The context to use for validating the response
            strict (bool): Whether to use strict json parsing

        Returns:
            An instance of the class
        """
        # Missing/empty content is treated as an empty string.
        content = completion.choices[0].message.content or ""
        json_payload = extract_json_from_codeblock(content)
        return cls.model_validate_json(
            json_payload, context=validation_context, strict=strict
        )

from_response(completion, validation_context=None, strict=None, mode=Mode.TOOLS) classmethod

Execute the function from the response of an openai chat completion

Parameters:

Name Type Description Default
completion ChatCompletion

The response from an openai chat completion

required
throw_error bool

Whether to throw an error if the function call is not detected

required
context dict

The context to use for validating the response

required
strict bool

Whether to use strict json parsing

None
mode Mode

The openai completion mode

TOOLS

Returns:

Name Type Description
cls OpenAISchema

An instance of the class

Source code in instructor/function_calls.py
@classmethod
def from_response(
    cls,
    completion: ChatCompletion,
    validation_context: Optional[dict[str, Any]] = None,
    strict: Optional[bool] = None,
    mode: Mode = Mode.TOOLS,
) -> BaseModel:
    """Parse a completion into an instance of this model.

    Dispatches on ``mode`` to the matching ``parse_*`` classmethod.

    Parameters:
        completion (openai.ChatCompletion): The response object to parse
        validation_context (dict): The context to use for validating the response
        strict (bool): Whether to use strict json parsing
        mode (Mode): The completion mode that selects the parser

    Returns:
        cls (OpenAISchema): An instance of the class

    Raises:
        IncompleteOutputException: If the first choice has ``finish_reason == "length"`` (only checked for modes not handled by one of the provider-specific parsers)
        ValueError: If ``mode`` is not handled here
    """
    if mode == Mode.ANTHROPIC_TOOLS:
        return cls.parse_anthropic_tools(completion, validation_context, strict)

    if mode == Mode.ANTHROPIC_JSON:
        return cls.parse_anthropic_json(completion, validation_context, strict)

    # Gemini tool calls share the Vertex AI parser (which takes no `strict`).
    if mode in {Mode.VERTEXAI_TOOLS, Mode.GEMINI_TOOLS}:
        return cls.parse_vertexai_tools(completion, validation_context)

    if mode == Mode.VERTEXAI_JSON:
        return cls.parse_vertexai_json(completion, validation_context, strict)

    if mode == Mode.COHERE_TOOLS:
        return cls.parse_cohere_tools(completion, validation_context, strict)

    if mode == Mode.GEMINI_JSON:
        return cls.parse_gemini_json(completion, validation_context, strict)

    if mode == Mode.COHERE_JSON_SCHEMA:
        return cls.parse_cohere_json_schema(completion, validation_context, strict)

    if mode == Mode.WRITER_TOOLS:
        return cls.parse_writer_tools(completion, validation_context, strict)

    # Every remaining mode reads `completion.choices`; a completion that was
    # cut off by the length limit is reported before parsing.
    if completion.choices[0].finish_reason == "length":
        raise IncompleteOutputException(last_completion=completion)

    if mode == Mode.FUNCTIONS:
        Mode.warn_mode_functions_deprecation()
        return cls.parse_functions(completion, validation_context, strict)

    if mode in {
        Mode.TOOLS,
        Mode.MISTRAL_TOOLS,
        Mode.TOOLS_STRICT,
        Mode.CEREBRAS_TOOLS,
        Mode.FIREWORKS_TOOLS,
    }:
        return cls.parse_tools(completion, validation_context, strict)

    if mode in {
        Mode.JSON,
        Mode.JSON_SCHEMA,
        Mode.MD_JSON,
        Mode.JSON_O1,
        Mode.CEREBRAS_JSON,
        Mode.FIREWORKS_JSON,
    }:
        return cls.parse_json(completion, validation_context, strict)

    raise ValueError(f"Invalid patch mode: {mode}")

openai_schema()

Return the schema in the format of OpenAI's schema as jsonschema

Note

It's important to add a docstring to describe how to best use this class; it will be included in the description attribute and be part of the prompt.

Returns:

Name Type Description
model_json_schema dict

A dictionary in the format of OpenAI's schema as jsonschema

Source code in instructor/function_calls.py
@classproperty
def openai_schema(cls) -> dict[str, Any]:
    """
    Build the OpenAI function/tool definition for this model.

    The output of ``model_json_schema()`` (minus ``title`` and
    ``description``) becomes the ``parameters`` payload. Parameter
    descriptions found in the class docstring are copied onto properties
    that have no description of their own, and ``required`` is recomputed
    as the sorted names of every property without a ``default`` entry.

    Note:
        It's important to add a docstring describing how to best use this class: it is included in the description attribute and becomes part of the prompt.

    Returns:
        model_json_schema (dict): A dictionary with the keys ``name``, ``description`` and ``parameters``
    """
    json_schema = cls.model_json_schema()
    parsed_doc = parse(cls.__doc__ or "")

    # Everything except title/description is forwarded as the parameter schema.
    parameters = dict(json_schema)
    parameters.pop("title", None)
    parameters.pop("description", None)

    properties = parameters["properties"]
    for doc_param in parsed_doc.params:
        name = doc_param.arg_name
        if name not in properties or not doc_param.description:
            continue
        # A description already present on the property wins over the docstring.
        properties[name].setdefault("description", doc_param.description)

    parameters["required"] = sorted(
        name for name, spec in properties.items() if "default" not in spec
    )

    if "description" in json_schema:
        description = json_schema["description"]
    elif parsed_doc.short_description:
        description = parsed_doc.short_description
    else:
        description = (
            f"Correctly extracted `{cls.__name__}` with all "
            f"the required parameters with correct types"
        )

    return {
        "name": json_schema["title"],
        "description": description,
        "parameters": parameters,
    }