The Anthropic Python SDK provides convenient access to the Anthropic REST API from Python applications. It supports both synchronous and asynchronous operations, streaming, and integrations with AWS Bedrock and Google Vertex AI.
For API feature documentation with code examples, see the API reference. This page covers Python-specific SDK features and configuration.
```bash
pip install anthropic
```

For platform-specific integrations, install with extras:

```bash
# For AWS Bedrock support
pip install anthropic[bedrock]

# For Google Vertex AI support
pip install anthropic[vertex]

# For improved async performance with aiohttp
pip install anthropic[aiohttp]
```

Python 3.9 or later is required.
```python
import os

from anthropic import Anthropic

client = Anthropic(
    # This is the default and can be omitted
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
)

message = client.messages.create(
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": "Hello, Claude",
        }
    ],
    model="claude-opus-4-6",
)
print(message.content)
```

Consider using python-dotenv to add ANTHROPIC_API_KEY="my-anthropic-api-key" to your .env file so that your API key isn't stored in source control.
```python
import os
import asyncio

from anthropic import AsyncAnthropic

client = AsyncAnthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
)


async def main() -> None:
    message = await client.messages.create(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "Hello, Claude",
            }
        ],
        model="claude-opus-4-6",
    )
    print(message.content)


asyncio.run(main())
```

For improved async performance, you can use the aiohttp HTTP backend instead of the default httpx:
```python
import os
import asyncio

from anthropic import AsyncAnthropic, DefaultAioHttpClient


async def main() -> None:
    async with AsyncAnthropic(
        api_key=os.environ.get("ANTHROPIC_API_KEY"),
        http_client=DefaultAioHttpClient(),
    ) as client:
        message = await client.messages.create(
            max_tokens=1024,
            messages=[
                {
                    "role": "user",
                    "content": "Hello, Claude",
                }
            ],
            model="claude-opus-4-6",
        )
        print(message.content)


asyncio.run(main())
```

The SDK provides support for streaming responses using Server-Sent Events (SSE).
```python
stream = client.messages.create(
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": "Hello, Claude",
        }
    ],
    model="claude-opus-4-6",
    stream=True,
)
for event in stream:
    print(event.type)
```

The async client uses the exact same interface:
```python
stream = await client.messages.create(
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": "Hello, Claude",
        }
    ],
    model="claude-opus-4-6",
    stream=True,
)
async for event in stream:
    print(event.type)
```

The SDK also provides streaming helpers that use context managers and provide access to the accumulated text and the final message:
```python
async def main() -> None:
    async with client.messages.stream(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "Say hello there!",
            }
        ],
        model="claude-opus-4-6",
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
        print()

    message = await stream.get_final_message()
    print(message.to_json())


asyncio.run(main())
```

Streaming with client.messages.stream(...) exposes various helpers, including accumulation and SDK-specific events.
Alternatively, you can use client.messages.create(..., stream=True) which only returns an async iterable of the events in the stream and uses less memory (it doesn't build up a final message object for you).
You can see the exact usage for a given request through the usage response property:
```python
message = client.messages.create(...)
print(message.usage)
# Usage(input_tokens=25, output_tokens=13)
```

You can also count tokens before making a request:
```python
count = client.messages.count_tokens(
    model="claude-opus-4-6",
    messages=[{"role": "user", "content": "Hello, world"}],
)
print(count.input_tokens)  # 10
```

This SDK provides support for tool use, also known as function calling. More details can be found in the tool use overview.
The SDK provides helpers for defining and running tools as pure Python functions. The @beta_tool decorator derives a tool schema from a function's signature and docstring:
```python
import json

from anthropic import Anthropic, beta_tool

client = Anthropic()


@beta_tool
def get_weather(location: str) -> str:
    """Get the weather for a given location.

    Args:
        location: The city and state, e.g. San Francisco, CA

    Returns:
        A JSON string containing the location, temperature, and weather condition.
    """
    return json.dumps(
        {
            "location": location,
            "temperature": "68°F",
            "condition": "Sunny",
        }
    )


# Use the tool_runner to automatically handle tool calls
runner = client.beta.messages.tool_runner(
    max_tokens=1024,
    model="claude-opus-4-6",
    tools=[get_weather],
    messages=[
        {"role": "user", "content": "What is the weather in SF?"},
    ],
)
for message in runner:
    print(message)
```

On every iteration, an API request is made. If Claude wants to call one of the given tools, the tool is called automatically and its result is returned to the model in the next iteration.
This SDK provides support for the Message Batches API under client.messages.batches.
Message Batches takes an array of requests, where each object has a custom_id identifier and the same request params as the standard Messages API:
```python
client.messages.batches.create(
    requests=[
        {
            "custom_id": "my-first-request",
            "params": {
                "model": "claude-opus-4-6",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": "Hello, world"}],
            },
        },
        {
            "custom_id": "my-second-request",
            "params": {
                "model": "claude-opus-4-6",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": "Hi again, friend"}],
            },
        },
    ]
)
```

Once a Message Batch has been processed, indicated by .processing_status == 'ended', you can access the results with .batches.results().
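As a rough sketch, you might poll until processing has ended before fetching results (the batch_id variable and the sleep interval here are illustrative):

```python
import time

# batch_id comes from the create() response, e.g. client.messages.batches.create(...).id
batch = client.messages.batches.retrieve(batch_id)
while batch.processing_status != "ended":
    time.sleep(60)  # illustrative polling interval
    batch = client.messages.batches.retrieve(batch_id)
```

Then iterate over the results: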
```python
result_stream = client.messages.batches.results(batch_id)
for entry in result_stream:
    if entry.result.type == "succeeded":
        print(entry.result.message.content)
```

Request parameters that correspond to file uploads can be passed in many different forms:
- A PathLike object (e.g., pathlib.Path)
- A (filename, content, content_type) tuple
- A BinaryIO file-like object
- The toFile helper

```python
from pathlib import Path

from anthropic import Anthropic

client = Anthropic()

# Upload using a file path
client.beta.files.upload(
    file=Path("/path/to/file"),
    betas=["files-api-2025-04-14"],
)

# Upload using bytes
client.beta.files.upload(
    file=("file.txt", b"my bytes", "text/plain"),
    betas=["files-api-2025-04-14"],
)
```

The async client uses the exact same interface. If you pass a PathLike instance, the file contents are read asynchronously automatically.
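For example, a minimal async upload sketch (the file path is illustrative):

```python
import asyncio
from pathlib import Path

from anthropic import AsyncAnthropic

client = AsyncAnthropic()


async def main() -> None:
    # The file contents are read asynchronously before the upload request is sent
    await client.beta.files.upload(
        file=Path("/path/to/file"),
        betas=["files-api-2025-04-14"],
    )


asyncio.run(main())
```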
When the library is unable to connect to the API, or if the API returns a non-success status code (i.e., 4xx or 5xx response), a subclass of APIError is raised:
```python
import anthropic

try:
    message = client.messages.create(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "Hello, Claude",
            }
        ],
        model="claude-opus-4-6",
    )
except anthropic.APIConnectionError as e:
    print("The server could not be reached")
    print(e.__cause__)  # an underlying Exception, likely raised within httpx
except anthropic.RateLimitError as e:
    print("A 429 status code was received; we should back off a bit.")
except anthropic.APIStatusError as e:
    print("Another non-200-range status code was received")
    print(e.status_code)
    print(e.response)
```

Error codes are as follows:
| Status Code | Error Type |
|---|---|
| 400 | BadRequestError |
| 401 | AuthenticationError |
| 403 | PermissionDeniedError |
| 404 | NotFoundError |
| 422 | UnprocessableEntityError |
| 429 | RateLimitError |
| >=500 | InternalServerError |
| N/A | APIConnectionError |
For more information on debugging requests, see the errors documentation.
All object responses in the SDK provide a _request_id property which is added from the request-id response header so that you can quickly log failing requests and report them back to Anthropic.
```python
message = client.messages.create(
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello, Claude"}],
    model="claude-opus-4-6",
)
print(message._request_id)  # e.g., req_018EeWyXxfu5pfWkrYcMdjWG
```

Unlike other properties that use an _ prefix, the _request_id property is public. Unless documented otherwise, all other _ prefix properties, methods, and modules are private.
Certain errors are automatically retried 2 times by default, with a short exponential backoff. Connection errors (for example, due to a network connectivity problem), 408 Request Timeout, 409 Conflict, 429 Rate Limit, and >=500 Internal errors are all retried by default.
You can use the max_retries option to configure or disable this:
```python
# Configure the default for all requests:
client = Anthropic(
    max_retries=0,  # default is 2
)

# Or, configure per-request:
client.with_options(max_retries=5).messages.create(
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello, Claude"}],
    model="claude-opus-4-6",
)
```

By default, requests time out after 10 minutes. You can configure this with the timeout option, which accepts a float or an httpx.Timeout object:
```python
import httpx
from anthropic import Anthropic

# Configure the default for all requests:
client = Anthropic(
    timeout=20.0,  # 20 seconds (default is 10 minutes)
)

# More granular control:
client = Anthropic(
    timeout=httpx.Timeout(60.0, read=5.0, write=10.0, connect=2.0),
)

# Override per-request:
client.with_options(timeout=5.0).messages.create(
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello, Claude"}],
    model="claude-opus-4-6",
)
```

On timeout, an APITimeoutError is thrown.
Note that requests which time out will be retried twice by default.
Consider using the streaming Messages API for longer running requests.
Avoid setting a large max_tokens value without using streaming. Some networks may drop idle connections after a certain period of time, which can cause the request to fail or timeout without receiving a response from Anthropic.
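For instance, a minimal sketch using the streaming helper for a long generation (the max_tokens value and prompt are illustrative):

```python
# Stream long outputs so the connection never sits idle waiting for a full response
with client.messages.stream(
    max_tokens=32_000,  # illustrative; large outputs are best streamed
    messages=[{"role": "user", "content": "Write a very long story."}],
    model="claude-opus-4-6",
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```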
The SDK will throw a ValueError if a non-streaming request is expected to take longer than approximately 10 minutes. Passing stream=True or overriding the timeout option at the client or request level disables this error.
Without this check, a non-streaming request expected to outlast its timeout would have its connection terminated by the client, which would then retry without ever receiving a response.
The SDK sets a TCP socket keep-alive option to reduce the impact of idle connection timeouts on some networks. This can be overridden by passing a custom http_client option to the client.
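As a sketch, overriding the transport's socket options might look like the following (this assumes an httpx version that supports the socket_options parameter; the option values are illustrative, not the SDK's defaults):

```python
import socket

import httpx
from anthropic import Anthropic, DefaultHttpxClient

# Hypothetical example: supply your own TCP socket options instead of the SDK defaults
client = Anthropic(
    http_client=DefaultHttpxClient(
        transport=httpx.HTTPTransport(
            socket_options=[(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)],
        ),
    ),
)
```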
List methods in the Claude API are paginated. You can use the for syntax to iterate through items across all pages:
```python
all_batches = []
# Automatically fetches more pages as needed.
for batch in client.messages.batches.list(limit=20):
    all_batches.append(batch)
print(all_batches)
```

For async iteration:
```python
async def main() -> None:
    all_batches = []
    async for batch in client.messages.batches.list(limit=20):
        all_batches.append(batch)
    print(all_batches)


asyncio.run(main())
```

Alternatively, you can use the .has_next_page(), .next_page_info(), or .get_next_page() methods for more granular control when working with pages:
```python
first_page = await client.messages.batches.list(limit=20)
if first_page.has_next_page():
    print(f"will fetch next page using these details: {first_page.next_page_info()}")
    next_page = await first_page.get_next_page()
    print(f"number of items we just fetched: {len(next_page.data)}")

# Remove `await` for non-async usage.
```

Or work directly with the returned data:
```python
first_page = await client.messages.batches.list(limit=20)

print(f"next page cursor: {first_page.last_id}")
for batch in first_page.data:
    print(batch.id)

# Remove `await` for non-async usage.
```

The SDK automatically sends the anthropic-version header set to 2023-06-01.
If you need to, you can override it by setting default headers on the client object or per-request.
Overriding default headers may result in incorrect types and other unexpected or undefined behavior in the SDK.
```python
# Set default headers for all requests on the client
client = Anthropic(
    default_headers={"anthropic-version": "My-Custom-Value"},
)

# Or override per-request
client.messages.with_raw_response.create(
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello, Claude"}],
    model="claude-opus-4-6",
    extra_headers={"anthropic-version": "My-Custom-Value"},
)
```

Nested request parameters are TypedDicts. Responses are Pydantic models, which also have helper methods for things like serializing back into JSON (v1, v2).
Typed requests and responses provide autocomplete and documentation within your editor. If you'd like to see type errors in VS Code to help catch bugs earlier, set python.analysis.typeCheckingMode to basic.
To convert a Pydantic model to a dictionary, use the helper methods:
```python
message = client.messages.create(...)

# Convert to JSON string
json_str = message.to_json()

# Convert to dictionary
data = message.to_dict()
```

In responses, you can distinguish between fields that are explicitly null and fields that were not returned (missing):
```python
response = client.messages.create(
    model="claude-opus-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
)
if response.my_field is None:
    if "my_field" not in response.model_fields_set:
        print("field was not in the response")
    else:
        print("field was null")
```

The "raw" Response returned by httpx can be accessed via the .with_raw_response property on the client. This is useful for accessing response headers or other metadata:
```python
response = client.messages.with_raw_response.create(
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello, Claude"}],
    model="claude-opus-4-6",
)
print(response.headers.get("x-request-id"))

# Get the object that `messages.create()` would have returned
message = response.parse()
print(message.content)
```

These methods return an APIResponse object.
The .with_raw_response approach above eagerly reads the full response body when you make the request. To stream the response body instead, use .with_streaming_response, which requires a context manager and only reads the response body once you call .read(), .text(), .json(), .iter_bytes(), .iter_text(), .iter_lines(), or .parse(). In the async client, these are async methods.
```python
with client.messages.with_streaming_response.create(
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello, Claude"}],
    model="claude-opus-4-6",
) as response:
    print(response.headers.get("x-request-id"))

    for line in response.iter_lines():
        print(line)
```

The context manager is required so that the response will reliably be closed.
The SDK uses the standard library logging module.
You can enable logging by setting the environment variable ANTHROPIC_LOG to one of debug, info, warn, or off:
```bash
export ANTHROPIC_LOG=debug
```

This library is typed for convenient access to the documented API. If you need to access undocumented endpoints, params, or response properties, the library can still be used.
To make requests to undocumented endpoints, you can use client.get, client.post, and other HTTP verbs. Options on the client, such as retries, will be respected when making these requests.
```python
import httpx

response = client.post(
    "/foo",
    cast_to=httpx.Response,
    body={"my_param": True},
)
print(response.json())
```

If you want to explicitly send an extra param, you can do so with the extra_query, extra_body, and extra_headers request options.
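For example, a minimal sketch (the parameter names passed here are hypothetical, for illustration only):

```python
message = client.messages.create(
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello, Claude"}],
    model="claude-opus-4-6",
    extra_headers={"x-my-header": "value"},  # hypothetical extra header
    extra_query={"my_query_param": "value"},  # hypothetical extra query param
    extra_body={"my_undocumented_param": True},  # hypothetical extra body param
)
```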
The extra_ parameters override documented parameters of the same name. For security reasons, ensure these methods are only used with trusted input data.
To access undocumented response properties, you can access the extra fields like response.unknown_prop. You can also get all extra fields on the Pydantic model as a dict with response.model_extra.
You can directly override the httpx client to customize it for your use case, including support for proxies and transports:
```python
import httpx
from anthropic import Anthropic, DefaultHttpxClient

client = Anthropic(
    # Or use the `ANTHROPIC_BASE_URL` env var
    base_url="http://my.test.server.example.com:8083",
    http_client=DefaultHttpxClient(
        proxy="http://my.test.proxy.example.com",
        transport=httpx.HTTPTransport(local_address="0.0.0.0"),
    ),
)
```

You can also customize the client on a per-request basis by using with_options():
```python
client.with_options(http_client=DefaultHttpxClient(...))
```

Use DefaultHttpxClient and DefaultAsyncHttpxClient instead of raw httpx.Client and httpx.AsyncClient to ensure the SDK's default configuration (timeouts, connection limits, etc.) is preserved.
By default, the library closes underlying HTTP connections whenever the client is garbage collected. You can close the client manually with the .close() method, or use it as a context manager so that it closes on exit.
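A rough sketch of closing the client explicitly:

```python
from anthropic import Anthropic

client = Anthropic()
try:
    message = client.messages.create(...)
finally:
    client.close()  # release the underlying HTTP connections
```

Or as a context manager: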
```python
from anthropic import Anthropic

with Anthropic() as client:
    message = client.messages.create(...)
# HTTP client is automatically closed
```

Beta features are available before general release to get early feedback and test new functionality. You can check the availability of all of Claude's capabilities and tools in the build with Claude overview.
You can access most beta API features through the beta property of the client. To enable a particular beta feature, add the appropriate beta flag to the betas field when creating a message.
For example, to use the Files API:
```python
response = client.beta.messages.create(
    model="claude-opus-4-6",
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Please summarize this document for me."},
                {
                    "type": "document",
                    "source": {
                        "type": "file",
                        "file_id": "file_abc123",
                    },
                },
            ],
        },
    ],
    betas=["files-api-2025-04-14"],
)
```

For detailed platform setup guides with code examples, see the Amazon Bedrock and Google Vertex AI guides.
All three client classes are included in the base anthropic package:
| Provider | Client | Extra dependencies |
|---|---|---|
| Bedrock | from anthropic import AnthropicBedrock | pip install anthropic[bedrock] |
| Vertex AI | from anthropic import AnthropicVertex | pip install anthropic[vertex] |
| Foundry | from anthropic import AnthropicFoundry | None |
This package generally follows SemVer conventions, though certain backwards-incompatible changes may be released as minor versions:

1. Changes that only affect static types, without breaking runtime behavior.
2. Changes to library internals which are technically public but not intended or documented for external use.
3. Changes that we do not expect to impact the vast majority of users in practice.
If you've upgraded to the latest version but aren't seeing new features you were expecting, your Python environment is likely still using an older version. You can determine the version being used at runtime with:
```python
import anthropic

print(anthropic.__version__)
```