Difference between revisions of "Query"
(3 intermediate revisions by the same user not shown) | |||
Line 33: | Line 33: | ||
==Example== | ==Example== | ||
− | An example of a query for historical data might be to retrieve the first 1000 time and sales for MSFT.NSDQ from | + | An example of a query for historical data might be to retrieve the first 1000 time and sales for MSFT.NSDQ from December 11th, 2018 to December 12th, 2018. |
===C++=== | ===C++=== | ||
Line 42: | Line 42: | ||
int main() { | int main() { | ||
− | Beam::Network::SocketThreadPool | + | auto socketThreadPool = Beam::Network::SocketThreadPool(); |
− | Beam::Threading::TimerThreadPool | + | auto timerThreadPool = Beam::Threading::TimerThreadPool(); |
// Connect to the service. | // Connect to the service. | ||
− | Nexus::ApplicationServiceClients | + | auto serviceClients = Nexus::ApplicationServiceClients( |
− | Beam::Network::IpAddress | + | Beam::Network::IpAddress("127.0.0.1", 20000), "username", |
− | "password", Beam::Ref(socketThreadPool), Beam::Ref(timerThreadPool) | + | "password", Beam::Ref(socketThreadPool), Beam::Ref(timerThreadPool)); |
serviceClients.Open(); | serviceClients.Open(); | ||
auto& marketDataClient = serviceClients.GetMarketDataClient(); | auto& marketDataClient = serviceClients.GetMarketDataClient(); | ||
Line 56: | Line 56: | ||
query.SetIndex(Nexus::ParseSecurity("MSFT.NSDQ")); | query.SetIndex(Nexus::ParseSecurity("MSFT.NSDQ")); | ||
query.SetRange(Beam::TimeService::ToUtcTime( | query.SetRange(Beam::TimeService::ToUtcTime( | ||
− | boost::posix_time::ptime | + | boost::posix_time::ptime(boost::gregorian::date(2018, 12, 11))), |
Beam::TimeService::ToUtcTime( | Beam::TimeService::ToUtcTime( | ||
− | boost::posix_time::ptime | + | boost::posix_time::ptime(boost::gregorian::date(2018, 12, 12)))); |
query.SetSnapshotLimit(Beam::Queries::SnapshotLimit::Type::HEAD, 1000); | query.SetSnapshotLimit(Beam::Queries::SnapshotLimit::Type::HEAD, 1000); | ||
Line 92: | Line 92: | ||
# Build the query. | # Build the query. | ||
− | query = | + | query = beam.queries.Query() |
query.index = nexus.parse_security('MSFT.NSDQ') | query.index = nexus.parse_security('MSFT.NSDQ') | ||
query.range = beam.queries.Range(beam.time_service.to_utc_time( | query.range = beam.queries.Range(beam.time_service.to_utc_time( | ||
− | datetime.datetime( | + | datetime.datetime(2018, 12, 11)), beam.time_service.to_utc_time( |
− | datetime.datetime( | + | datetime.datetime(2018, 12, 12))) |
query.snapshot_limit = beam.queries.SnapshotLimit( | query.snapshot_limit = beam.queries.SnapshotLimit( | ||
beam.queries.SnapshotLimit.Type.HEAD, 1000) | beam.queries.SnapshotLimit.Type.HEAD, 1000) | ||
Line 109: | Line 109: | ||
try: | try: | ||
while True: | while True: | ||
− | print queue.top().price | + | print(queue.top().price) |
queue.pop() | queue.pop() | ||
except: | except: | ||
pass | pass | ||
</syntaxhighlight> | </syntaxhighlight> |
Latest revision as of 13:01, 19 December 2018
Beam provides a series of classes to query historical and real time streaming data along a wide range of search criteria. It is one of the core APIs provided by Beam and used extensively in Spire to search through and disseminate market data and order executions.
Contents
Structure
Much of the data stored by Spire is in the form of a time series. The data is grouped together based on an index, for example time and sales data is indexed by its ticker symbol, order's are indexed by the account the order belongs to etc... For every item there is both a timestamp and a sequence number associated with it (to identify which data was published first in the event that two items have the same timestamp).
The most common type of query used in Spire to search through a time series is defined by the Beam::Queries::BasicQuery class. It consists of five components that are used to fully specify the type of data that is to be retrieved. Those five components are described in detail as follows:
Index
Every data item to query over must be associated with a single index. Specifying the value of the index to query over is defined by the class Beam::Queries::IndexedQuery.
Range
This component is defined by the class Beam::Queries::RangedIndex and it specifies the time range to search through as well as whether the query is strictly for historical data, real time data, or both. The class Beam::Queries::Range defines the semantics of a range and consists of a start value and an end value, both of which can be either a timestamp or a sequence number. Sequence numbers are defined by the class Beam::Queries::Sequence and it contains two special values which control the behavior of a query: LAST and PRESENT.
If the end of a range is specified to be PRESENT then the query will return all matching values that are currently stored in the database. If the end of a range is specified to be LAST or +infinity then in addition to returning all currently matching values, the query will also return matching values published in real time as they arrive.
Any other value used to mark the end of a range results in a strictly historical data query, even if the value denotes some point in the future.
Snapshot limit
This specifies the maximusm number of historical data items to retrieve. This is typically used to prevent a query from returning too much data. The class Beam::Queries::SnapshotLimit contains two values, the size of the limit and the type of the limit. The size controls the maximum number of historical data items are returned by a query (with the special value of UNLIMITED if every value should be returned), as well as the type of snapshot. The type dictates whether items should be returned from the beginning of the series or the end of the series (the HEAD and TAIL respectively).
For example to query for the very last item currently stored in a time series, one can use a range of [FIRST, PRESENT] and a snapshot limit whose type is TAIL and whose size is 1. Alternatively to query for the first 10 values in a time series one would use a range of [FIRST, PRESENT] and a snapshot limit with type HEAD and size 10.
The class Beam::Queries::SnapshotLimitedQuery is the component used to indicate what the snapshot limit of a query should be. By default, this class specifies that no values are to be returned (its size is 0). This is because one is expected to be explicit about how much data is desired. The special value Beam::Queries::SnapshotLimit::Unlimited() can be used to explicitly indicate that there is no limit on how much data should be returned, however, it should be noted that the database engine handling the request may impose limits of its own regarding the maximum size of a snapshot.
Interruption policy
When querying for real time data it is possible for the query to get interrupted, usually due to a disconnection. This component, defined by Beam::Queries::InterruptableQuery, specifies what action should be taken in the event of such an interruption. The possible recovery options are defined by the enumerator Beam::Queries::InterruptionPolicy as follows:
RECOVER_DATA, the query should attempt to re-establish a connection to the database and recover any data that was published during the time that the query was interrupted. IGNORE_CONTINUE, the query should attempt to re-establish a connection to the database but ignore any data that was published during the time that the query was interrupted. This is usually desirable when only live, real time data is desired as this option will omit any stale data published during the interruption. BREAK_QUERY, the query should abort, indicating an exception. This is the default interruption policy.
Filter
Example
An example of a query for historical data might be to retrieve the first 1000 time and sales for MSFT.NSDQ from December 11th, 2018 to December 12th, 2018.
C++
1 #include <iostream>
2 #include "Nexus/MarketDataService/SecurityMarketDataQuery.hpp"
3 #include "Nexus/ServiceClients/ApplicationServiceClients.hpp"
4
5 int main() {
6 auto socketThreadPool = Beam::Network::SocketThreadPool();
7 auto timerThreadPool = Beam::Threading::TimerThreadPool();
8
9 // Connect to the service.
10 auto serviceClients = Nexus::ApplicationServiceClients(
11 Beam::Network::IpAddress("127.0.0.1", 20000), "username",
12 "password", Beam::Ref(socketThreadPool), Beam::Ref(timerThreadPool));
13 serviceClients.Open();
14 auto& marketDataClient = serviceClients.GetMarketDataClient();
15
16 // Build the query.
17 auto query = Nexus::MarketDataService::SecurityMarketDataQuery();
18 query.SetIndex(Nexus::ParseSecurity("MSFT.NSDQ"));
19 query.SetRange(Beam::TimeService::ToUtcTime(
20 boost::posix_time::ptime(boost::gregorian::date(2018, 12, 11))),
21 Beam::TimeService::ToUtcTime(
22 boost::posix_time::ptime(boost::gregorian::date(2018, 12, 12))));
23 query.SetSnapshotLimit(Beam::Queries::SnapshotLimit::Type::HEAD, 1000);
24
25 // Build the Queue to store the results.
26 auto queue = std::make_shared<Beam::Queue<Nexus::TimeAndSale>>();
27
28 // Submit the query to the market data service.
29 marketDataClient.QueryTimeAndSales(query, queue);
30
31 // Print the results
32 try {
33 while(true) {
34 std::cout << queue->Top().m_price << std::endl;
35 queue->Pop();
36 }
37 } catch(const Beam::PipeBrokenException&) {}
38 }
Python
1 import beam
2 import nexus
3 import datetime
4
5 # Connect to the service.
6 service_clients = nexus.ApplicationServiceClients(
7 beam.network.IpAddress('127.0.0.1', 20000),
8 'username', 'password')
9
10 service_clients.open()
11 market_data_client = service_clients.get_market_data_client()
12
13 # Build the query.
14 query = beam.queries.Query()
15 query.index = nexus.parse_security('MSFT.NSDQ')
16 query.range = beam.queries.Range(beam.time_service.to_utc_time(
17 datetime.datetime(2018, 12, 11)), beam.time_service.to_utc_time(
18 datetime.datetime(2018, 12, 12)))
19 query.snapshot_limit = beam.queries.SnapshotLimit(
20 beam.queries.SnapshotLimit.Type.HEAD, 1000)
21
22 # Build the Queue to store the results.
23 queue = beam.Queue()
24
25 # Submit the query to the market data service.
26 market_data_client.query_time_and_sales(query, queue)
27
28 # Print the results
29 try:
30 while True:
31 print(queue.top().price)
32 queue.pop()
33 except:
34 pass