py-lightstreamer¶
http://github.com/dw/py-lightstreamer
This is a basic Python client library for Lightstreamer‘s HTTP text protocol implemented using threads.
Required Parameters¶
Before consuming a Lightstreamer service you must collect a few requisite settings. These can easily be found by observing an existing application’s HTTP requests, e.g. via Firebug or Wireshark.
- Adapter Set
- This is the name of the collection of data adapters for which a connection will instantiate tables. It is passed as a POST parameter
LS_adapter_set
tocreate_session.txt
orcreate_session.js
. - Data Adapter
- This is the name of the server-side driver responsible for producing table data. In some configurations it may not be specified, otherwise it appears as the
LS_data_adapter
orLS_adapter
(Lightstreamer < 4.1) POST parameter tocontrol.txt
orcontrol.js
. - Item Group
- This string is parsed by the data adapter and is usually a list of space or pipe-separated identifiers; it identifies individual keys to subscribe to, but in certain cases it may be a static string such as
ALL
. It is passed as theLS_id
POST parameter tocontrol.txt
orcontrol.js
whenLS_op=add
. - Schema
- This string is parsed by the data adapter and is usually a list of space or pipe-separated identifiers; it identifies the list of fields to subscribe to for each item in the item group. It is passed as the
LS_schema
POST parameter tocontrol.txt
orcontrol.js
whenLS_op=add
. - Table Mode
- This specifies the expected update mode for the target table, it is passed as the
LS_mode
POST parameter tocontrol.txt
orcontrol.js
whenLS_op=add
. Note that a common cause of no data received from the server is setting the wrong table mode. - Username and Password
- Your Lightstreamer server might not require a username and password, but if it does, these fields are visible as the
LS_user
andLS_password
POST parameters tocreate_session.txt
orcreate_session.js
. - Server URL
- This is the absolute URL to the Lightstreamer installation, usually ending with “
/lightstreamer
”. It can easily be observed as the prefix tocreate_session.txt
orcreate_session.js
HTTP calls.
Synopsis¶
The library exports LsClient
and Table
as its main classes. Both classes are expected to be consumed by event-driven code, where it’s natural to make use of callbacks for receiving data. Refer to the output of pydoc lightstreamer
for a full API reference.
Callbacks are always invoked from a single thread private to each LsClient
. For this reason any long running code for responding to an event should be deferred to another thread, otherwise you will block the LsClient
implementation.
Consumer code creates a session and subscribes to data by:
- Instantiating an
LsClient
:
client = lightstreamer.LsClient(MY_LIGHTSTREAMER_URL)
- Optionally subscribing to the
on_state
event:
def on_state(state):
print 'New state:', state
client.on_state.listen(on_state)
- Call
create_session()
to initialize the connection:
client.create_session(adapter_set='my_adapter_set',
username='my_username', password='my_password')
Session creation runs on a private thread, so create_session()
will return control to the caller immediately. For this reason you should subscribe to on_state
, where lightstreamer.STATE_CONNECTED
will be reported once creation succeeds.
- Instantiate one or more
Table
instances, optionally including aitem_factory
to deserialize incoming rows:
# Subscribe to bank balance. Supply a item_factory that converts the
# incoming list of strings to a tuple of floats.
table = lightstreamer.Table(client,
data_adapter='AccountInfoAdapter',
mode=lightstreamer.MODE_MERGE,
item_ids='account_1|account_2',
schema='total_credits|total_debits',
item_factory=lambda row: tuple(float(v) for v in row)
)
- Subscribe to the
on_update
event:
def on_bank_balance_changed(item_id, row):
print 'Total credits:', row[0]
print 'Total debits:', row[1]
table.on_update.listen(on_bank_balance_changed)
Note that due to how Lightstreamer works, initial rows may contain None
instead of a string. This is dependent partially on table mode (at least MODE_MERGE
and MODE_RAW
) and also whether snapshot=True
is specified, and supported by the server.
- Consume data as desired until it becomes uninteresting. To cancel a subscription to a single table, use
client.delete(table)
, or alternativelyclient.destroy()
followed byclient.join()
to shut down the entire client.
Warning: never invoke client.join()
from a Lightstreamer callback, as this will result in deadlock.
Connection States¶
The following module constants are passed as the parameter to on_state
.
lightstreamer.STATE_CONNECTING
- A session does not yet exist, we’re in the process of connecting for the first time. Any control messages will be buffered until after connection.
lightstreamer.STATE_CONNECTED
- Connected and forwarding messages. If no messages are available for forwarding,
on_heartbeat()
will fire at regular intervals to indicate the connection is still alive. lightstreamer.STATE_RECONNECTING
- A session exists, we’re just in the process of reconnecting because the server indicated it was time to do so. A healthy connection will alternate between
STATE_RECONNECTING
andSTATE_CONNECTED
states asLS_content_length
is exceeded. lightstreamer.STATE_DISCONNECTED
- Could not connect and will not retry because the server indicated a permanent error. After entering this state the thread stops, and session information is cleared. You must call
create_session()
to restart the session. This is the default state. Table subscription information is preserved such that a new call toclient.create_session()
will result in all existing subscriptions to be recreated in the new session.
Interface¶
Client Interface¶
-
class
lightstreamer.
LsClient
(base_url, work_queue=None, content_length=None, timeout_grace=None, polling_ms=None)¶ Manages a single Lightstreamer session. Callers are expected to:
- Create an instance and subscribe to on_state().
- Call create_session().
- Create lightstreamer.Table instances, or manually call allocate().
- Subscribe to each Table’s on_update().
- Call destroy() to shut down.
create_session() and send_control() calls are completed asynchronously on a private thread.
-
create_session
(username, adapter_set, password=None, max_bandwidth_kbps=None, content_length=None, keepalive_ms=None)¶ Begin authenticating with Lightstreamer and start the receive thread.
username is the Lightstreamer username (required). adapter_set is the adapter set name to use (required). password is the Lightstreamer password. max_bandwidth_kbps indicates the highest transmit rate of the
server in Kbps. Server’s default is used if unspecified.- content_length is the maximum size of the HTTP entity body before the
- server requests we reconnect; larger values reduce jitter. Server’s default is used if unspecified.
- keepalive_ms is the minimum time in milliseconds between PROBE
- messages when the server otherwise has nothing to say. Server’s default is used if unspecified.
-
delete
(table)¶ Instruct the server and LsClient to discard the given table.
-
destroy
()¶ Request the server destroy our session.
-
join
()¶ Wait for the receive thread to terminate.
-
on_heartbeat
¶ Subscribe func to heartbeats. The function is called with no arguments each time the connection receives any data.
-
on_state
¶ Subscribe func to connection state changes. Sole argument, state is one of the STATE_* constants.
-
start
(table)¶ If a table was created with silent=True, instruct the server to start delivering updates.
Table Interface¶
-
class
lightstreamer.
Table
(client, item_ids, mode=None, data_adapter=None, buffer_size=None, item_factory=None, max_frequency=None, schema=None, selector=None, silent=False, snapshot=False)¶ Lightstreamer table.
Abstracts management of a single table, and wraps incoming data in a item_factory to allow simple conversion to the user’s native data format.
Callers should subscribe a function at least to on_update() to receive row updates, and possibly also on_end_of_snapshot() to know when the first set of complete rows has been received.
The table is registed with the given LsClient during construction.
-
items
= None¶ This is a dict mapping item IDs to the last known value for the particular item. Note that if no updates have been received for a particular item, it will have no entry here.
-
on_end_of_snapshot
¶ Fired when the server indicates the first set of update messages representing a snapshot have been sent successfully.
-
on_update
¶ Fired when the client receives a new update message (i.e. data). Receives 2 arguments: item_id, and msg.
-
General Upset¶
The current implementation is threaded, which sucks. Unfortunately the only alternative solutions to asynchronous networking suck also, as they impose huge frameworks or runtime constraints on consumer code. So for the time being threads prevail.
Integration with Twisted can be achieved by simply wrapping all callbacks in twisted.internet.reactor.callFromThread()
:
def wrap(func):
return lambda *args: reactor.callFromThread(func, *args)
client.on_state.listen(wrap(self._on_state))
table.on_update.listen(wrap(self._on_update))
# etc.
A future version of the library might tidy this up a little.