Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
228f6ff
Add stream RPC client
alexex10 Jun 24, 2026
ce0d3f6
Add stream RPC client
alexex10 Jun 26, 2026
ebd76db
Merge branch 'starknet' of ex10.github.com:x10xchange/python_sdk into…
alexex10 Jun 26, 2026
17e61ae
Add stream RPC client
alexex10 Jun 26, 2026
dc238e5
Add stream RPC client
alexex10 Jun 26, 2026
ca9361d
Add stream RPC client
alexex10 Jun 26, 2026
f0b6c8b
Add stream RPC client
alexex10 Jun 26, 2026
1aef5a4
Add stream RPC client
alexex10 Jun 26, 2026
3f2f456
Add stream RPC client
alexex10 Jun 26, 2026
3b9ec8c
Add stream RPC client
alexex10 Jun 26, 2026
a194bed
Add stream RPC client
alexex10 Jun 26, 2026
0a06d95
Add stream RPC client
alexex10 Jun 26, 2026
9e2d539
Add stream RPC client
alexex10 Jun 26, 2026
d120ff3
Add stream RPC client
alexex10 Jun 29, 2026
057a82b
Add stream RPC client
alexex10 Jun 29, 2026
71fd53c
Add stream RPC client
alexex10 Jun 29, 2026
7b2b3f0
Add stream RPC client
alexex10 Jun 29, 2026
45a5689
Add stream RPC client
alexex10 Jun 29, 2026
aa4ddd6
Add stream RPC client
alexex10 Jun 29, 2026
d084087
Add stream RPC client
alexex10 Jun 29, 2026
ac4b788
Add stream RPC client
alexex10 Jun 29, 2026
b7bd847
Add stream RPC client
alexex10 Jun 29, 2026
e82d12d
Add stream RPC client
alexex10 Jun 29, 2026
8611301
Add stream RPC client
alexex10 Jun 29, 2026
c79ae1c
Add stream RPC client
alexex10 Jun 29, 2026
a4047b0
Add stream RPC client
alexex10 Jun 29, 2026
ab7c0ef
Add stream RPC client
alexex10 Jun 29, 2026
d18a5f8
Add stream RPC client
alexex10 Jun 29, 2026
cdd1084
Add stream RPC client
alexex10 Jun 29, 2026
205ee3a
Add stream RPC client
alexex10 Jun 29, 2026
e8dbd9b
Add stream RPC client
alexex10 Jun 29, 2026
5f2d901
Add stream RPC client
alexex10 Jun 29, 2026
47b9019
Add stream RPC client
alexex10 Jun 29, 2026
09ca323
Add stream RPC client
alexex10 Jun 29, 2026
fb72667
Add stream RPC client
alexex10 Jun 30, 2026
787aa8d
Add stream RPC client
alexex10 Jun 30, 2026
4078e53
Add stream RPC client
alexex10 Jun 30, 2026
91afaee
Add stream RPC client
alexex10 Jun 30, 2026
d992741
Add stream RPC client
alexex10 Jun 30, 2026
c8bda94
Add stream RPC client
alexex10 Jun 30, 2026
9b63f31
Add stream RPC client
alexex10 Jun 30, 2026
b17c01d
Add stream RPC client
alexex10 Jun 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions examples/cases/stream/subscribe_to_rpc_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import asyncio
import logging
from asyncio import run
from signal import SIGINT, SIGTERM

from examples.utils import BTC_USD_MARKET, create_stream_rpc_client, init_env
from x10.clients.streamrpc.subscription_params import (
CandlesParams,
PricesParams,
TradesParams,
)
from x10.config import get_config_by_name
from x10.models.stream_rpc import StreamRpcResponseModel

LOGGER = logging.getLogger()
MARKET_NAME = BTC_USD_MARKET


def on_message(message: StreamRpcResponseModel) -> None:
LOGGER.info("Received message: %s", message)


async def subscribe_to_rpc_stream(stop_event: asyncio.Event):
env_config = init_env()
client_config = get_config_by_name(env_config.client_config_name)

async with create_stream_rpc_client(client_config) as client:
await client.ping()

subscriptions_before = await client.list_subscriptions()

LOGGER.info("Active subscriptions: %s", subscriptions_before)

await client.subscribe(params=TradesParams(market="BTC-USD"), handler=on_message)
await client.subscribe(params=TradesParams(market="ETH-USD"), handler=on_message)
await client.subscribe(params=PricesParams(price_type="index", market="ETH-USD"), handler=on_message)
await client.subscribe(
params=CandlesParams(candle_type="index", market="ETH-USD", interval="PT1M"), handler=on_message
)

subscriptions_after = await client.list_subscriptions()

LOGGER.info("Active subscriptions: %s", subscriptions_after)

await stop_event.wait()


async def run_example():
stop_event = asyncio.Event()
loop = asyncio.get_running_loop()

def signal_handler():
LOGGER.info("Signal received, stopping...")
stop_event.set()

loop.add_signal_handler(SIGINT, signal_handler)
loop.add_signal_handler(SIGTERM, signal_handler)

await subscribe_to_rpc_stream(stop_event)


if __name__ == "__main__":
run(main=run_example())
5 changes: 5 additions & 0 deletions examples/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from x10.clients.blocking import BlockingTradingClient
from x10.clients.rest import RestApiClient
from x10.clients.stream import StreamClient
from x10.clients.streamrpc.streamrpc_client import StreamRpcClient
from x10.config import get_config_by_name
from x10.core.client_config import ClientConfig
from x10.core.env_config import EnvConfig
Expand Down Expand Up @@ -71,6 +72,10 @@ def create_stream_client(config: ClientConfig):
return StreamClient(api_url=config.endpoints.stream_url)


def create_stream_rpc_client(config: ClientConfig):
return StreamRpcClient(api_url=config.endpoints.stream_rpc_url)


def get_adjust_price_by_pct(config: TradingConfigModel):
def adjust_price_by_pct(price: Decimal, pct: Decimal | int):
return config.round_price(price + price * Decimal(pct) / 100)
Expand Down
4 changes: 2 additions & 2 deletions tests/clients/test_rest_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ async def test_get_markets(aiohttp_server, create_btc_usd_market):
"collateralAssetName": "USD",
"collateralAssetPrecision": 6,
"active": True,
"isRfq": True,
"isOffHours": True,
"isRfq": False,
"isOffHours": False,
"marketStats": {
"dailyVolume": "2410800.768021",
"dailyVolumeBase": "37.94502",
Expand Down
84 changes: 84 additions & 0 deletions tests/clients/test_streamrpc_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import asyncio
import json

import pytest
import websockets
from hamcrest import assert_that, equal_to
from websockets import WebSocketServer


def get_url_from_server(server: WebSocketServer):
host, port = server.sockets[0].getsockname() # type: ignore[index]
return f"ws://{host}:{port}"


@pytest.mark.asyncio
async def test_candle_stream():
from tests.fixtures.candle import create_candle_stream_rpc_message
from x10.clients.streamrpc.streamrpc_client import StreamRpcClient
from x10.clients.streamrpc.subscription_params import CandlesParams

message_model = create_candle_stream_rpc_message()
received_messages: asyncio.Queue = asyncio.Queue()

async def subscription_handler(msg):
await received_messages.put(msg)

async def mock_server(websocket):
subscribe_msg_raw = await websocket.recv()
subscribe_msg = json.loads(subscribe_msg_raw)

assert_that(subscribe_msg["method"], equal_to("subscribe"))

await websocket.send(
json.dumps(
{
"id": subscribe_msg["id"],
"result": {"subscription": message_model.subscription},
}
)
)

await websocket.send(json.dumps(message_model.to_api_request_json()))

unsubscribe_msg_raw = await websocket.recv()
unsubscribe_msg = json.loads(unsubscribe_msg_raw)

assert_that(unsubscribe_msg["method"], equal_to("unsubscribe"))

await websocket.send(
json.dumps(
{
"id": unsubscribe_msg["id"],
"result": {"method": "unsubscribe", "status": "OK"},
}
)
)

async with websockets.serve(mock_server, "127.0.0.1", 0) as server:
client = StreamRpcClient(api_url=get_url_from_server(server))
await client.connect()

subscription_params = CandlesParams(candle_type="last", market="BTC-USD", interval="PT1M")
subscription_id = await client.subscribe(params=subscription_params, handler=subscription_handler)

msg = await asyncio.wait_for(received_messages.get(), timeout=5)

await client.unsubscribe(subscription_id)
await client.close()

assert_that(
msg.to_api_request_json(),
equal_to(
{
"type": "CANDLES",
"data": [
{"o": "3458.64", "l": "3399.07", "h": "3476.89", "c": "3414.85", "v": "3.938", "T": 1721106000000}
],
"error": None,
"ts": 1721283121979,
"seq": 1,
"subscription": "candles.last.BTC-USD.PT1M",
}
),
)
20 changes: 20 additions & 0 deletions tests/fixtures/candle.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from x10.models.candle import CandleModel
from x10.models.http import WrappedStreamResponseModel
from x10.models.stream_rpc import StreamRpcResponseModel


def create_candle_stream_message():
Expand All @@ -20,3 +21,22 @@ def create_candle_stream_message():
ts=1721283121979,
seq=1,
)


def create_candle_stream_rpc_message():
return StreamRpcResponseModel(
type="CANDLES",
data=[
CandleModel(
open=Decimal("3458.64"),
low=Decimal("3399.07"),
high=Decimal("3476.89"),
close=Decimal("3414.85"),
volume=Decimal("3.938"),
timestamp=1721106000000,
)
],
ts=1721283121979,
seq=1,
subscription="candles.last.BTC-USD.PT1M",
)
4 changes: 2 additions & 2 deletions tests/fixtures/market.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ def get_btc_usd_market_json_data():
"collateralAssetName": "USD",
"collateralAssetPrecision": 6,
"active": true,
"isRfq": true,
"isOffHours": true,
"isRfq": false,
"isOffHours": false,
"marketStats": {
"dailyVolume": "2410800.768021",
"dailyVolumeBase": "37.94502",
Expand Down
Empty file.
Loading
Loading